RabbitMQ/Pika — гарантия получения сообщений в том порядке, в котором они были созданы?

В качестве простого примера я добавляю 5 элементов в новую очередь RabbitMQ (v 2.6.1):

import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='not.my.real.server.net'))
channel = connection.channel()
channel.queue_declare(queue='dw.neil',durable=True)
# add 5 messages to the queue, the numbers 1-5
for x in range(5):
    message = x+1
    channel.basic_publish(exchange='',routing_key='dw.neil', body=str(message))
    print " [x] Sent '%s'" % message
connection.close()

Я очищаю свою очередь, а затем запускаю приведенный выше код, чтобы добавить 5 элементов:

nkodner@hadoop4 sports_load_v2$ python send_5.py 
 [x] Sent '1'
 [x] Sent '2'
 [x] Sent '3'
 [x] Sent '4'
 [x] Sent '5'

Теперь я пытаюсь смоделировать неудачную обработку. Учитывая следующий код для потребления из очереди. Обратите внимание, что у меня закомментирован вызов basic_ack:

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='not.my.real.server.net'))
channel = connection.channel()
channel.queue_declare(queue='dw.neil',durable=True)
method_frame, header_frame, body=channel.basic_get(queue='dw.neil')
print method_frame, header_frame
print "body: %s" % body
#channel.basic_ack(delivery_tag=method_frame.delivery_tag)
connection.close()

Я запускаю принимающий код, чтобы получить элемент из очереди. Как я и ожидал, я получаю пункт №1:

nkodner@hadoop4 sports_load_v2$ python r5.py 
<Basic.GetOk(['message_count=9', 'redelivered=False', 'routing_key=dw.neil', 'delivery_tag=1', 'exchange='])>
<BasicProperties([])>
body: 1

Поскольку вызов channel.basic_ack() закомментирован, я ожидаю, что неподтвержденное сообщение будет помещено в очередь, чтобы его получил следующий потребитель. Я надеюсь, что сообщение № 1 будет первым сообщением (снова) из очереди, а для свойства Redelivered установлено значение True. Вместо этого получено сообщение №2:

nkodner@hadoop4 sports_load_v2$ python r5.py 
<Basic.GetOk(['message_count=9', 'redelivered=False', 'routing_key=dw.neil', 'delivery_tag=1', 'exchange='])>
<BasicProperties([])>
body: 2

И все остальные сообщения в очереди будут получены до того, как #1 вернется с флагом Redelivered, установленным в True:

...

nkodner@hadoop4 sports_load_v2$ python r5.py 
<Basic.GetOk(['message_count=9', 'redelivered=False', 'routing_key=dw.neil', 'delivery_tag=1', 'exchange='])>
<BasicProperties([])>
body: 5

nkodner@hadoop4 sports_load_v2$ python r5.py 
<Basic.GetOk(['message_count=9', 'redelivered=True', 'routing_key=dw.neil', 'delivery_tag=1', 'exchange='])>
<BasicProperties([])>
body: 1

Есть ли какие-либо свойства или параметры, которые я мог бы настроить, чтобы я продолжал получать доставку № 1 до тех пор, пока она не будет подтверждена?

Мой вариант использования — загрузка хранилища данных последовательно сгенерированными файлами. Мы используем обработку на основе сообщений, чтобы сообщить моей программе, что некоторые новые файлы готовы и должны быть загружены в ХД. Мы должны обрабатывать файлы в порядке их создания.


person Neil Kodner    schedule 14.05.2012    source источник
comment
редактировать: эта проблема решена с выпуском rabbitmq 2.7. Начиная с версии 2.7.0, неподтвержденные/отклоненные элементы возвращаются в начало очереди.   -  person Neil Kodner    schedule 22.06.2012


Ответы (2)


Это было исправлено в RabbitMQ 2.7.0 — мы использовали 2.6.1.

Из примечаний к выпуску:

Новые функции в этом выпуске включают в себя:

  • порядок сохранения сообщений, повторно поставленных в очередь для потребителя
person Neil Kodner    schedule 22.06.2012

Попробуйте использовать channel.basic_reject — это должно отправить неподтвержденное сообщение обратно в RabbitMQ, который будет рассматривать сообщение как новое сообщение. Кроме того, если у вас застряло сообщение с ошибкой, вы можете использовать channel.basic_recover, чтобы сообщить RabbitMQ о повторной доставке всех неподтвержденных сообщений.

http://www.rabbitmq.com/extensions.html#negative-acknowledgements предоставляет отличительную информацию о Basic.Reject и Basic.Nack.

Семантика упорядочения сообщений объясняется на странице http://www.rabbitmq.com/semantics.html.

person Gavin M. Roy    schedule 14.05.2012