Использование RabbitMQ в качестве распределенного брокера — как сериализовать задания для каждой очереди

Каждое задание в моей системе принадлежит определенному идентификатору пользователя и может быть помещено в rabbitmq из нескольких источников. Мои требования:

  • В любой момент времени на одного пользователя должно быть запущено не более 1 задания.
  • Задания для других пользователей не должны задерживаться из-за накопления заданий для конкретного пользователя.
  • Каждое задание должно быть выполнено хотя бы один раз. Каждое задание будет иметь максимальное количество повторных попыток и повторно вставляется в очередь (или, возможно, задерживается) с задержкой в ​​случае сбоя.
  • Сохранение последовательности заданий (для каждого пользователя) желательно, но не обязательно.
  • Задания, вероятно, следует сохранять, так как мне нужно, чтобы они выполнялись хотя бы один раз. У вакансий нет срока годности.
  • Любой из рабочих должен иметь возможность запускать задания для любого пользователя.

С учетом этих требований, я думаю, имеет смысл поддерживать единую очередь для каждого отдельного пользователя. Мне также понадобятся все рабочие, наблюдающие за всеми очередями пользователей и выполняющие задание для пользователя, чье задание в настоящее время нигде не выполняется (т.е. не более 1 задания на пользователя)

Будет ли это решение работать с использованием RabbitMQ в настройке кластера? Поскольку количество очередей будет большим, я не уверен, что каждый работник, наблюдающий за каждой пользовательской очередью, вызовет значительные накладные расходы или нет. Любая помощь приветствуется.


person Nands    schedule 08.06.2014    source источник
comment
Можете ли вы дать немного больше контекста? Например: как задания отправляются в очередь? насколько они долговечны? насколько важно, чтобы они надежно сохранялись?   -  person flup    schedule 06.08.2014
comment
да, здесь нужно больше контекста. Если у вас есть несколько потребителей, прослушивающих каждую пользовательскую очередь, я не представляю себе простого способа обеспечить выполнение только одного задания для каждого пользователя в каждый момент времени, не прибегая к хакерству или необходимости реализовывать собственный механизм блокировки.   -  person dectarin    schedule 06.08.2014
comment
Я отредактировал вопрос, чтобы сделать его более понятным. Любая помощь приветствуется, спасибо   -  person Nands    schedule 08.08.2014


Ответы (1)


Как упомянул @dectarin, если несколько рабочих прослушивают несколько очередей заданий, будет сложно гарантировать, что выполняется только одно задание для каждого пользователя.

Я думаю, что было бы лучше, если бы задания проходили через пару шагов.

  1. Пользователь отправляет задание
  2. Задание ставится в очередь для каждого пользователя до тех пор, пока не перестанут выполняться другие задания.
  3. Координатор помещает задание в активную очередь заданий, которая потребляется рабочими.
  4. Работник берет работу и выполняет ее
  5. Рабочий отправляет результаты в очередь результатов
  6. Результаты отправляются пользователю

Я не знаю, как задания отправляются в систему, поэтому трудно сказать, будут ли фактические MessageQueues для каждого пользователя лучшим способом поставить ожидание в очередь. Например, если задания уже находятся в почтовом ящике, это тоже может сработать. Или сохраните задания в очереди в базе данных в качестве бонуса, который позволит вам написать небольшой интерфейс для пользователей, чтобы проверять и управлять своими заданиями в очереди.

В зависимости от того, что вы выберете, вы сможете найти элегантный способ координировать одно задание для ограничения пользователя.

Например, если задания находятся в базе данных, база данных поддерживает синхронизацию, и несколько рабочих-координаторов могут пройти следующий цикл:

while( true ) {
    if incoming job present for any user {
        pick up first job from queue
        put job in database, marking it active if no other active job is present
        if job was marked active {
            put job on active job queue
        }
    }
    if result is present for any user {
        pick up first result from result queue
        send results to user
        mark job as done in database
        if this user has job waiting in database, mark it as active
        if job was marked active {
            put job on active job queue
        }
    }
}

Или, если ожидающие задания находятся в очередях сообщений для каждого пользователя, транзакции будут проще, и одному координатору, проходящему через цикл, не нужно будет беспокоиться о многопоточности.

Сделать вещи полностью транзакционными в базе данных и очередях может быть сложно, но это не обязательно. Вводя состояние ожидания, вы должны позволить себе ошибаться из соображений осторожности, чтобы убедиться, что никакие задания не будут потеряны в случае сбоя шага.

person flup    schedule 08.08.2014