Как правильно выполнить одиночную блокировку, синхронный прием с Pika?

Я хотел бы использовать Pika/RabbitMQ по образцу, аналогичному стандартному сокету: то есть настроить соединение, затем сделать блокирующие синхронные вызовы для получения одного сообщения каждый раз, когда я готов выполнить дополнительную работу.

Вариант А: basic_get

Метод basic_get метода BlockingConnection предлагает возможность получить сообщение, но он немедленно возвращается, если нет доступных для приема сообщений. Это похоже на вызов сокета recv с отключенной блокировкой. Я мог бы использовать этот подход с тайм-аутом для непрерывного опроса, но это неэффективно.

Вариант Б: basic_consume

Метод basic_consume из BlockingConnection мог бы выполнить эту работу, но у него есть странное требование, чтобы я имел start_consuming() где-то еще, в отдельном потоке. Поскольку мои вызывающие объекты моего метода receive уже ожидают блокировки, ожидая сообщения, это кажется пустой тратой потока.

Можно ли с Pika сделать аналог socket.recv(blocking=True)?


person Ethan T    schedule 10.01.2019    source источник


Ответы (2)


Запустите Pika в отдельном потоке и basic_consume со значением предварительной выборки 1 (если вам действительно нужно одно сообщение за раз). Вставляйте сообщения в какую-то синхронизированную структуру данных, которую ваши абоненты могут заблокировать.

Обязательно правильно подтверждайте свои сообщения из других тем ( пример)


ПРИМЕЧАНИЕ. команда RabbitMQ отслеживает rabbitmq-users список рассылки и лишь иногда отвечает на вопросы в StackOverflow.

person Luke Bakken    schedule 10.01.2019

Используйте метод basic_get канала, как в этом примере:

credentials = pika.PlainCredentials('username', 'password')
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', credentials=credentials))
channel = connection.channel()

inmessage = channel.basic_get("your_queue_name", auto_ack=True)

inmessage — это кортеж из 3 элементов, элемент с индексом 2 — это тело вашего сообщения.

person Rustam Shafigullin    schedule 05.05.2020