планирование задачи на несколько таймингов (с разными параметрами) с использованием сельдерея, но задача запускается только один раз (со случайными параметрами)

Чего я пытаюсь достичь. Напишите планировщик, который использует базу данных для планирования аналогичных задач в разное время.

Так же, как я использую сельдерей, приведенный ниже фрагмент кода даст представление

try:
    reader = MongoReader()
except:
    raise
try:
    tasks = reader.get_scheduled_tasks()
except:
    raise
celerybeat_schedule = dict()
for task in tasks:
    celerybeat_schedule[task["task_id"]] =dict()
    celerybeat_schedule[task["task_id"]]["task"] = task["task_name"]
    celerybeat_schedule[task["task_id"]]["args"] = (task,)
    celerybeat_schedule[task["task_id"]]["schedule"] = get_task_schedule(task)

app.conf.update(BROKER_URL=rabbit_mq_endpoint, CELERY_TASK_SERIALIZER='json', CELERY_ACCEPT_CONTENT=['json'], CELERYBEAT_SCHEDULE=celerybeat_schedule)

Итак, это три шага - чтение всех задач из хранилища данных - создание словаря, планировщик сельдерея, который заполняется всеми задачами, имеющими свойства, имя_задачи (метод, который будет запускаться), параметры (данные для передачи методу), расписание (сохраняет, когда run) - обновление с помощью конфигураций сельдерея

Ожидаемый сценарий, учитывая, что все записи запускают одно и то же имя задачи сельдерея, которое просто печатает, имеют одно и то же расписание, которое запускается каждые 5 минут, с разными параметрами, указывающими, что печатать, скажем, у db есть

task name     , parameter , schedule
regular_print , Hi        , {"minutes" : 5}
regular_print , Hello        , {"minutes" : 5}
regular_print , Bye        , {"minutes" : 5}

Я ожидаю, что они будут печататься каждые 5 минут, чтобы напечатать все три

Что происходит Только одна из распечаток Hi, Hello, Bye (возможно, случайно, но не последовательно)

Пожалуйста, помогите, заранее большое спасибо :)


person mayank    schedule 09.11.2016    source источник


Ответы (1)


Удалось решить эту проблему с помощью сельдерея версии 4. Пример, аналогичный тому, что сработал для меня .. также можно найти в документации по сельдерею для версии 4

    #taking address and user-pass from environment(you can mention direct values) 
    ex_host_queue = os.environ["EX_HOST_QUEUE"]
    ex_port_queue = os.environ["EX_PORT_QUEUE"]
    ex_user_queue = os.environ["EX_USERID_QUEUE"]
    ex_pass_queue = os.environ["EX_PASSWORD_QUEUE"]
    broker= "amqp://"+ex_user_queue+":"+ex_pass_queue+"@"+ex_host_queue+":"+ex_port_queue+"//"

    #celery initialization
    app = Celery(__name__,backend=broker, broker=broker)
    app.conf.task_default_queue = 'scheduler_queue'
    app.conf.update(
        task_serializer='json',
        accept_content=['json'],  # Ignore other content
        result_serializer='json'
    )
task = {"task_id":1,"a":10,"b":20}
##method to update scheduler
def add_scheduled_task(task):
    print("scheduling task")
    del task["_id"]
    print("adding task_id")
    name = task["task_name"]
    app.add_periodic_task(timedelta(minutes=1),add.s(task), name = task["task_id"])    

@app.task(name='scheduler_task')
def scheduler_task(data):
    print(str(data["a"]+data["b"]))
person mayank    schedule 10.02.2017