Потоковая передача Spark с использованием MQTTutils для подписки на тему из ActiveMQ с аутентификацией

Кажется, что MQTTUtils предоставляет только три метода: def createStream(jssc: JavaStreamingContext, brokerUrl: String, тема: String, storageLevel: StorageLevel): JavaDStream[String]

Создайте входной поток, который получает сообщения, отправляемые издателем MQTT. def createStream (jssc: JavaStreamingContext, brokerUrl: String, тема: String): JavaDStream [String]

Создайте входной поток, который получает сообщения, отправляемые издателем MQTT. def createStream(ssc: StreamingContext, brokerUrl: String, тема: String, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2): DStream[String]

Создайте входной поток, который получает сообщения, отправляемые издателем MQTT.

Но как я могу указать имя пользователя и пароль, если брокер включил аутентификацию?


person Fang Liu    schedule 05.01.2015    source источник


Ответы (3)


Вы можете попробовать включить имя пользователя и пароль в URL-адрес:

mqtt://имя пользователя:пароль@хост:порт

person hardillb    schedule 05.01.2015
comment
Пробовал схемы mqtt и tcp по URL-адресу с учетными данными. Схема mqtt дает java.lang.IllegalArgumentException, а схема tcp дает java.net.UnknownHostException - person ggalmazor; 13.02.2015

Найдите это Пример подсчета слов MQTT Scala.

В частности, для вашего случая запустите издатель как

bin/run-example org.apache.spark.examples.streaming.MQTTPublisher mqtt://username:password@host:port foo

И подписчик как

bin/run-example org.apache.spark.examples.streaming.MQTTWordCount mqtt://username:password@host:port foo

Прежде чем сделать это, убедитесь, что у вас запущен брокер ActiveMQ.

пример кода

import org.apache.activemq.broker.{TransportConnector, BrokerService}
.
.
.
.
def startActiveMQMQTTBroker() {
    broker = new BrokerService()
    broker.setDataDirectoryFile(Utils.createTempDir())
    connector = new TransportConnector()
    connector.setName("mqtt")
    connector.setUri(new URI("mqtt:" + brokerUri))
    broker.addConnector(connector)
    broker.start()
}

файл pom

<dependency>
  <groupId>org.apache.activemq</groupId>
  <artifactId>activemq-core</artifactId>
  <version>5.7.0</version>
</dependency>
person prabeesh    schedule 30.07.2015

Вы можете попробовать использовать настраиваемую библиотеку spark-streaming-mqtt-connector, доступную здесь — https://github.com/sathipal/spark-streaming-mqtt-with-security_2.10-1.3.0.

Эта библиотека добавляет следующее поверх исходной библиотеки:

  • Добавлена ​​безопасность TLS v1.2, чтобы связь всегда была защищена.
  • Сохраненная тема вместе с полезной нагрузкой в ​​RDD.

Итак, используйте следующий метод для создания потока:

val lines = MQTTUtils.createStream(ssc, // Spark Streaming Context
            "ssl://URL",                // Broker URL
            "<topic>",                 // MQTT topic
            "MQTT client-ID",          // Unique ID of the application
            "Username", 
            "passowrd")

Существуют перегруженные конструкторы, которые также позволяют передать уровень хранилища RDD. Надеюсь это поможет.

person Sathish    schedule 18.02.2016