Интеграция производителя данных в качестве рабочего в Django Channels 2.x

Я разрабатываю приложение, в котором данные в реальном времени, которые будут отправляться клиентам, будут поступать из внешнего API. Простую его версию можно представить как трекер валютной валюты. Пользователь укажет, какие валюты он хочет отслеживать (доллары США, евро, фунты стерлингов и т. Д.), И будет получать обновления в реальном времени. Данные о валюте будут поступать из внешнего API посредством длительного опроса. У меня вопрос, как интегрировать этого производителя данных в каналы?

Во всех примерах каналов я обнаружил, что работа рабочего запускается событием, но в моем случае он будет начинаться с самого начала, работать непрерывно и вместо получения событий он просто отправит новые значения на уровень канала, чтобы подписчики могли быть уведомлены. Поэтому я не уверен, что потребительская модель правильная. Подытоживая мои вопросы:

  • Должен ли я использовать для этой задачи потребителя и как его настроить? Учитывая, что доступ к API будет осуществляться при длительном опросе асинхронного или синхронизирующего потребителя? Начать опрос внешнего API в его методе подключения или просто отправить одноразовое событие для этого? Откуда и когда отправлять это событие «начало работы»?

  • Я также хочу использовать redis для хранения значений для предоставления пользователю начального значения валют. Они начнут прислушиваться к обновлениям при подключении, но, возможно, обновление появится через много секунд. Могу ли я получить доступ к экземпляру соединения Redis, используемому канальным уровнем, или мне нужно для этой цели открыть другое соединение с моим Redis?

Другой вариант для производителя данных - хранить его полностью вне каналов Django, как описано здесь и просто отправляйте данные на уровень канала, но я не уверен, что во время развертывания это может быть проблематичным с Дафни. Я имею в виду, как я могу убедиться, что он работает и правильно распределяет ресурсы с каналами?

Спасибо.


person pembeci    schedule 28.06.2018    source источник


Ответы (2)


Рабочие подходят для вашего варианта использования. Они предназначены для длительного использования, и для каждого запроса не существует нового экземпляра. Если вы хотите, чтобы ваши потребители были асинхронными, вы должны убедиться, что ничто из того, что вы делаете, не блокирует. Все запросы к базе данных должны быть заключены в database_sync_to_async, даже если вызов базы данных происходит на 5 уровнях вниз по стеку вызовов. Вы можете использовать API кеша Django для подключения к Redis, но вам лучше работать за его пределами, чтобы все было асинхронным. Используйте каналы библиотеки redis напрямую, поскольку в ней есть асинхронные методы для работы с redis в качестве кеша.

person kagronick    schedule 29.06.2018
comment
Спасибо за ответ. У меня что-то работает. Вы знаете, как я могу получить доступ к собственным асинхронным методам redis канала? - person pembeci; 04.07.2018
comment
Если вы хотите отправлять сообщения, используйте методы в пакете каналов redis. Если вы хотите использовать его как базу данных, подключитесь к aioredis вот так aioredis.create_redis(**channels.layers.get_channel_layer().hosts[0]) - person kagronick; 05.07.2018
comment
Понятно. Большое спасибо. - person pembeci; 06.07.2018
comment
привет @pembeci, у меня точно такие же требования, как и у меня, и те же вопросы. Мне было интересно, можете ли вы рассказать, как вы это настроили - person Nasir; 21.11.2018
comment
@Nasir, в ответ я попытался объяснить свою настройку. Задавайте любые вопросы, если это сбивает с толку. - person pembeci; 23.11.2018

(чтобы ответить на комментарий Насира и для последующих посетителей, вот моя полная настройка)

Каналы и их сотрудники действительно были хорошим выбором для моего проекта, и у меня есть кое-что, что хорошо работает. Он еще не запущен в производство, но работает нормально, а код хорошо структурирован, с ним легко работать и т. Д.

Прежде всего нам нужно настроить воркера и заставить его работать. Предположим, что наш рабочий класс - ExternalData, мы собираемся настроить определенный канал для рабочего:

# routing.py
application = ProtocolTypeRouter({
    # ...
    'channel': ChannelNameRouter({
        "external-data": ExternalData,
    })
})

# asgi.py  
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
# ...
# add this to the end of the file
channel_layer = get_channel_layer()
logger.info("Sending start signal to ExternalData")
async_to_sync(channel_layer.send)( "external-data", { "type": "external_data.start" })

# external_data.py   worker's code

# used as a singleton object
class DataStore(object):

    @classmethod
    async def create(cls, owner):
        self = DataStore() 
        self.currencies = {}
        self.owner = owner
        # ...
        return self

class ExternalData(AsyncConsumer):

    started = False

    # triggered from asgi.py via daphne start
    async def external_data_start(self, event):

        if ExternalData.started:
            if settings.DEBUG:
                raise RuntimeError("ExternalData already working.")
            else:
                logger.warning("ExternalData already working.")
                return
        else:
            # do your initialization work here and let the data producer start listening and saving external data 
            ExternalData.started = True
            self.store = await DataStore.create(owner=self)

DataStore в приведенном выше коде, конечно, не требуется, но если вы собираетесь делать что-то сложное, может быть лучше использовать ExternalData только для каналов, связанных с вещами, а все остальное делать в другом классе. При такой настройке вам нужно сначала запустить воркер:

python manage.py runworker external-data 

а затем запустите daphne (т.е.в другом терминале, чтобы увидеть вывод обоих):

daphne -b 0.0.0.0 -p 8000 YOUR_PROJECT.asgi:application

В производственной среде, когда вам нужно написать службу или аналогичную, daphne следует запустить немного позже (например, спать на 2-3 секунды), чтобы убедиться, что рабочий файл обрабатывается python и работает. Вы также можете попробовать код asgi.py несколько раз (т.е. в цикле с некоторым сном), пока рабочий не установит какой-либо флаг среды.

Теперь наш поставщик данных работает, но как насчет клиентов? Нам нужен потребитель, который в основном будет действовать как посредник между нашим поставщиком данных и клиентами. В моем проекте требования к передаче данных покрывали большинство случаев:

  • A: когда клиент подключается, отправьте некоторые исходные данные
  • B: клиент может посетить страницу и ему нужно получить некоторые дополнительные данные, относящиеся к странице
  • C: клиент находится на странице, куда вам нужно отправить данные в реальном времени и обновить страницу
  • D: поступили какие-то новые данные и нужно сообщить клиенту

У нас одностраничное приложение, поэтому нам все это было нужно. Вот фрагмент, в котором рассказывается, как я поступал во всех этих случаях:

# consumer.py

class FeedsConsumer(AsyncJsonWebsocketConsumer):
    groups = ["broadcast"]   # for requirement D

    # triggered from client
    async def connect(self):
        await self.accept()
        self.listening = set()  # for requirement C
        logger.info(f"New client connected: {self.channel_name}")
        # for requirement A
        await self.channel_layer.send("external-data",
           { "type": "external.new_client", 'client_channel': self.channel_name })

    # triggered from client
    async def receive_json(self, data):        
            # for requirement B
            if data["type"] == "get_currency":
                payload["type"] = "external.send_currency"
                payload["client_channel"] = self.channel_name
                payload["currency"] = data["currency"]
                self.listen(data["currency"])  # for requirement C
                await self.channel_layer.send("external-data", payload)

    # for requirement C, you possibly need a counterpart unlisten to remove channel_name from the group and update self.listening set
    async def listen(self, item_id):
            if item_id not in self.listening:
                await self.channel_layer.group_add(item_id, self.channel_name )
                self.listening.add(item_id)    

    # below are triggered from the worker. A and B as responses. C and D as server generated messages 

    # for requirement A
    async def init_data(self, payload):
        await self.send_json(payload)

    # for requirement B
    async def send_currency(self, payload):
        await self.send_json(payload) 

    # for requirement C
    async def new_value(self, payload):
        await self.send_json(payload)  

    # for requirement D
    async def new_currency(self, payload):
        await self.send_json(payload) 

# external_data.py   worker's code

class ExternalData(AsyncConsumer):

    # for requirement A. triggered from consumer.
    async def external_new_client(self, payload):
        data_to_send = list(self.store.currencies.keys())
        # prepare your data above and then send it like below
        await self.channel_layer.send(payload["client_channel"],  # new client
          { 'type': 'init_data',
            'data': data_to_send,
          })

    # for requirement B. triggered from consumer.
    async def external_send_currency(self, payload):
        data_to_send = self.store.currencies[payload["currency"]]
        # prepare your data above and then send it like below
        await self.channel_layer.send(payload["client_channel"],  # only the client who requested data
          { 'type': 'send_currency',
            'data': data_to_send,
          })


    async def new_data_arrived(self, currency, value):
         if currency not in self.store.currencies:
             self.store.currencies[currency] = value
             # requirement D. suppose this is new data so we need to notify all connected users of its availability
             await self.channel_layer.group_send("broadcast",  # all clients are in this group
               { 'type': 'new_currency',
                 'data': currency,
               })
         else:
             # requirement C, notify listeners.
             self.store.currencies[currency] = value
             await self.channel_layer.group_send(currency,  # all clients listening to this currency
               { 'type': 'new_value',
                 'currency': currency,
                 'value': value,
               })

Надеюсь, я не испортил код, и он не слишком сложен (мне было лень вставлять / редактировать отдельный код для каждого требования). Пожалуйста, задавайте любые вопросы в комментариях.

person pembeci    schedule 22.11.2018
comment
Привет, я новичок в каналах django. У меня есть задача сельдерея, работающая в фоновом режиме, которая должна отправлять данные в пользовательский интерфейс после завершения обработки. Я не мог найти способ отправить данные. Не могли бы вы посоветовать мне решение. Спасибо! - person laplace; 07.07.2020
comment
Как вы сделали доступными данные в каналах? Второй вопрос: будете ли вы отправлять его всем подключенным пользователям или некоторым конкретным пользователям (т.е. запускается ли задача сельдерея из пользовательского интерфейса пользователем?)? - person pembeci; 09.07.2020