Я создаю задачу (путем создания подкласса celery.task.Task), которая создает соединение с потоковым API Twitter. Для вызовов Twitter API я использую tweepy. Как я прочитал из документации celery, «задача не создается для каждого запроса, а регистрируется в реестре задач как глобальный экземпляр». Я ожидал, что всякий раз, когда я вызываю apply_async (или задержку) для задачи, я буду обращаться к задаче, которая была изначально создана, но этого не происходит. Вместо этого создается новый экземпляр пользовательского класса задач. Мне нужно иметь доступ к исходной настраиваемой задаче, так как это единственный способ разорвать исходное соединение, созданное tweepy вызовом API.
Вот кусок кода, если это поможет:
from celery import registry
from celery.task import Task
class FollowAllTwitterIDs(Task):
def __init__(self):
# requirements for creation of the customstream
# goes here. The CustomStream class is a subclass
# of tweepy.streaming.Stream class
self._customstream = CustomStream(*args, **kwargs)
@property
def customstream(self):
if self._customstream:
# terminate existing connection to Twitter
self._customstream.running = False
self._customstream = CustomStream(*args, **kwargs)
def run(self):
self._to_follow_ids = function_that_gets_list_of_ids_to_be_followed()
self.customstream.filter(follow=self._to_follow_ids, async=False)
follow_all_twitterids = registry.tasks[FollowAllTwitterIDs.name]
И для представления Джанго
def connect_to_twitter(request):
if request.method == 'POST':
do_stuff_here()
.
.
.
follow_all_twitterids.apply_async(args=[], kwargs={})
return
Любая помощь будет оценена по достоинству. :D
РЕДАКТИРОВАТЬ:
Для дополнительного контекста вопроса объект CustomStream создает экземпляр httplib.HTTPSConnection всякий раз, когда вызывается метод filter(). Это соединение должно быть закрыто всякий раз, когда есть еще одна попытка создать его. Соединение закрывается установкой для customstream.running значения False.