Мы создаем систему обмена сообщениями на основе AMQP на Ruby. Однако у нас есть проблема с обработкой ошибок.
Мы поддерживаем белый список исключений, которые являются безопасными, и сообщение в RabbitMQ может быть оставлено неподтвержденным и повторено другим работником. Однако при неизвестных или непредвиденных ошибках мы предполагаем, что один и тот же сбой будет происходить всегда, независимо от того, сколько раз сообщение пытается отправить работник.
Это означает, что когда возникает неизвестная ошибка, нам нужно зафиксировать ее, записать куда-нибудь (в настоящее время MySQL), а затем отправить вызов ACK
RabbitMQ, чтобы удалить сообщение из очереди.
В настоящее время все создается с использованием гема amqp, который обрабатывается с помощью EventMachine. Это вызывает проблему, потому что вызов метода #ack
не означает, что ACK
был отправлен в RabbitMQ благодаря асинхронному поведению гема.
Ранее я обошел эту проблему, грубо поместив код повышения в EM.next_tick
. Теперь нам нужно сделать многопоточным каждый рабочий процесс Ruby для повышения производительности, а next_tick
не работает.
Вкратце:
Как бы вы поступили с синхронным запуском определенного фрагмента кода сразу после асинхронного вызова ACK в геме amqp? Обратный вызов был бы хорош, но он недоступен, по крайней мере, без какого-то серьезного злого исправления обезьяны.