Поведение холста сельдерея различается в асинхронном и нетерпеливом режимах

Существуют некоторые расхождения в том, как холст Celery работает в асинхронном и нетерпеливом режимах. Я заметил, что группа, за которой следует цепочка в динамической задаче, которая заменяет себя, не отправляет результаты следующей в цепочке.

Ну, это кажется сложным, позвольте мне показать пример:

Дана следующая задача:

@shared_task(bind=True)
def grouped(self, val):
    task = (
        group(asum.s(val, n) for n in range(val)) | asum.s(val)
    )
    raise self.replace(task)

когда он сгруппирован на другом холсте следующим образом:

@shared_task(bind=True)
def flow(self, val):
    workflow = (asum.s(1, val) |
                asum.s(2) |
                grouped.s() |
                amul.s(3))

    return self.replace(workflow)

задача amul не будет получать результаты группировки, когда она находится в активном режиме.

Чтобы действительно проиллюстрировать проблему, я создал пример проекта на github, где вы можете погрузиться в проблему и помочь мне с некоторыми быстрыми решениями и, возможно, с некоторыми PR в проекте celery.

https://github.com/gutomaia/celery_equation

---- отредактировано ----

В проекте я заявляю о различном поведении при обоих способах использования сельдерея. В асинхронном режиме домашние задачи работают как положено.

>>> from equation.main import *
>>> from equation.tasks import *
>>> flow.delay(1).get()
78
>>> flow.delay(2).get()
120
>>> flow.delay(100).get()
47895

person gutomaia    schedule 11.07.2019    source источник


Ответы (3)


К сожалению, нетерпеливый режим никогда не будет таким же, как запуск реального работника. Слишком много сложностей при запуске реального работника, чтобы нетерпеливый режим был точно таким же. Я согласен с тем, что подобные вещи должны относиться к особым случаям при использовании активного режима, но ожидается некоторое несоответствие. Пожалуйста, отправьте PR, если вы знаете, как решить эту проблему, и мы можем просмотреть исправление там. Благодарю вас!

person xirdneh    schedule 12.07.2019

Я боролся с этой ситуацией в тестовом примере. Для будущих читателей, по крайней мере, начиная с celery 4.4.0, следующая идиома будет работать во всех контекстах, включая синхронное выполнение в процессе:

    return self.replace(...)

Использование raise или просто завершение функции сразу после Task.replace будет работать только в асинхронном режиме. Соответствующий код в самом конце. из Task.replace:

        if self.request.is_eager:
            return sig.apply().get()
        else:
            sig.delay()
            raise Ignore('Replaced by new task')
person Rogério Gatto    schedule 04.02.2020

grouped() ничего не возвращает, так как же вы ожидаете, что amul получит результат??

person DejanLekic    schedule 12.07.2019
comment
Синтаксис сахара self.replace можно использовать в обоих контекстах. Не имеет значения, вызывается ли замена как возвращаемое значение или повышение. На проекте по прикрепленной ссылке видно, что в асинхронном режиме задачи работают. В вопросе указывается разница между поведением нетерпеливого и асинхронного режима. Чего я ожидал, так это небольшого исправления для обезьяны, которое решит этот конкретный случай. - person gutomaia; 12.07.2019