Celery создает несколько экземпляров Task

Я создаю задачу (путем создания подкласса celery.task.Task), которая создает соединение с потоковым API Twitter. Для вызовов Twitter API я использую tweepy. Как я прочитал из документации celery, «задача не создается для каждого запроса, а регистрируется в реестре задач как глобальный экземпляр». Я ожидал, что всякий раз, когда я вызываю apply_async (или задержку) для задачи, я буду обращаться к задаче, которая была изначально создана, но этого не происходит. Вместо этого создается новый экземпляр пользовательского класса задач. Мне нужно иметь доступ к исходной настраиваемой задаче, так как это единственный способ разорвать исходное соединение, созданное tweepy вызовом API.

Вот кусок кода, если это поможет:

from celery import registry
from celery.task import Task

class FollowAllTwitterIDs(Task):
    def __init__(self):
        # requirements for creation of the customstream
        # goes here. The CustomStream class is a subclass
        # of tweepy.streaming.Stream class

        self._customstream = CustomStream(*args, **kwargs)

    @property
    def customstream(self):
        if self._customstream:
            # terminate existing connection to Twitter
            self._customstream.running = False
        self._customstream = CustomStream(*args, **kwargs)

    def run(self):
        self._to_follow_ids = function_that_gets_list_of_ids_to_be_followed()

        self.customstream.filter(follow=self._to_follow_ids, async=False)
follow_all_twitterids = registry.tasks[FollowAllTwitterIDs.name]

И для представления Джанго

def connect_to_twitter(request):
    if request.method == 'POST':
        do_stuff_here()
        .
        .
        .

        follow_all_twitterids.apply_async(args=[], kwargs={})

     return

Любая помощь будет оценена по достоинству. :D

РЕДАКТИРОВАТЬ:

Для дополнительного контекста вопроса объект CustomStream создает экземпляр httplib.HTTPSConnection всякий раз, когда вызывается метод filter(). Это соединение должно быть закрыто всякий раз, когда есть еще одна попытка создать его. Соединение закрывается установкой для customstream.running значения False.


person Christian    schedule 25.10.2011    source источник


Ответы (1)


Задача должна быть создана только один раз, если вы думаете, что это не по какой-то причине, я предлагаю вам добавить

print("INSTANTIATE") импортировать трассировку traceback.print_stack()

к методу Task.__init__, чтобы вы могли сказать, где это будет происходить.

Я думаю, что вашу задачу можно было бы лучше выразить так:

from celery.task import Task, task

class TwitterTask(Task):
    _stream = None
    abstract = True

    def __call__(self, *args, **kwargs):
        try:
            return super(TwitterTask, self).__call__(stream, *args, **kwargs)
        finally:
            if self._stream:
                self._stream.running = False

    @property
    def stream(self):
        if self._stream is None:
            self._stream = CustomStream()
        return self._stream

@task(base=TwitterTask)
def follow_all_ids():
    ids = get_list_of_ids_to_follow()
    follow_all_ids.stream.filter(follow=ids, async=false)
person asksol    schedule 26.10.2011
comment
Спасибо за ответ. Я попытался реализовать вышеизложенное, и мне просто интересно, где используется свойство celery.utils.cached_property? - person Christian; 27.10.2011
comment
Просто его добавили туда по ошибке :) Я его удалю - person asksol; 06.11.2011