Twisted и Pika - Как добавить обратный вызов к вызову подтверждения сообщения?

Я использую адаптер протокола twisted от pika. Когда я успешно обработал сообщение, я отправляю подтверждение RabbitMQ, используя этот вызов:

channel.basic_ack(delivery_tag=delivery_tag) 

Вызов basic_ack, похоже, запускается асинхронно, но он не возвращает отложенный вызов, поэтому я не могу добавить обратный вызов или errback. Теперь я столкнулся с небольшой проблемой, потому что я хочу дождаться определенного сообщения из очереди, обработать его и отключить реактор, т.е.

channel.basic_ack(delivery_tag=delivery_tag)
reactor.stop()

Конечно, реактор отключается до того, как сообщение будет отправлено. Я работаю над этим, откладывая выключение

channel.basic_ack(delivery_tag=delivery_tag)
reactor.callLater(5, reactor.stop)

Но это похоже на довольно "хакерский" способ сделать это. Я бы предпочел иметь возможность сделать что-то вроде:

d = channel.basic_ack(...)
d.addBoth(lambda x: reactor.shutdown())

Я пропустил что-то очевидное здесь? Неужели нельзя привязать обратный вызов к концу вызова подтверждения?


person Community    schedule 15.11.2014    source источник
comment
Когда именно вы хотите, чтобы обратный вызов произошел?   -  person Jean-Paul Calderone    schedule 16.11.2014
comment
Когда подтверждение успешно отправлено в RabbitMQ, в основном спасибо, я сделал работу, которую вы просили меня сделать, теперь я закрываюсь   -  person    schedule 16.11.2014
comment
Включает ли протокол RabbitMQ подтверждение для подтверждения? Если нет, как вы узнаете, что подтверждение было успешно отправлено RabbitMQ?   -  person Jean-Paul Calderone    schedule 16.11.2014
comment
Нет, это не так, но если бы я использовал блокирующий вызов, я бы блокировал его до тех пор, пока выполнение не возобновится или не возникнет исключение. Кажется, что скрученный класс-оболочка pika возвращает deferreds для некоторых вызовов, но в случае вызова ack он просто выполняет его асинхронно, но ничего не возвращает. Я, возможно, неправильно понял, как это работает, поэтому мой вопрос   -  person    schedule 16.11.2014
comment
Я подозреваю, что случаи, когда API-интерфейсы pika возвращают Deferred, — это случаи, когда сервер RabbitMQ возвращает ответ. Легко понять, почему в таких случаях имеет смысл возвращать Deferred, поскольку Deferred — это единственный способ получить данные, включенные в ответ сервера, когда API запроса/ответа является асинхронным. Также легко увидеть, когда в этом случае сработает Deferred — он сработает, когда ответ будет получен от сервера. В тех случаях, когда сервер не отправляет ответа, менее очевидно, когда сработает Deferred. Сравнение с блокирующим кодом только заводит нас.   -  person Jean-Paul Calderone    schedule 17.11.2014
comment
Да, это имеет смысл. Я думаю, что мой сценарий является крайним случаем и, вероятно, лучше всего обрабатывается, поскольку я сейчас это делаю. Мне просто было интересно узнать, не пропустил ли я что-нибудь очевидное. Спасибо за ваши комментарии Жан-Поль   -  person    schedule 17.11.2014
comment
Если бы асинхронный API точно воспроизводил функциональность блокирующего API, то Deferred срабатывал бы, когда данные для записи были скопированы из буфера пространства пользователя в буфер ядра. Pika, вероятно, мог бы как-то предложить это (хотя Twisted не делает его легким), но это не идиоматический способ создания сетевых API с Twisted (отсюда сложность), потому что он не может надежно указать, что одноранговый узел получил сообщение (если происходит сбой процесса отправки, например, сообщение все еще может быть потеряно).   -  person Jean-Paul Calderone    schedule 17.11.2014


Ответы (1)


Таким образом, кажется, единственный вариант - ждать и надеяться, что подтверждение было успешно отправлено.

person Community    schedule 17.11.2014