Rethinkdb — атомарная строка извлечения/обновления с определенным условием

У меня есть куча работников сельдерея, которым нужно получить одну запись (строку) из базы данных.

Он должен быть возвращен только в том случае, если с момента поля «last_used» прошло 60 секунд, и как только он будет возвращен, он должен обновить «last_used» с текущим временем (поэтому другие рабочие процессы, которые имеют неограниченное количество повторных попыток с задержкой 0, не получают тот же самый)

Возможно ли сделать все это на уровне db? У меня не было бы другого источника, обновляющего это поле «last_used».

Вот как выглядит один ряд.

{"id": "..." "last_used": "2016-06-31 00:37:21.241833", "item": "string"}

Я попытался:

conn = r.connect(host='localhost', port=28015)
do = r.db('database').table('tb').order_by(index=("last_used")).limit(1).update(
{'last_used': r.now()}
, return_changes=True).run(conn)

И это не работает, несколько рабочих возвращают одну и ту же строку до ее изменения.


person Nema Ga    schedule 26.07.2016    source источник


Ответы (1)


Я думаю, что я сделал это, попытался запустить против него 50 рабочих с 0 конфликтами. Но я все еще новичок в rethinkdb и хотел бы услышать ваши мысли по этому поводу, заимствованные отсюда.

@app.task(bind=True, default_retry_delay=0, max_retries=999)
def ss(self):        
    conn = r.connect(host='localhost', port=28015)
    do = r.db('').table('').order_by("last_used").filter(
    r.now() - r.row['last_used'] > 10
    ).filter({'status': 1}).limit(1).update(
    r.branch(r.row["status"] == 1, {'status': 2, "last_used": r.now()}, {}),
    return_changes=True).run(conn)


    try:
        got_item = do['changes'][0]['new_val']['id']
        last_used = do['changes'][0]['new_val']['last_used']
    except:
        # print('error', do)
        raise self.retry()

    if got_item:
        # Do stuff that required unique row here....
        print(got_item,'\n',last_used)
        time.sleep(random.randrange(1,5))
        r.db('').table('').get(got_item).update({"status": 1}).run(conn)

#start workers
for i in range(50):
    ss.delay()

Я надеюсь, что при этом воркеры гарантированно будут работать с уникальным элементом внутри блока «if got_item».

person Nema Ga    schedule 27.07.2016