AWS IoT Python SDK и asyncio

Мне нужно использовать сервис AWS IoT MQTT. Я немного экспериментирую с https://github.com/aws/aws-iot-device-sdk-python прямо сейчас.

Мое приложение будет использовать веб-сокеты для связи с другой службой, а затем публиковать / подписываться на темы MQTT для пересылки / получения сообщений.

Вероятно, что эта библиотека будет блокировать выполнение кода? Я все еще пытаюсь разобраться в asyncio и не уверен, на что мне следует обратить внимание. Как узнать, вызовет ли это проблемы?

Я считаю, что мне нужно будет использовать только AWSIoTMQTTClient из указанной выше библиотеки.

Это отрывок из рабочего кода, который у меня есть:

class AWSIoTClient:

    def __init__():
        ...
        self.client = AWSIoTMQTTClient(...)

    def subscribe(self, callback):
        self.client.subscribe(f'{self.TOPIC}/subscribe/', 0, callback)

    def publish(self, message):
        self.client.publish(self.TOPIC, message, 0)


class MyWSProtocol(WebSocketClientProtocol):

    def set_aws_client(self, client: AWSIoTClient):
        client.subscribe(self.customCallback)
        self.client = client

    def customCallback(self, client, userdata, message):
        # This will be called when we send message from AWS
        if message.payload:
            message = json.loads(message.payload.decode('utf-8').replace("'", '"'))
            message['id'] = self.next_id()
            self.sendMessage(json.dumps(message).encode('utf-8'))

    def onMessage(self, payload, isBinary):
        message = json.loads(payload)

        # This will forward message to AWS
        self.client.publish(str(payload))

person Giannis    schedule 21.11.2017    source источник


Ответы (1)


Вероятно, что эта библиотека будет блокировать выполнение кода?

Как узнать, вызовет ли это проблемы?

Вы не должны допускать наличия длительного блокирующего (синхронного) кода внутри любой из ваших сопрограмм. Это приведет к блокировке вашего глобального цикла событий и дальнейшей блокировке всех ваших сопрограмм повсюду.

async def main():
    await asyncio.sleep(3)  # async sleeping, it's ok

    time.sleep(3)           # synchronous sleeping, this freezes event loop 
                            # and all coroutines for 3 seconds, 
                            # you should avoid it!
    
    await asyncio.sleep(3)  # async sleeping, it's ok

Если вам нужно запустить код блокировки внутри сопрограммы, вы должны сделать это в исполнителе (прочтите об этом здесь).

Вы должны помнить об этом при написании сопрограмм, но обычно asyncio предупредит вас об этой ошибке, если вы включите режим отладки:

import asyncio
import time


async def main():
    await asyncio.sleep(3)
    time.sleep(3)
    await asyncio.sleep(3)


loop = asyncio.get_event_loop()
loop.set_debug(True)  # debug mode
try:
    loop.run_until_complete(main())
finally:
    loop.run_until_complete(loop.shutdown_asyncgens())
    loop.close()

Вы увидите предупреждение:

Executing <Handle <TaskWakeupMethWrapper object at 0x000002063C2521F8>(<Future finis...events.py:275>) created at C:\Users\gmn\AppData\Local\Programs\Python\Python36\Lib\asyncio\futures.py:348> took 3.000 seconds
person Mikhail Gerasimov    schedule 21.11.2017
comment
Добавлю оператор отладки, если что-то появится. Спасибо - person Giannis; 21.11.2017
comment
Хм, ошибок от отладки нет. Будем продолжать поиски, я думаю, мне нужно понять, как SDK работает из исходного кода. - person Giannis; 21.11.2017