Существуют некоторые расхождения в том, как холст Celery работает в асинхронном и нетерпеливом режимах. Я заметил, что группа, за которой следует цепочка в динамической задаче, которая заменяет себя, не отправляет результаты следующей в цепочке.
Ну, это кажется сложным, позвольте мне показать пример:
Дана следующая задача:
@shared_task(bind=True)
def grouped(self, val):
task = (
group(asum.s(val, n) for n in range(val)) | asum.s(val)
)
raise self.replace(task)
когда он сгруппирован на другом холсте следующим образом:
@shared_task(bind=True)
def flow(self, val):
workflow = (asum.s(1, val) |
asum.s(2) |
grouped.s() |
amul.s(3))
return self.replace(workflow)
задача amul не будет получать результаты группировки, когда она находится в активном режиме.
Чтобы действительно проиллюстрировать проблему, я создал пример проекта на github, где вы можете погрузиться в проблему и помочь мне с некоторыми быстрыми решениями и, возможно, с некоторыми PR в проекте celery.
https://github.com/gutomaia/celery_equation
---- отредактировано ----
В проекте я заявляю о различном поведении при обоих способах использования сельдерея. В асинхронном режиме домашние задачи работают как положено.
>>> from equation.main import *
>>> from equation.tasks import *
>>> flow.delay(1).get()
78
>>> flow.delay(2).get()
120
>>> flow.delay(100).get()
47895