Google IoT - Правый режим для получения уведомлений (подписка работает)

Я слежу за этим учебное пособие, и у меня уже есть код публикации сообщений в /devices/sm1/events теме, в которой sm1 - это идентификатор моего устройства.

Я хотел бы знать, как подписаться на эту тему, поскольку в учебнике сказано использовать /devices/sm1/config, но я получаю пустые сообщения. Я уже пробовал использовать тот же «путь», который использовался при публикации (/devices/sm1/events), но это тоже не сработало.

Странно, что название, которое я дал этой теме, было sm1, а тема, связанная с моим устройством, находится в консоли GoogleIoT, отображается как projects/myprojectname/topics/sm1. Итак, помимо того, чтобы узнать, как подписаться на упомянутую тему, я также ценю любые объяснения, связанные с правильным способом использования тем pub / sub в GoogleIoT (документация не так понятна).

Это мой subscribe.py:

mqtt_url = "mqtt.googleapis.com"
mqtt_port = 8883
topic = "/devices/sm1/config"

def on_connect(client, userdata, flags, response_code):
    print("Connected with status: {0}".format(response_code))
    client.subscribe(topic, 1)

def on_message(client, userdata, msg):
    print("Topic: {0}  --  Payload: {1}".format(msg.topic, msg.payload))

if __name__ == "__main__":    
    client = mqtt.Client("projects/{}/locations/{}/registries/{}/devices/{}".format(
                         project_id,
                         cloud_region,
                         registry_id,
                         device_id))

    client.username_pw_set(username='unused',
                           password=jwt_maker.create_jwt(project_id,
                                               private_key,
                                               algorithm="RS256"))

    client.tls_set(root_ca,
                   certfile = public_crt,
                   keyfile = private_key,
                   cert_reqs = ssl.CERT_REQUIRED,
                   tls_version = ssl.PROTOCOL_TLSv1_2,
                   ciphers = None)


    client.on_connect = on_connect
    client.on_message = on_message

    print "Connecting to Google IoT Broker..."
    client.connect(mqtt_url, mqtt_port, keepalive=60)
    client.loop_forever()

Мой вывод:

Подключено со статусом: 0
Тема: / devices / sm1 / config - Payload:
Тема: / devices / sm1 / config - Payload:


person Dalton Cézane    schedule 03.04.2018    source источник


Ответы (3)


Прочитав обсуждение в разделе комментариев в ответе @GabeWeiss, я думаю, что есть небольшая путаница в том, чего вы хотите достичь и как (или для чего) вы пытаетесь использовать Pub / Sub.

Учитывая, что я думаю, что проблема здесь более концептуальная, позвольте мне сначала отослать вас на общую страницу документации о Ключевые концепции Cloud IoT Core, где вы фактически найдете некоторую информацию о взаимосвязи между Cloud IoT Core и Pub / Sub. Таким образом, данные телеметрии устройства публикуются в теме Cloud IoT Core, которая позже через Data Broker публикуется в теме Cloud Pub / Sub, которую вы можете использовать извне для других целей: запуск облачных функций, анализ потока данные с потоком данных и т. д.

Теперь, если вы следуете руководству по быстрому запуску Cloud IoT Core, вы увидите, как в определенный момент вы создаете реестр устройств, привязанный к Pub / Подраздел, в котором публикуются события телеметрии устройства. Если вместо того, чтобы писать в тему Pub / Sub по умолчанию, вы хотите писать в несколько тем, вы можете следовать пояснениям в разделе этот другой раздел документации.

Наконец, переходя к вопросу о подписке на темы (темы Cloud IoT Core, а не темы Pub / Sub, поскольку только первые актуальны для устройств), вы можете подписаться на темы MQTT с помощью следующая команда, где (в качестве примера ), устройство подписывается на тему конфигурации, в которой публикуются обновления конфигурации:

# This is the topic that the device will receive configuration updates on.
mqtt_config_topic = '/devices/{}/config'.format(device_id)

# Subscribe to the config topic.
client.subscribe(mqtt_config_topic, qos=1)

Затем с помощью on_message() функция вы можете обрабатывать сообщения, опубликованные по темам, на которые вы подписаны. Обратите внимание, что полезная нагрузка должна быть проанализирована как строка (str(message.payload)).

def on_message(unused_client, unused_userdata, message):
    payload = str(message.payload)
    print('Received message \'{}\' on topic \'{}\' with Qos {}'.format(
            payload, message.topic, str(message.qos)))

Затем в своем вопросе вы указали, что сначала подписались на /devices/{device-id}/config, но это может быть не то, что вам нужно, поскольку именно в этой теме публикуются обновления конфигурации (т.е. не там, где публикуются события телеметрии). Затем, насколько я понимаю, вам следует подписаться на /devices/{device-id}/events, который является актуальной темой MQTT, в которой публикуются показатели телеметрии. Если это не сработает, возможно, возникла другая проблема, поэтому убедитесь, что вы правильно анализируете переменную message, и, возможно, попробуйте используйте Pub / Sub с темой по умолчанию, созданной с помощью реестра, чтобы проверить, правильно ли публикуются показатели телеметрии.

person dsesto    schedule 11.04.2018
comment
Привет, дсесто. В вопросе ясно, что я хочу: подписаться на темы телеметрии, чтобы получать уведомления с устройств, без использования библиотеки google pub / sub. Всю документацию, которую вы упомянули, я уже прочитал раньше, и, как я сказал в своем вопросе и в комментариях, документация неясна. Кроме того, все предложения, которые вы сказали, я пробовал раньше. Так что, к сожалению, ваш ответ ни в чем не помог. Спасибо, в любом случае. - person Dalton Cézane; 11.04.2018

Изменить: уточняется на основе комментария ниже ...

Здесь задействованы два компонента GCP. Есть тема MQTT (которая является темой / events), которая используется устройством для взаимодействия с IoT Core. Затем есть projects/myprojectname/topics/sm1, которого нет в IoT Core, он находится в Pub / Sub. Когда вы отправляете сообщения в тему MQTT / events, IoT Core передает полезные данные с вашего устройства, которые были отправлены в тему MQTT / events, через тему Pub / Sub, которая была создана и прикреплена к реестру IoT Core, где было зарегистрировано ваше устройство. .

Чтобы увидеть эти сообщения, вы должны создать подписку в Pub / Sub по теме projects/myprojectname/topics/sm1. Если зайти в консоль и паб / под-> топики. Щелкните три точки рядом с темой и выберите «Новая подписка». После того, как у вас есть подписка, вы можете отправить некоторые данные со своего устройства, а затем в командной строке вы можете запустить (при условии, что у вас установлены инструменты gcloud):

gcloud beta pubsub subscriptions pull --max-messages=3 <subscription_id>

Чтобы делать что-либо с сообщениями, вы можете подписаться на тему Pub / Sub с помощью сценария (проверьте API-интерфейсы Pub / Sub), чтобы запускать сообщения, добавляемые в тему.

Исходное сообщение:

Вы отправляете конфигурационное сообщение на устройство? Путаница может заключаться в том, что темы MQTT однонаправлены.

Итак: 1) тема / events предназначена для устройства-> IoT Core. 2) тема / config предназначена для IoT Core Admin SDK-> устройство

Где-то в другом скрипте или из интерфейса пользовательского интерфейса IoT Core вам нужно отправить сообщение конфигурации, чтобы правильно увидеть срабатывание on_message.

В пользовательском интерфейсе IoT Core (на console.cloud.google.com) вы можете перейти к отдельному зарегистрированному устройству и в верхней части экрана нажать «Обновить конфигурацию». Появится всплывающее окно, которое позволит вам отправить текстовое сообщение или сообщение в кодировке base64 на это устройство, и оно появится в теме / config.

person Gabe Weiss    schedule 04.04.2018
comment
Пока не ясно. В документации об этом ничего не говорится. Как объясняется в вопросе, у меня есть следующая тема, созданная в IoT Core projects/myprojectname/topics/sm1. Но мой клиент должен публиковать сообщения в теме с именем events, и я до сих пор не знаю, в какой теме я должен подписаться, чтобы получать заинтересованные сообщения по желаемой теме _3 _... - person Dalton Cézane; 04.04.2018
comment
А, ладно думаю понял, отредактирую свой ответ ... сек - person Gabe Weiss; 04.04.2018
comment
Я изменил свою тему на projects/myprojectname/topics/sm1 вместо /devices/sm1/config, но я все еще не могу увидеть опубликованные значения: Подключение к брокеру Google IoT ... Подключено со статусом: 0. Должно ли это быть связано с какими-то разрешениями? Что-то еще отсутствует? - person Dalton Cézane; 04.04.2018
comment
Вы все еще пытаетесь подключиться к IoT Core. Pub / Sub - это не MQTT. Есть отдельный API для подписки на темы Pub / Sub. Начните здесь: cloud.google.com/pubsub/docs - person Gabe Weiss; 04.04.2018
comment
Итак, я думаю, что это должно быть описано в руководствах, связанных с IoT Core ... даже более того, в них описано, как создавать темы ... если мы узнаем, как создавать, мы также должны узнать, как подписаться, и эта информация не там. Я буду читать. Спасибо. - person Dalton Cézane; 04.04.2018
comment
Ага, у меня тоже есть заметка для разработчиков документации по этому поводу, так что у нас, по крайней мере, есть навигационная цепочка к документам Pub / Sub, которым нужно следовать. - person Gabe Weiss; 05.04.2018
comment
Хотя в руководстве по быстрому запуску в разделе «Запуск образца Node.js для подключения виртуального устройства и просмотра телеметрии», шаг 5 описывает создание подписки из командной строки, а шаг 7 - это команда для чтения сообщений подписки Pub / Sub. . - person Gabe Weiss; 05.04.2018
comment
Пока не ясно. Я просто хочу создать собственное приложение, не зависящее от языка. Не хочу использовать cli, так как тема уже создана в IoT Core; Я не хочу использовать образцы Google (например, этот клиент) ... Я просто хочу прочитать любую документацию, в которой рассказывается, какие URL-адреса используются для подписки и каков формат сообщений. Я читаю документацию для подписчиков, и в ней также не упоминается, что мне нужно ... - person Dalton Cézane; 05.04.2018
comment
Вы имеете в виду вот что: cloud.google.com/pubsub/docs/push? Существует два типа подписок, которые описывают метод push, поэтому при настройке подписки вы настраиваете конечную точку для отправки сообщений. Там процесс описан. Там также есть пример того, как будет выглядеть ответ. - person Gabe Weiss; 06.04.2018
comment
У меня есть подписка с этим адресом /projects/myproject/subscriptions/sm1, но мне не удалось установить соединение. Вот мой короткий код. Я не знаю, подходит ли протокол MQTT для такого рода операций, хотя должен быть, как и другие платформы. - person Dalton Cézane; 06.04.2018
comment
Я не думаю, что это сработает, но вы можете попробовать, вам не хватает: client.connect('mqtt.googleapis.com', 8883) client.loop_start() Однако, чтобы инициировать соединение MQTT. Фактически, я на 99% уверен, что это не так, потому что конечная точка MQTT связана только с IoT Core, а не с Pub / Sub. Вам необходимо использовать Python Pub / Sub API, чтобы читать подписку. - person Gabe Weiss; 06.04.2018
comment
Теперь вы понимаете мою ситуацию? По этому поводу нет такой информации. Я уже пытался увидеть код библиотеки pub / sub, но она не использует библиотеку для связи через mqtt. Итак, я не знаю, поддерживается ли gcp этот протокол для pub / sub, как и на других платформах. В моем понимании все еще есть пробелы из-за предоставленной документации. - person Dalton Cézane; 06.04.2018
comment
Кажется, что ядро ​​Интернета вещей все еще очень ограничено, не так ли? Я так думаю, потому что, по-видимому, он просто работает с темами событий, состояний и конфигурации ... Кажется, они служат только для цели: настройка / администрирование устройств. Если я хочу контролировать некоторые датчики с определенными темами, я должен использовать google pub / sub. В этом случае IoT Core может остаться неиспользованным. Я прав? Еще раз спасибо. - person Dalton Cézane; 19.04.2018
comment
Извини, Далтон, я путешествовал. :) С IoT Core у вас может быть до 10 тем Pub / Sub в каждом реестре. Таким образом, у вас может быть: /events/temp/ /events/pressure/ /events/humidity/ и т. Д. Одна из причин, по которой вы не захотите сразу переходить в Pub / Sub (среди прочего), - это часть управления устройством. Хотя это правда, вы можете сразу перейти к Pub / Sub для конфигурации, например, если ваше устройство не подключено, когда конфигурация проходит, ваше устройство не получит обновление, если вы не напишете retry и т. Д. Кроме того, безопасность . Вам придется самостоятельно управлять безопасным обменом данными без IoT Core. - person Gabe Weiss; 01.05.2018

Мне пришлось сдаться библиотеке Google Pub / Sub, чтобы получать уведомления, связанные с моей указанной темой.

Мой код публикации (только важные части):

mqtt_url = "mqtt.googleapis.com"
mqtt_port = 8883
mqtt_topic = "/devices/sm1/events"

client = mqtt.Client("projects/{}/locations/{}/registries/{}/devices/{}".format(
                     project_id,
                     cloud_region,
                     registry_id,
                     device_id), protocol=mqtt.MQTTv311)

client.username_pw_set(username='unused',
                       password=jwt_maker.create_jwt(project_id,
                                           private_key,
                                           algorithm="RS256"))

client.tls_set(root_ca, 
               certfile = public_crt, 
               keyfile = private_key, 
               cert_reqs = ssl.CERT_REQUIRED, 
               tls_version = ssl.PROTOCOL_TLSv1_2, 
               ciphers = None)

client.connect(mqtt_url, mqtt_port, keepalive=60)
res = client.publish(mqtt_topic, some_data, qos=1)

На портале Google Cloud Platform мне пришлось создать подписку в разделе Pub / Sub, назначив ее моей созданной теме, которая уже была моей темой по умолчанию (возможно, связанной с темой / events). Созданная подписка имеет следующий формат:

projects/project_name/subscriptions/subscription_name

Мой код подписки с использованием библиотеки Google Pub / Sub, поскольку невозможно использовать протокол MQTT:

from google.cloud import pubsub

def callback(message):
    print(message.data)
    message.ack()

project_id = "project_name"
subscription_name = "sm1"

subscriber = pubsub.SubscriberClient()
subscription_name = 'projects/{}/subscriptions/{}'.format(project_id, subscription_name)

subscription = subscriber.subscribe(subscription_name)
future = subscription.open(callback)

try:
    future.result()
except Exception as ex:
    subscription.close()
    raise

Надеюсь, это поможет кому угодно. Более подробную информацию можно найти здесь (github).

person Dalton Cézane    schedule 18.05.2018