Twisted: Ожидание завершения подзадач

В моем коде есть две гипотетические задачи: одна получает URL-адреса от генератора и загружает их в пакетном режиме с помощью Twisted Cooperator, а другая берет загруженный источник и асинхронно анализирует его. Я пытаюсь инкапсулировать все задачи выборки и синтаксического анализа в один объект Deferred, который выполняет обратный вызов, когда все страницы загружены и все источники проанализированы.

Я придумал следующее решение:

from twisted.internet import defer, task, reactor, threads
from twisted.web.client import getPage


BATCH_SIZE = 5

def main_task():
    result = defer.Deferred()
    state = {'count': 0, 'done': False}

    def on_parse_finish(r):
        state['count'] -= 1
        if state['done'] and state['count'] == 0:
            result.callback(True)

    def process(source):
        deferred = parse(source)
        state['count'] += 1
        deferred.addCallback(on_parse_finish)

    def fetch_urls():
        for url in get_urls():
            deferred = getPage(url)
            deferred.addCallback(process)
            yield deferred

    def on_finish(r):
        state['done'] = True

    deferreds = []

    coop = task.Cooperator()
    urls = fetch_urls()
    for _ in xrange(BATCH_SIZE):
        deferreds.append(coop.coiterate(urls))

    main_tasks = defer.DeferredList(deferreds)
    main_tasks.addCallback(on_finish)

    return defer.DeferredList([main_tasks, result])

# `main_task` is meant to be used with `blockingCallFromThread`
# The following should block until all fetch/parse tasks are completed:
# threads.blockingCallFromThread(reactor, main_task)

Код работает, но я чувствую, что либо упускаю что-то явно очевидное, либо не знаю простого шаблона Twisted, который сделал бы это намного проще. Есть ли лучший способ вернуть один Deferred, который перезванивает, когда все выборки и синтаксический анализ завершены?


person enderskill    schedule 02.12.2013    source источник
comment
Undefined name 'parse', Undefined name 'get_urls', Undefined name 'task_finished'. Гораздо проще убедиться, что ответы на вопрос верны, если пример кода в вопросе действительно работает :).   -  person Glyph    schedule 04.12.2013


Ответы (1)


Как сейчас написано, мне кажется, что этот код будет иметь ограниченное количество параллельных загрузок, но неограниченное количество параллельных заданий синтаксического анализа. Это намеренно? Я собираюсь предположить «нет», поскольку, если ваша сеть окажется быстрой, а ваш синтаксический анализатор — медленным, поскольку количество URL-адресов приближается к бесконечности, то же самое происходит и с использованием вашей памяти :).

Итак, вот вещь, которая будет иметь ограниченный параллелизм, но вместо этого будет выполнять синтаксический анализ последовательно с загрузками:

from twisted.internet import defer, task
from twisted.web.client import getPage

BATCH_SIZE = 5

def main_task(reactor):
    def fetch_urls():
        for url in get_urls():
            yield getPage(url).addCallback(parse)

    coop = task.Cooperator()
    urls = fetch_urls()

    return (defer.DeferredList([coop.coiterate(urls)
                               for _ in xrange(BATCH_SIZE)])
            .addCallback(task_finished))

task.react(main_task)

Это работает, потому что, поскольку parse (очевидно) возвращает Deferred, добавление его в качестве обратного вызова к тому, который возвращает getPage, приводит к Deferred, который не будет вызывать обратный вызов, добавленный coiterate, пока parse не сделает свое дело.

Поскольку вы спрашивали об идиоматическом Twisted-коде, я также позволил себе немного модернизировать его (используя task.react вместо того, чтобы запускать реактор вручную, встраивая выражения, чтобы сделать вещи короче, и так далее).

Если вы действительно хотите иметь больше параллельных синтаксических анализов, чем параллельных выборок, что-то вроде этого может работать лучше:

from twisted.internet import defer, task
from twisted.web.client import getPage

PARALLEL_FETCHES = 5
PARALLEL_PARSES = 10

def main_task(reactor):
    parseSemaphore = defer.DeferredSemaphore(PARALLEL_PARSES)

    def parseWhenReady(r):
        def parallelParse(_):
            parse(r).addBoth(
                lambda result: parseSemaphore.release().addCallback(
                    lambda _: result
                )
            )
        return parseSemaphore.acquire().addCallback(parallelParse)

    def fetch_urls():
        for url in get_urls():
            yield getPage(url).addCallback(parseWhenReady)

    coop = task.Cooperator()
    urls = fetch_urls()

    return (defer.DeferredList([coop.coiterate(urls)
                               for _ in xrange(PARALLEL_FETCHES)])
            .addCallback(lambda done:
                         defer.DeferredList(
                            [parseSemaphore.acquire()
                             for _ in xrange(PARALLEL_PARSES)]
                         ))
            .addCallback(task_finished))

task.react(main_task)

Вы можете видеть, что parseWhenReady возвращает Deferred, возвращенное из acquire, поэтому параллельная выборка продолжится, как только параллельный анализ сможет начать, и, следовательно, вы не будете продолжать выборку без разбора, даже если анализатор перегружен. Однако parallelParse тщательно воздерживается от возврата Deferred, возвращенного parse или release, поскольку выборка должна продолжаться по мере их выполнения.

(Обратите внимание, что, поскольку ваш первоначальный пример не работал, я вообще не тестировал ни один из них. Надеюсь, цель ясна, даже если есть ошибки.)

person Glyph    schedule 04.12.2013
comment
Я сожалею о том, что код не может быть запущен, и о моей неясности в вопросе. Поскольку вопрос чисто теоретический, меня пока устраивает неограниченный параллелизм для parse. Поскольку main_task будет запускаться с blockingCallFromThread, он должен вернуть Deferred, который вызывается, когда все страницы загружены и все источники проанализированы без использования хакерской переменной состояния. Я отредактировал вопрос, чтобы включить это. - person enderskill; 06.12.2013
comment
Если вы специально хотите снять ограничение на параллелизм parse, просто используйте первый пример, но с yield getPage(url). addCallback(lambda x: parse(x) and None) вместо yield getPage(url).addCallback(parse), чтобы Deferred из parse явно отбрасывалось, а не связывалось. Относится ли этот ответ к вашему варианту использования? - person Glyph; 08.12.2013
comment
Это отвечает на вопрос. Также спасибо за вашу работу над Twisted. Это единственная причина, по которой я использую Python для сетевого программирования. :] - person enderskill; 09.12.2013
comment
+1 за простой пример того, как DeferredSemaphore может быть полезен. Верно ли, что если достаточное количество parse() занимает слишком много времени, то все выборки URL останавливаются, то есть все зависает на parseSemaphore.acquire(), ожидая завершения хотя бы одного parse()? - person jfs; 13.12.2013