как зарегистрировать исходный код springxd mqtt в брокере mqtt

Я создал потоки с помощью spring-Xd, такие как-

stream create mqtttestfile --definition "mqtt --url='tcp://localhost:1883' --topics='helloTopic' | file" --deploy

Создан и развернут новый поток mqtttestfile. Я также проверил на localhost:9393/admin-ui, поток успешно создан и развернут.

мой брокер MQTT работает на локальном хосте: 1883. но когда я проверил каталог файлов /tmp/xd/output, файл mqtttestfile.out отсутствует.

Мне нужны разъяснения по следующим пунктам моего предположения: -

  1. Я думаю, что клиент MQTT уже настроен в исходном модуле mqtt spring-xd. поэтому, когда мы создаем потоки, он автоматически подписывается на определенную тему на брокере.

  2. Я также попытался запустить два отдельных скрипта Python, один для подписки, а другой для издателя на отдельном терминале, и он работает нормально. так что никаких проблем с брокером mqtt.

это журнал из консоли spring-xd, который я получил:

istener - Планирование развертывания в новый контейнер(ы) через 15000 мс

2017-03-04T12:07:51+0530 1.3.0.RELEASE INFO DeploymentsPathChildrenCache-0 container.DeploymentListener — событие кэша пути: path=/deployments/modules/allocated/d634d310-12b4-4a83-baea-c1c98dfb7bba/mqtttestfile.sink .file.1, тип=CHILD_ADDED

2017-03-04T12:07:51+0530 1.3.0.RELEASE INFO DeploymentsPathChildrenCache-0 container.DeploymentListener — Развертывание модуля «файл» для потока «mqtttestfile» 2017-03-04T12:07:52+0530 1.3.0.RELEASE INFO DeploymentsPathChildrenCache-0 container.DeploymentListener — модуль развертывания [ModuleDescriptor@7e658391 moduleName = 'file', moduleLabel = 'file', group = 'mqtttestfile', sourceChannelName = [null], sinChannelName = [null], index = 1, type = сток, параметры = карта[[пусто]], дети = список[[пусто]]]

2017-03-04T12:07:54+0530 1.3.0.RELEASE INFO DeploymentsPathChildrenCache-0 container.DeploymentListener — событие кэша пути: path=/deployments/modules/allocated/d634d310-12b4-4a83-baea-c1c98dfb7bba/mqtttestfile.source .mqtt.1, тип=CHILD_ADDED

2017-03-04T12:07:54+0530 1.3.0.RELEASE INFO DeploymentsPathChildrenCache-0 container.DeploymentListener — развертывание модуля mqtt для потока mqtttestfile 2017-03-04T12:07:54+0530 1.3.0.RELEASE INFO DeploymentsPathChildrenCache-0 container.DeploymentListener — модуль развертывания [ModuleDescriptor@5a7d8e37 moduleName = 'mqtt', moduleLabel = 'mqtt', group = 'mqtttestfile', sourceChannelName = [null], sinChannelName = [null], index = 0, type = источник, параметры = карта ['темы' -> 'helloTopic', 'url' -> 'tcp://localhost: 1883'], дети = список [[пусто]]]

2017-03-04T12:07:56+0530 1.3.0.RELEASE INFO DeploymentSupervisor-0 zk.ZKStreamDeploymentHandler — статус развертывания для потока mqtttestfile: DeploymentStatus{state=deployed}

с spring-xd 1.3.1 проблема остается нерешенной, это сообщение об ошибке, которое я видел в журнале

2017-03-05T01:15:06+0530 1.3.1.RELEASE INFO LeaderSelector-1 zk.DeploymentSupervisor — Лидерство отменено из-за прерывания потока

2017-03-05T01:15:06+0530 1.3.1.RELEASE ERROR MQTT Rec: xd.mqtt.client.id.src inbound.MqttPahoMessageDrivenChannelAdapter — Потеряно соединение: Соединение потеряно; повторная попытка...

Благодарю.


person andy    schedule 04.03.2017    source источник


Ответы (1)


Я только что проверил ваш стрим, и он работал нормально для меня...

$ cat /tmp/xd/output/mqtttestfile.out 
foo
bar

(после того, как я добавил сообщения foo и bar в очередь).

Когда ведение журнала DEBUG включено для org.springframework.integration (в файле контейнера logback.grooy), я вижу...

2017-03-04T08:02:59-0500 1.3.1.RELEASE INFO DeploymentsPathChildrenCache-0 endpoint.EventDrivenConsumer - started outbound.mqtttestfile.0
2017-03-04T08:02:59-0500 1.3.1.RELEASE DEBUG DeploymentsPathChildrenCache-0 inbound.MqttPahoMessageDrivenChannelAdapter - Connected and subscribed to [helloTopic]
2017-03-04T08:02:59-0500 1.3.1.RELEASE INFO DeploymentsPathChildrenCache-0 inbound.MqttPahoMessageDrivenChannelAdapter - started mqttInbound
2017-03-04T08:02:59-0500 1.3.1.RELEASE INFO DeploymentSupervisor-0 zk.ZKStreamDeploymentHandler - Deployment status for stream 'mqtttestfile': DeploymentStatus{state=deployed}

Странно, что вы не видите сообщения ...started... (INFO).

Вы можете попробовать с версией 1.3.1?

person Gary Russell    schedule 04.03.2017
comment
в настоящее время я использую spring-xd 1.3.0, я попробую с 1.3.1 и позволю вам обновить. Благодарю. - person andy; 04.03.2017
comment
Я все еще получаю ту же ошибку. Я делаю что-то не так, пожалуйста, проверьте следующий журнал ошибок msg- 2017-03-05T01:15:06+0530 1.3.1.RELEASE ERROR MQTT Rec: xd.mqtt.client.id.src inbound.MqttPahoMessageDrivenChannelAdapter - Потеряно соединение: соединение потеряно ; retrying... 2017-03-05T01:15:06+0530 1.3.1.RELEASE INFO LeaderSelector-1 zk.DeploymentSupervisor — Лидерство отменено из-за прерывания потока - person andy; 04.03.2017
comment
Мне трудно сказать; как я уже сказал, ваше определение потока отлично работает для меня; Я только что проверил это и на 1.3.0 с тем же результатом; нет ошибок; данные отображаются в файле. Похоже, у вас есть некоторые ошибки сети/подключения (также Zookeeper), но это странно, если вы подключаетесь к локальному хосту. Я предлагаю вам включить ведение журнала DEBUG (для всего), чтобы посмотреть, сможете ли вы понять, что происходит. - person Gary Russell; 04.03.2017
comment
@ Гэри, я запускаю spring-xd на одном узле. поэтому всякий раз, когда я создаю поток по той же теме, отображается сообщение о потере соединения с ошибкой. - person andy; 05.03.2017
comment
Я запускаю spring-xd в одном узле. поэтому всякий раз, когда я создаю поток по той же теме, отображается сообщение о потере соединения с ошибкой. шаги, которые я выполнил: 1. установил брокера mqtt в свою систему и опубликовал сообщение по определенной теме. 2. запустил spring-xd в одном узле и создал поток. Мое предположение, что он автоматически подпишется на определенную тему, а затем сообщение будет сохранено в приемнике. Такой подход правильный или неправильный? - person andy; 05.03.2017
comment
Как я уже сказал, я не могу этого объяснить. Я делаю то же самое, и это работает. Я использую Rabbit с плагином MQTT в качестве брокера. ИТ подключается при развертывании. - person Gary Russell; 05.03.2017
comment
Спасибо Гэри. Попробую вашу рекомендацию. - person andy; 06.03.2017
comment
Спасибо, Гарри, за ценный вклад... Все работает отлично... - person andy; 08.03.2017