В моем коде есть две гипотетические задачи: одна получает URL-адреса от генератора и загружает их в пакетном режиме с помощью Twisted Cooperator, а другая берет загруженный источник и асинхронно анализирует его. Я пытаюсь инкапсулировать все задачи выборки и синтаксического анализа в один объект Deferred, который выполняет обратный вызов, когда все страницы загружены и все источники проанализированы.
Я придумал следующее решение:
from twisted.internet import defer, task, reactor, threads
from twisted.web.client import getPage
BATCH_SIZE = 5
def main_task():
result = defer.Deferred()
state = {'count': 0, 'done': False}
def on_parse_finish(r):
state['count'] -= 1
if state['done'] and state['count'] == 0:
result.callback(True)
def process(source):
deferred = parse(source)
state['count'] += 1
deferred.addCallback(on_parse_finish)
def fetch_urls():
for url in get_urls():
deferred = getPage(url)
deferred.addCallback(process)
yield deferred
def on_finish(r):
state['done'] = True
deferreds = []
coop = task.Cooperator()
urls = fetch_urls()
for _ in xrange(BATCH_SIZE):
deferreds.append(coop.coiterate(urls))
main_tasks = defer.DeferredList(deferreds)
main_tasks.addCallback(on_finish)
return defer.DeferredList([main_tasks, result])
# `main_task` is meant to be used with `blockingCallFromThread`
# The following should block until all fetch/parse tasks are completed:
# threads.blockingCallFromThread(reactor, main_task)
Код работает, но я чувствую, что либо упускаю что-то явно очевидное, либо не знаю простого шаблона Twisted, который сделал бы это намного проще. Есть ли лучший способ вернуть один Deferred, который перезванивает, когда все выборки и синтаксический анализ завершены?
Undefined name 'parse'
,Undefined name 'get_urls'
,Undefined name 'task_finished'
. Гораздо проще убедиться, что ответы на вопрос верны, если пример кода в вопросе действительно работает :). - person Glyph   schedule 04.12.2013