Параллелизм в распределенной очереди задач (производитель/потребитель)

Мое приложение (Java) случайным образом создает некоторые задачи и асинхронно потребляет распределенные фоновые потоки.

В настоящее время у меня нет решения для распределенной блокировки, такого как ZooKeeper. У меня нет сторонних очередей сообщений.

Я использую базу данных в качестве очереди задач, и потребляемые результаты также сохраняются в базе данных, к которой имеют общий доступ все потребители/производители.

У меня есть такой код:

Потребитель:

while(true) {
  // block the thread and wait from producer's notify
  // my producers would produce MANY work items but only notify each consumer ONCE.
  waitProducer();

  // consume the queue
  while(database.queueNotEmpty()) {
    // consume each work item and remove from database queue
    consumeAll();
  }
}

Режиссер:

for(...) {
  database.enqueue(work[i]);
}
// notify all consumers
notifyAllConsumer();

По-видимому, приведенный выше код имеет одновременные ошибки. У меня есть 3 вопроса:

1. Как избежать того, чтобы распределенные потребители выполняли одну и ту же задачу? (о строке: «consumeAll()») или уменьшить дублированные вычисления. многократное использование одной задачи не будет ошибкой, но в моем случае менее эффективно.

2. Как избежать того, чтобы очередь НЕ была пустой, но потребитель не был активен? последовательность будет следующей: один потребитель и один образец производителя:

  • Потребитель: while(database.queueNotEmpty()) // очередь пуста, прерываем цикл while
  • Производитель: database.enqueue(work[i]); // создать задачу
  • Производитель: notifyAllConsumer(); // уведомляем потребителя, но он уже активен
  • Потребитель: waitProducer(); // приостановить поток, но еще есть работа

3. Любая лучшая практика для этой проблемы? особенно в чистой java. Нужна ли сторонняя очередь сообщений или что-то вроде zookeeper? Меньше блокировки или отсутствие блокировки предпочтительнее; в моем случае эффективность предпочтительнее правильности.

Спасибо!


person marstone    schedule 27.05.2014    source источник
comment
Почему бы вам не использовать BlockingQueue? Они предназначены для такой работы   -  person fge    schedule 27.05.2014
comment
@fge BlockingQueue работает только в одной JavaVM? мои производители были бы на разных серверах.   -  person marstone    schedule 27.05.2014
comment
Что ж, ничто не мешает вам использовать его в качестве промежуточного звена между вашими производителями и потребителями; есть также такие решения, как Terracotta, которые позволяют совместно использовать POJO (с блокировкой и всем остальным) на нескольких виртуальных машинах.   -  person fge    schedule 27.05.2014
comment
Как вы реализуете waitProducer(), consumeAll() и notifyAllConsumer()? Кроме того, вам лучше использовать любой из доступных MQ, готовых к производству.   -  person pingw33n    schedule 27.05.2014
comment
@ pingw33n О трех реализациях не может быть и речи. Если мы предположим, что 3 метода реализованы правильно, мои проблемы все еще существуют.   -  person marstone    schedule 27.05.2014
comment
На самом деле, если они реализованы правильно, они, например, гарантируют, что операция выборки является атомарной, и никакие две выборки не вернут одно и то же сообщение. Они же применимы и для других методов. Ваши проблемы должны решаться внутри этих методов.   -  person pingw33n    schedule 27.05.2014
comment
@fge Спасибо! Я проверю Терракоту позже. Однако задачи в моем случае должны быть постоянными, только памяти не хватает при перезапуске серверов.   -  person marstone    schedule 27.05.2014
comment
@ pingw33n, даже если я могу обеспечить все функции выше атомарных, вторая проблема все еще существует? на самом деле, ConsumerAll был бы очень сложным и трудоемким (поэтому он находится в асинхронных потоках) и на практике не будет атомарным.   -  person marstone    schedule 27.05.2014
comment
У вас есть 2 варианта, первый синхронизируется на уровне базы данных/центрального сервера, в вашем случае вы можете выполнить атомарное удаление элемента или записать любой грязный бит, чтобы сигнализировать о потребленном элементе, второй вариант - напрямую синхронизировать узлы\серверы через сообщение прохождение   -  person shaydel    schedule 31.05.2014
comment
@shaydel спасибо. синхронизация на уровне БД - это вариант, но не подходит для моего случая. какие-либо рекомендации относительно более позднего варианта для веб-приложения Java?   -  person marstone    schedule 02.06.2014


Ответы (1)


Я предлагаю вам использовать LinkedBlockingQueue в таком случае.

руководство по LinkedBlockingQueue

Вы можете использовать методы take()/put(), и если вы хотите подождать с ограничением по времени, вы можете использовать offer(), poll() и peek().

Я также использовал это в подобной проблеме.

person Jatin Malwal    schedule 01.08.2014