Как связать приложение сельдерея с классом Task?

Я хочу переопределить класс Task Celery. Я мог бы переопределить методы on_success и on_failure, но метод run для меня не так прост. Я попытался использовать метод bind. Мой код выглядит следующим образом:

class MyTask(Task):
    def on_success(self, retval, task_id, args, kwargs):
        print("success")

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        print("failed")

    def bind(self, app):
        return super(self.__class__, self).bind(app)

    def run(self, *args, **kwargs):
        x = kwargs.get('data', None)
        x = x**2


if __name__=="__main__":
     mytask = MyTask()
     app = Celery('mytask', backend='redis', broker='redis://localhost')
     mytask.bind(app)
     job = mytask.apply_async(data = 1)

но когда я запускаю код, я получаю следующую ошибку:

Received unregistered task of type None.
The message has been ignored and discarded.

Did you remember to import the module containing this task?
Or maybe you're using relative imports?

Please see
http://docs.celeryq.org/en/latest/internals/protocol.html
for more information.

The full contents of the message body was:
b'[[], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]' (77b)
Traceback (most recent call last):
  File "/home/ayandeh/anaconda3/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 559, in on_task_received
    strategy = strategies[type_]
KeyError: None

Я много искал, но я не получил никакого результата. Как зарегистрировать задачу?


person Rohola Zandie    schedule 14.05.2017    source источник


Ответы (1)


Явная привязка приложения не требуется, так как задача celery автоматически привязывается к current_app при вызове apply_async. Если вы хотите явную привязку, доступны два способа: 1. task.bind(app) 2. создать класс Task из app.Task. Унаследовав app.Task, это приложение будет привязано.

Ваша проблема не в привязке. Речь идет о реестре задач. Исправьте это, как показано ниже:

from celery import Celery, Task

class MyTask(Task):
    name = 'mytask'

    def on_success(self, retval, task_id, args, kwargs):
        print("success")

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        print("failed")

    def run(self, *args, **kwargs):
        x = kwargs.get('data', None)
        print(x**2)


if __name__ == "__main__":
    app = Celery('mytask', backend='redis', broker='redis://localhost')
    MyTask.bind(app)
    app.tasks.register(MyTask)
    app.worker_main()

Надеюсь, это полезно.

person Luo Yufu    schedule 29.09.2017