Celery + SQS получает одну и ту же задачу дважды с одним и тем же идентификатором задачи в одно и то же время

используя celery с SQS в приложении flask
, но celery получает одну и ту же задачу дважды с одним и тем же идентификатором задачи в одно и то же время,

запускает worker таким образом,
celery worker -A app.jobs.run -l info --pidfile=/var/run/celery/celery.pid --logfile=/var/log/celery/celery.log --time-limit=7200 --concurrency=8

вот журналы сельдерея

[2019-11-29 08:07:35,464: INFO/MainProcess] Received task: app.jobs.booking.bookFlightTask[657985d5-c3a3-438d-a524-dbb129529443]  
[2019-11-29 08:07:35,465: INFO/MainProcess] Received task: app.jobs.booking.bookFlightTask[657985d5-c3a3-438d-a524-dbb129529443]  
[2019-11-29 08:07:35,471: WARNING/ForkPoolWorker-4] in booking funtion1
[2019-11-29 08:07:35,473: WARNING/ForkPoolWorker-3] in booking funtion1
[2019-11-29 08:07:35,537: WARNING/ForkPoolWorker-3] book_request_pp
[2019-11-29 08:07:35,543: WARNING/ForkPoolWorker-4] book_request_pp

одна и та же задача получена дважды, и обе выполняются одновременно,

используя celery==4.4.0rc4 , boto3==1.9.232, kombu==4.6.6 с SQS в колбе pyhton.
В SQS время ожидания видимости по умолчанию составляет 30 минут, и моя задача не имеет ETA и не ack

моя задача.py

from app import app as flask_app
from app.jobs.run import capp
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy(flask_app)

class BookingTasks:
    def addBookingToTask(self):
        request_data = request.json
        print ('in addBookingToTask',request_data['request_id'])
        print (request_data)
        bookFlightTask.delay(request_data)
        return 'addBookingToTask added'

@capp.task(max_retries=0)
def bookFlightTask(request_data):
    task_id = capp.current_task.request.id
    try:
        print ('in booking funtion1')
        ----

мой файл конфигурации, config.py

import os
from urllib.parse import quote_plus

aws_access_key = quote_plus(os.getenv('AWS_ACCESS_KEY'))
aws_secret_key = quote_plus(os.getenv('AWS_SECRET_KEY'))

broker_url = "sqs://{aws_access_key}:{aws_secret_key}@".format(
    aws_access_key=aws_access_key, aws_secret_key=aws_secret_key,
)
imports = ('app.jobs.run',)


## Using the database to store task state and results.
result_backend = 'db' + '+' + os.getenv('SQLALCHEMY_DATABASE_URI')

и, наконец, мой файл приложения сельдерея, run.py

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from flask import Flask
from app import app as flask_app
import sqlalchemy
capp = Celery()

capp.config_from_object('app.jobs.config')

# Optional configuration, see the capplication user guide.
capp.conf.update(
    result_expires=3600,
)
 
# SQS_QUEUE_NAME is like 'celery_test.fifo' , .fifo is required
capp.conf.task_default_queue = os.getenv('FLIGHT_BOOKINNG_SQS_QUEUE_NAME')
if __name__ == '__main__':
    capp.start()

person ssnitish    schedule 01.12.2019    source источник
comment
Какой часовой пояс?   -  person 2ps    schedule 02.12.2019
comment
Я нигде не определял часовой пояс явно. по умолчанию на сельдерее это UTC.   -  person ssnitish    schedule 02.12.2019
comment
любая помощь, понимание было бы здорово.   -  person ssnitish    schedule 02.12.2019


Ответы (1)


По умолчанию значение параметра Visiblity_timeout для SQS равно 30 с. Вам необходимо обновить значение конфигурации сельдерея: broker_transport_options={'visibility_timeout': 3600}.

Когда сельдерей переходит к созданию очереди, он устанавливает время ожидания видимости на 1 час.

ПРИМЕЧАНИЕ. Если вы укажете task_default_queue, а очередь уже была создана без указания broker_transport_options={'visibility_timeout': 3600}, сельдерей не будет обновлять время ожидания видимости при перезапуске с broker_transport_options={'visibility_timeout': 3600}. Вам нужно будет удалить очередь, и сельдерей воссоздаст ее.

person Aaron J    schedule 29.04.2020
comment
Объясните, как вы поняли, что стандартного visibility_timeout (30 секунд) недостаточно и его нужно увеличить? - person Aliaksandr Adzinets; 14.01.2021