Очередь заданий в виде таблицы SQL с несколькими потребителями (PostgreSQL)

У меня типичная проблема производитель-потребитель:

Несколько приложений-производителей записывают запросы на работу в таблицу вакансий в базе данных PostgreSQL.

Запросы на работу имеют поле состояния, которое при создании содержит "Очередь".

Есть несколько потребительских приложений, которые получают уведомление с помощью правила, когда производитель вставляет новую запись:

CREATE OR REPLACE RULE "jobrecord.added" AS
  ON INSERT TO jobrecord DO 
  NOTIFY "jobrecordAdded";

Они попытаются зарезервировать новую запись, установив для нее состояние RESERVED. Конечно, только один потребитель должен добиться успеха. Все остальные потребители не должны иметь возможность резервировать ту же запись. Вместо этого они должны зарезервировать другие записи с состоянием = QUEUED.

Пример: какой-то производитель добавил в таблицу jobrecord следующие записи:

id state  owner  payload
------------------------
1 QUEUED null   <data>
2 QUEUED null   <data>
3 QUEUED null   <data>
4 QUEUED null   <data>

теперь два потребителя A, B хотят их обработать. Они начинают бегать одновременно. Один должен зарезервировать id 1, другой должен зарезервировать id 2, затем первый, кто закончит, должен зарезервировать id 3 и так далее.

В чистом многопоточном мире я бы использовал мьютекс для управления доступом к очереди заданий, но потребителями являются разные процессы, которые могут выполняться на разных машинах. Они обращаются только к одной и той же базе данных, поэтому вся синхронизация должна происходить через базу данных.

Я прочитал много документации о параллельном доступе и блокировке в PostgreSQL, например. http://www.postgresql.org/docs/9.0/interactive/explicit-locking.html Выбрать разблокированную строку в Postgresql PostgreSQL и блокировка

Из этих тем я узнал, что следующий оператор SQL должен делать то, что мне нужно:

UPDATE jobrecord
  SET owner= :owner, state = :reserved 
  WHERE id = ( 
     SELECT id from jobrecord WHERE state = :queued 
        ORDER BY id  LIMIT 1 
     ) 
  RETURNING id;  // will only return an id when they reserved it successfully

К сожалению, когда я запускаю это в нескольких потребительских процессах, примерно в 50% случаев они по-прежнему резервируют одну и ту же запись, обрабатывая ее и перезаписывая изменения другой.

Что мне не хватает? Как мне написать инструкцию SQL, чтобы несколько потребителей не зарезервировали одну и ту же запись?


person code_talker    schedule 28.06.2011    source источник
comment
вы делаете это в сделке, не так ли? с BEGIN и COMMIT?   -  person regilero    schedule 28.06.2011
comment
Нет, без BEGIN и COMMIT. Я пытался сделать это в транзакции, но потом ничего не происходит, мои потребители ничего не обрабатывают, таблица остается неизменной. Стоит ли использовать транзакцию?   -  person code_talker    schedule 29.06.2011
comment
Похоже, что проблема с транзакцией заключалась в том, как я использовал ее в своем потребительском приложении (написанном на Qt).   -  person code_talker    schedule 29.06.2011
comment
хорошо, видел ваш ответ ниже, возможно, вы могли бы внести правку в окончательный рабочий код, так как это довольно повторяющийся вопрос   -  person regilero    schedule 29.06.2011
comment
Если вы сделали это внутри транзакции, но ничего никогда не произошло, я предполагаю, что вы не зафиксировали транзакцию, поскольку в этом случае tx откатится.   -  person apinstein    schedule 15.07.2011


Ответы (7)


Прочтите мой пост здесь:

https://stackoverflow.com/a/6500830/32688

Если вы используете транзакцию и LOCK TABLE, у вас не будет проблем.

person jordani    schedule 28.06.2011
comment
Спасибо, добавление блокировки таблицы LOCK TABLE slots IN ACCESS EXCLUSIVE MODE; (и исправление моих операторов транзакций в Qt) было решением. Теперь он работает, как ожидалось. Спасибо! - person code_talker; 29.06.2011
comment
ACCESS EXCLUSIVE будет работать, однако имеет серьезные побочные эффекты. В частности, он будет заблокирован во время pg_dump, который использует ACCESS SHARE режим. Это означает, что ваша очередь будет заблокирована на время резервного копирования, в нашем случае это может быть несколько часов. - person apinstein; 15.07.2011

Я также использую postgres для очереди FIFO. Первоначально я использовал ACCESS EXCLUSIVE, который дает правильные результаты при высоком уровне параллелизма, но имеет неудачный эффект взаимоисключаемости с pg_dump, который получает блокировку ACCESS SHARE во время своего выполнения. Это приводит к тому, что моя функция next () блокируется на очень долгое время (продолжительность pg_dump). Это было неприемлемо, так как мы работаем круглосуточно, и покупателям не нравилось время простоя в очереди посреди ночи.

Я подумал, что должна быть менее строгая блокировка, которая по-прежнему будет безопасной для одновременного доступа и не блокируется во время работы pg_dump. Мой поиск привел меня к этому сообщению ТАК.

Затем я провел небольшое исследование.

Следующие режимы достаточны для функции очереди FIFO NEXT (), которая обновит статус задания с в очереди на работает без сбоев параллелизма, а также не блокирует pg_dump. :

SHARE UPDATE EXCLUSIVE
SHARE ROW EXCLUSIVE
EXCLUSIVE

Запрос:

begin;
lock table tx_test_queue in exclusive mode;
update 
    tx_test_queue
set 
    status='running'
where
    job_id in (
        select
            job_id
        from
            tx_test_queue
        where
            status='queued'
        order by 
            job_id asc
        limit 1
    )
returning job_id;
commit;

Результат выглядит так:

UPDATE 1
 job_id
--------
     98
(1 row)

Вот сценарий оболочки, который проверяет все различные режимы блокировки при высоком уровне параллелизма (30).

#!/bin/bash
# RESULTS, feel free to repro yourself
#
# noLock                    FAIL
# accessShare               FAIL
# rowShare                  FAIL
# rowExclusive              FAIL
# shareUpdateExclusive      SUCCESS
# share                     FAIL+DEADLOCKS
# shareRowExclusive         SUCCESS
# exclusive                 SUCCESS
# accessExclusive           SUCCESS, but LOCKS against pg_dump

#config
strategy="exclusive"

db=postgres
dbuser=postgres
queuecount=100
concurrency=30

# code
psql84 -t -U $dbuser $db -c "create table tx_test_queue (job_id serial, status text);"
# empty queue
psql84 -t -U $dbuser $db -c "truncate tx_test_queue;";
echo "Simulating 10 second pg_dump with ACCESS SHARE"
psql84 -t -U $dbuser $db -c "lock table tx_test_queue in ACCESS SHARE mode; select pg_sleep(10); select 'pg_dump finished...'" &

echo "Starting workers..."
# queue $queuecount items
seq $queuecount | xargs -n 1 -P $concurrency -I {} psql84 -q -U $dbuser $db -c "insert into tx_test_queue (status) values ('queued');"
#psql84 -t -U $dbuser $db -c "select * from tx_test_queue order by job_id;"
# process $queuecount w/concurrency of $concurrency
case $strategy in
    "noLock")               strategySql="update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "accessShare")          strategySql="lock table tx_test_queue in ACCESS SHARE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "rowShare")             strategySql="lock table tx_test_queue in ROW SHARE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "rowExclusive")         strategySql="lock table tx_test_queue in ROW EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "shareUpdateExclusive") strategySql="lock table tx_test_queue in SHARE UPDATE EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "share")                strategySql="lock table tx_test_queue in SHARE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "shareRowExclusive")    strategySql="lock table tx_test_queue in SHARE ROW EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "exclusive")            strategySql="lock table tx_test_queue in EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "accessExclusive")      strategySql="lock table tx_test_queue in ACCESS EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    *) echo "Unknown strategy $strategy";;
esac
echo $strategySql
seq $queuecount | xargs -n 1 -P $concurrency -I {} psql84 -U $dbuser $db -c "$strategySql"
#psql84 -t -U $dbuser $db -c "select * from tx_test_queue order by job_id;"
psql84 -U $dbuser $db -c "select count(distinct(status)) as should_output_100 from tx_test_queue;"
psql84 -t -U $dbuser $db -c "drop table tx_test_queue;";

Здесь также находится код, если вы хотите отредактировать: https://gist.github.com/1083936

Я обновляю свое приложение, чтобы использовать ИСКЛЮЧИТЕЛЬНЫЙ режим, поскольку это наиболее ограничительный режим, который а) является правильным и б) не конфликтует с pg_dump. Я выбрал наиболее ограничительный, поскольку это кажется наименее рискованным с точки зрения изменения приложения с ACCESS EXCLUSIVE, не будучи сверхэкспертом по блокировке postgres.

Я чувствую себя довольно комфортно со своим испытательным стендом и с общими идеями, лежащими в основе ответа. Я надеюсь, что это поможет решить эту проблему другим.

person apinstein    schedule 15.07.2011
comment
Спасибо, это очень полезно. Я займусь этим сегодня. - person code_talker; 15.07.2011
comment
Я провел тонну дополнительных испытаний этого решения и на 100% убежден, что это правильный способ. Я развернул его в своей производственной системе (исключите из очереди в ЭКСКЛЮЗИВНОМ режиме). Наслаждаться! - person apinstein; 17.07.2011
comment
Блокировка таблицы не требуется, только блокировка строки: stackoverflow.com/a/30315387/492548 - person mackross; 19.05.2015

Для этого не нужно делать блокировку всей таблицы: \.

Блокировка строки, созданная с помощью for update, работает нормально.

См. https://gist.github.com/mackross/a49b72ad8d24f7cefc32, чтобы узнать об изменении, которое я внес в ответ Apinstein и подтвердил, что он еще работает.

Окончательный код

update 
    tx_test_queue
set 
    status='running'
where
    job_id in (
        select
            job_id
        from
            tx_test_queue
        where
            status='queued'
        order by 
            job_id asc
        limit 1 for update
    )
returning job_id;
person mackross    schedule 19.05.2015
comment
Это решение явно удаляет транзакцию. Почему? - person Yitz; 25.09.2016
comment
pg неявно добавляет транзакцию вокруг оператора - person mackross; 26.09.2016
comment
@mackross Было бы лучше использовать for update skip locked вместо просто for update, потому что это не будет блокировать других одновременных читателей. Возможно, причина, по которой вы его не использовали, заключается в том, что оператор skip locked не был доступен в 2015 году. - person Frederick The Fool; 30.04.2021

как насчет просто выбора?

SELECT * FROM table WHERE status = 'QUEUED' LIMIT 10 FOR UPDATE SKIP LOCKED;

https://www.postgresql.org/docs/9.5/static/sql-select.html#SQL-FOR-UPDATE-SHARE

person Vladimir Filipchenko    schedule 11.11.2016

Возможно, вы захотите посмотреть, как это делает queue_classic. https://github.com/ryandotsmith/queue_classic

Код довольно короткий и простой для понимания.

person Joe Van Dyk    schedule 01.10.2012

Хорошо, вот решение, которое работает для меня, на основе ссылки от jordani. Поскольку некоторые из моих проблем были связаны с работой Qt-SQL, я включил код Qt:

QSqlDatabase db = GetDatabase();
db.transaction();
QSqlQuery lockQuery(db);
bool lockResult = lockQuery.exec("LOCK TABLE serverjobrecord IN ACCESS EXCLUSIVE MODE; ");
QSqlQuery query(db);
query.prepare(    
"UPDATE jobrecord "
"  SET \"owner\"= :owner, state = :reserved "
"  WHERE id = ( "
"    SELECT id from jobrecord WHERE state = :queued ORDER BY id LIMIT 1 "
"  ) RETURNING id;"
);
query.bindValue(":owner", pid);
query.bindValue(":reserved", JobRESERVED);
query.bindValue(":queued", JobQUEUED); 
bool result = query.exec();

Чтобы проверить, обрабатывают ли несколько потребителей одно и то же задание, я добавил правило и таблицу журнала:

CREATE TABLE serverjobrecord_log
(
  serverjobrecord_id integer,
  oldowner text,
  newowner text
) WITH ( OIDS=FALSE );


CREATE OR REPLACE RULE ownerrule AS ON UPDATE TO jobrecord
WHERE old.owner IS NOT NULL AND new.state = 1 
DO INSERT INTO jobrecord_log     (id, oldowner, newowner) 
    VALUES (new.id, old.owner, new.owner);

Без оператора LOCK TABLE serverjobrecord IN ACCESS EXCLUSIVE MODE; таблица журнала иногда заполняется записями, если один потребитель перезаписал значения другого, но при использовании оператора LOCK таблица журнала остается пустой :-)

person code_talker    schedule 30.06.2011

Попробуйте PgQ вместо того, чтобы изобретать колесо.

person Sean    schedule 28.06.2011
comment
Я уже проверил PgQ, но, к сожалению, он отправит одно и то же событие всем потребителям: You can have as many consumers as you want to on the same event queue, but they will all see the same events rather than share the workload. Это именно то, чего я не хочу. Я бы предпочел использовать существующее решение, но не смог найти того, что удовлетворяет моим требованиям. - person code_talker; 29.06.2011
comment
Понимаете, я бы использовал PgQ, а затем попросил бы каждого потребителя зарегистрировать блокировку работы в другой таблице с помощью оператора UPDATE ... RETURNING. Пища для размышлений. - person Sean; 30.06.2011