Аргументы ошибки Celery Result должны быть списком или кортежем

Я запускаю веб-сайт Django и только что запустил Celery, но получаю запутанные ошибки. Вот как устроен код.

В тестах.py:

from tasks import *
from celery.result import AsyncResult

project = Project.objects.create()
# initalize various sub-objects of the project

c = function.delay(project.id)
r = AsyncResult(c.id).ready()
f = AsyncResult(c.id).failed()
# wait until the task is done  
while not r and not f:
    r = AsyncResult(c.id).ready()
    f = AsyncResult(c.id).failed()

self.assertEqual() #will fail because task fails

в tasks.py:

from __future__ import absolute_import
from celery import shared_task

@shared_task
def function(project_id)
    #a bunch of calculations followed by a save of the project
    project = Project.objects.get(project=project_id)

    for part in project.part_set.all():
        partFunction(part.id)
        p = Part.objects.get(id=part.id)
        # add to various variables in project from variables in p
    project.save()

в mainapp/settings.py:

BROKER_URL = "amqp://ipaddress"
CELERY_RESULT_BACKEND='amqp'
CELERY_ACCEPT_CONTENT = ['json','pickle','msgpack','yaml']
CELERY_IGNORE_RESULT = False

журнал консоли отладки сельдерея для обязательного списка/кортежа:

[INFO/MainProcess] Received task: myapp.tasks.function[id]
[ERROR/MainProcess] Task myapp.tasks.function[id]
    raised unexpected: ValueError('task args must be a list or tuple',)
Traceback:
   File "/python2.7/site-packages/celery/app/trace.py", line 240, in trace_task
       R = retval = fun(*args, **kwargs)
   File "/python2.7/site-packages/celery/app/trace.py", line 437, in __protected_call__
       return self.run(*args, **kwargs)
   File "/myapp/tasks.py", line 28, in function
       p = Part.objects.get(id=part.id)
   File "/python2.7/site-packages/celery/app/task.py", line 555, in apply_async
       **dict(self._get_exec_options(), **options)
   File "/python2.7/site-packages/celery/app/base.py", line 351, in send_task
       reply_to=reply_to or self.oid, **options
   File "celery/app/amqp.py", line 252, in publish_task
       raise ValueError('task args must be a list or tuple')
ValueError: task args must be a list or tuple

ошибка, которую я получаю, как указано выше, AsyncResult(c.id).result: task args must be a list or tuple. Это должно быть простым решением, но это не так. Когда я составляю список так:

inline = [project.id]
c = function.delay(inline)

Затем он передумал и сказал мне, что AsyncResult(c.id).result: int() argument must be a string or a number, not 'list'

Как вы можете себе представить, я очень смущен тем, в чем может быть проблема.


Редактировать

задачи.py

@shared_task
def function(app):
    @app.task(name='myapp.function', bind=True)
    def function(project_id):

тесты.py

c = function.s(project.id).delay()

function.app печатает


person DoctorWizard    schedule 17.06.2014    source источник
comment
можете ли вы поделиться еще немного кода?   -  person Rafael Barros    schedule 17.06.2014
comment
@RafaelBarros Я не могу поделиться с большей частью кода задачи, но это не должно иметь значения, поскольку я получаю те же результаты, когда процесс ничего не делает. Я могу добавить некоторые настройки и некоторые другие части тестовой функции, если это может помочь.   -  person DoctorWizard    schedule 17.06.2014
comment
если вы можете отправить хотя бы несколько строк функции и, возможно, полную трассировку стека, это поможет.   -  person Rafael Barros    schedule 17.06.2014
comment
@RafaelBarros, если вы не понимаете, что такое любой из путей, дайте мне знать, потому что я сделал их сжатыми для удобства и разборчивости.   -  person DoctorWizard    schedule 18.06.2014
comment
можете попробовать сделать function.delay(*inline)?   -  person Rafael Barros    schedule 18.06.2014
comment
@RafaelBarros, к сожалению, дал мне ту же ошибку, что и указанная выше.   -  person DoctorWizard    schedule 18.06.2014
comment
Да, это будет невозможно отладить без кода внутри функции.   -  person Rafael Barros    schedule 18.06.2014
comment
@RafaelBarros Я могу добавить некоторый код, но я не думаю, что проблема связана с кодом, поскольку это та же ошибка, когда функция ничего не делает.   -  person DoctorWizard    schedule 18.06.2014
comment
но если вы вызываете задачу, как указано здесь: celery.readthedocs.org/en/latest/getting-started/ проблема не в сельдерее.   -  person Rafael Barros    schedule 18.06.2014
comment
поскольку вы используете shared_task, вам может потребоваться указать, к какому приложению привязана эта задача. что внутри function.app?   -  person Rafael Barros    schedule 18.06.2014
comment
@RafaelBarros Я не уверен, что вы имеете в виду насчет function.app, хотя я добавил немного кода в функцию, и не следует ли мне использовать shared_task, и как мне убедиться, что задача связана с задачей в myapp?   -  person DoctorWizard    schedule 18.06.2014
comment
информации, которую вы там разместили, достаточно, чтобы у меня было несколько идей о том, что происходит. распечатав function.app, вы можете показать, какое приложение celery привязано к этой функции. Вы можете увидеть правильную реализацию общих задач здесь: github.com/celery/celery/issues/1937   -  person Rafael Barros    schedule 18.06.2014
comment
@RafaelBarros Извините за задержку ответа. Я изменился на способ, указанный в вашей ссылке, как вы можете видеть выше. Проблема, с которой я сейчас сталкиваюсь, заключается в ошибке атрибута app.task, где объект «int» не имеет атрибута «задача».   -  person DoctorWizard    schedule 18.06.2014
comment
Я считаю, что исключение возникает при сериализации результата задачи для отправки обратно в ваш тест (текст исключения может вводить в заблуждение, вместо этого я рекомендую вам прочитать код). Возможно, вам просто нужно, чтобы ваша функция задачи возвращала результат?   -  person Robert Jørgensgaard Engdahl    schedule 18.06.2014


Ответы (1)


Вы получаете ошибку в своем коде внутри задачи, она отображается в трассировке:

File "/myapp/tasks.py", line 28, in function
   p = Part.objects.get(id=part.id)

Ваш код кажется правильным, но из трассировки похоже, что у сельдерея маринована старая версия задачи. Очень важно перезапускать celery всякий раз, когда вы меняете что-либо внутри task.py (может быть, даже если вы изменяете другие файлы, но я так не думаю). Это может быть причиной вашей проблемы, он пару раз укусил меня в зад.

Также нет причин извлекать part из базы данных по отдельности p = Part.objects.get(id=part.id), поскольку вы уже получаете текущий экземпляр детали, когда перебираете набор запросов в for part in project.part_set.all():. Это всего лишь предложение, у вас может быть больше кода, который требует этого шага.

В качестве примечания: если это задача в вашем проекте, а не часть какого-либо многократно используемого приложения, просто используйте обычный декоратор @task, сельдерей найдет его, но убедитесь, что вы правильно настроили приложение Celery, вот еще один из моих постов. который может помочь вам: запустить несколько раз

Итак, если у вас все настроено правильно, просто используйте его, как и раньше:

@task #or @shared_task
def function(project_id)
    #a bunch of calculations followed by a save of the project
    project = Project.objects.get(project=project_id)
    ....

Затем назовите это:

result = function.delay(project.id)

or:

result = function.apply_async(args=(project.id,))

Очевидно, я также рекомендовал бы протестировать задачу напрямую, вызвав ее без celery function(project.id), но я уверен, что вы это знали.

person lehins    schedule 18.06.2014
comment
Спасибо, проблема была не в коде, который у меня все еще был, поскольку он был прав, но сельдерей думал, что я использую более старую версию tasks.py. - person DoctorWizard; 20.06.2014