Отказоустойчивая рассылка сообщений для конкретного получателя с использованием redis и python

Итак, в Redis 5.0 недавно была представлена ​​новая функция под названием Streams. Они кажутся идеальными для распространения сообщений для межпроцессного взаимодействия:

  • они превосходят возможности обмена сообщениями о событиях PUB / SUB с точки зрения надежности: PUB / SUB работает по принципу «запустил и забыл», и нет гарантии, что получатель получит сообщение
  • Списки redis несколько низкоуровневые, но их все же можно использовать. Однако потоки оптимизированы по производительности и в точности для описанного выше варианта использования.

Однако, поскольку эта функция является довольно новой, почти нет руководств по Python (или даже по общему Redis), и я действительно не понимаю, как адаптировать потоковую систему к моему варианту использования.

Я хочу иметь одну программу-издатель, которая отправляет сообщения в поток и содержит информацию о получателе (например, recipient: "user1"). Затем у меня будет несколько процессов получения, которые все должны проверять наличие новых потоковых сообщений и сравнивать, являются ли они целевым получателем. Если да, они должны обработать сообщение и пометить его как обработанное (подтвержденное).

Однако я не совсем понимаю группы потребителей, состояние ожидания и так далее. Может ли кто-нибудь дать мне реальный пример моего маленького псевдокода?

sender.py

db = Redis(...)
db.the_stream.add({"recipient": "user1", "task": "be a python"})

recipient.py (будет много их экземпляров, каждый из которых будет иметь уникальный идентификатор получателя)

recipient_id = "user1" # you get the idea...
db = Redis(...)
while True:
    message = db.the_stream.blocking_read("$") # "$" somehow means: just receive new messages
    if message.recipient == recipient_id:
        perform_task(message.task)
        message.acknowledge() # let the stream know it was processed
    else:
        pass # well, do nothing here since it's not our message. Another recipient instance should do the job.```


person fameman    schedule 16.02.2020    source источник


Ответы (1)


Представьте, что с помощью приведенного вами примера и псевдокода:

  • recipient.user1 получает 60 сообщений в минуту
  • а выполнение метода perform_task() занимает 2 секунды.

То, что здесь произойдет, очевидно: задержка между поступлением нового сообщения и его обработкой со временем будет только расти, отклоняясь все дальше и дальше от «обработки в реальном времени».

system throughput = 30 messages/minute

Чтобы обойти это, вы можете создать группу потребителей для user1. Здесь у вас может быть 4 разных процесса python, работающих параллельно, и все 4 будут объединены в одну группу для user1. Теперь, когда приходит сообщение для user1, один из 4 рабочих заберет его и perform_task().

system throughput = 120 message/minute

В вашем примере message.acknowledge() на самом деле не существует, потому что ваш потоковый ридер один (команды XREAD).

Если бы это была группа, подтверждение сообщений становится важным, поэтому redis знает, что один из участников группы действительно обработал это сообщение, поэтому он может «двигаться дальше» (он может забыть тот факт, что это сообщение ожидает подтверждения) . Когда вы используете группы, существует небольшая логика на стороне сервера, чтобы гарантировать, что каждое сообщение будет доставлено одной из рабочих групп потребителей один раз (команды XGROUPREAD). Когда клиент завершает работу, он выдает подтверждение этого сообщения (команды XACK), чтобы серверный «буфер группы потребителей» мог удалить его и продолжить работу.

Представьте, что рабочий умер и так и не подтвердил сообщение. С группой потребителей вы можете следить за этой ситуацией (используя команды XPENDING) и действовать в соответствии с ними, например, повторяя попытку обработки того же сообщения у другого потребителя.

Когда вы не используете группы, серверу redis не нужно «двигаться дальше», «подтверждение» становится на 100% клиентской / бизнес-логикой.

person smassey    schedule 20.02.2020