Python и Trio, где производители являются потребителями, как изящно выйти, когда работа выполнена?

Я пытаюсь создать простой поисковый робот, используя trio и asks. Я использую питомник для одновременного запуска нескольких сканеров и канал памяти для ведения списка URL-адресов для посещения.

Каждый поисковый робот получает клоны обоих концов этого канала, поэтому они могут получить URL-адрес (через Receive_channel), прочитать его, найти и добавить новые URL-адреса для посещения (через send_channel).

async def main():
    send_channel, receive_channel = trio.open_memory_channel(math.inf)
    async with trio.open_nursery() as nursery:
        async with send_channel, receive_channel:
            nursery.start_soon(crawler, send_channel.clone(), receive_channel.clone())
            nursery.start_soon(crawler, send_channel.clone(), receive_channel.clone())
            nursery.start_soon(crawler, send_channel.clone(), receive_channel.clone())


async def crawler(send_channel, receive_channel):
    async for url in receive_channel:  # I'm a consumer!
        content = await ...
        urls_found = ...
        for u in urls_found:
            await send_channel.send(u)  # I'm a producer too!

В этом сценарии потребители являются производителями. Как изящно все остановить?

Условия отключения всего:

  • канал пуст
  • И
  • все поисковые роботы застревают в первом цикле for, ожидая появления URL-адреса в Receive_channel (что... больше не произойдет)

Я пробовал с async with send_channel внутри crawler(), но не смог найти хороший способ сделать это. Я также пытался найти какой-то другой подход (какой-то рабочий пул, привязанный к каналу памяти и т. д.), но и здесь не повезло.


person Paweł Lis    schedule 15.12.2020    source источник


Ответы (2)


Здесь есть как минимум две проблемы.

Во-первых, это ваше предположение об остановке, когда канал пуст. Поскольку вы выделяете канал памяти размером 0, он всегда будет пустым. Вы можете передать URL-адрес только в том случае, если сканер готов его получить.

Это создает проблему номер два. Если вы когда-нибудь найдете больше URL-адресов, чем вы выделили сканерам, ваше приложение заблокируется.

Причина в том, что, поскольку вы не сможете передать все найденные URL-адреса сканеру, сканер никогда не будет готов получить новый URL-адрес для сканирования, потому что он застрял в ожидании, когда другой сканер возьмет один из его URL-адресов.

Это становится еще хуже, потому что, если один из других поисковых роботов найдет новые URL-адреса, они тоже застрянут позади поискового робота, который уже ждет передачи своих URL-адресов, и они никогда не смогут взять один из URL-адресов, ожидающих отправки. обработанный.

Соответствующая часть документации:

https://trio.readthedocs.io/en/stable/reference-core.html#buffering-in-channels

Предполагая, что мы исправим это, что делать дальше?

Вероятно, вам нужно вести список (набор?) всех посещенных URL-адресов, чтобы убедиться, что вы больше их не посещаете.

Чтобы действительно выяснить, когда остановиться, вместо того, чтобы закрывать каналы, вероятно, намного проще просто отменить питомник.

Допустим, мы модифицируем основной цикл следующим образом:

async def main():
    send_channel, receive_channel = trio.open_memory_channel(math.inf)
    active_workers = trio.CapacityLimiter(3) # Number of workers
    async with trio.open_nursery() as nursery:
        async with send_channel, receive_channel:
            nursery.start_soon(crawler, active_workers, send_channel, receive_channel)
            nursery.start_soon(crawler, active_workers, send_channel, receive_channel)
            nursery.start_soon(crawler, active_workers, send_channel, receive_channel)
            while True:
                await trio.sleep(1) # Give the workers a chance to start up.
                if active_workers.borrowed_tokens == 0 and send_channel.statistics().current_buffer_used == 0:
                    nursery.cancel_scope.cancel() # All done!

Теперь нам нужно немного модифицировать краулеры, чтобы забирать токен, когда он активен.

async def crawler(active_workers, send_channel, receive_channel):
    async for url in receive_channel:  # I'm a consumer!
        with active_workers:
            content = await ...
            urls_found = ...
            for u in urls_found:
                await send_channel.send(u)  # I'm a producer too!

Другие вещи, чтобы рассмотреть -

Вы можете использовать send_channel.send_noblock(u) в сканере. Поскольку у вас есть неограниченный буфер, вероятность возникновения исключения WillBlock исключена, и поведение, при котором триггер контрольной точки не срабатывает при каждой отправке, может оказаться желательным. Таким образом, вы точно знаете, что конкретный URL-адрес полностью обработан и все новые URL-адреса были добавлены, прежде чем другие задачи получат возможность получить новый URL-адрес, или родительская задача получит возможность проверить, выполнена ли работа.

person Anders E. Andersen    schedule 15.12.2020
comment
Извините, это я по памяти написал. Изменено на math.inf. Но помимо неправильного размера канала, как изящно закрыть все, когда больше нечего делать? Или я просто пытаюсь использовать неправильный шаблон здесь? - person Paweł Lis; 15.12.2020
comment
@PawełLis Я добавил возможное решение. Вероятно, есть много способов реализовать что-то подобное. - person Anders E. Andersen; 15.12.2020

Это решение, которое я придумал, когда пытался реорганизовать проблему:

async def main():
    send_channel, receive_channel = trio.open_memory_channel(math.inf)
 
    limit = trio.CapacityLimiter(3)

    async with send_channel:
        await send_channel.send(('https://start-url', send_channel.clone()))
    #HERE1

    async with trio.open_nursery() as nursery:
        async for url, send_channel in receive_channel:  #HERE3
            nursery.start(consumer, url, send_channel, limit)

async def crawler(url, send_channel, limit, task_status):
    async with limit, send_channel:
        content = await ...
        links = ...
        for link in links:
            await send_channel.send((link, send_channel.clone()))
    #HERE2

(я пропускал посещенные URL-адреса)

Здесь нет 3-х долгоживущих потребителей, но есть не более 3-х потребителей, когда для них достаточно работы.

В #HERE1 send_channel закрыт (поскольку он использовался в качестве диспетчера контекста), единственное, что поддерживает канал, — это его клон внутри этого канала.

В #HERE2 клон тоже закрыт (потому что менеджер контекста). Если канал пуст, то этот клон был последним, что поддерживало существование канала. Канал умирает, для концов цикла (#HERE3).

ЕСЛИ не были найдены URL-адреса, и в этом случае они были добавлены в канал вместе с дополнительными клонами send_channel, которые будут поддерживать работу канала достаточно долго для обработки этих URL-адресов.

И это, и решения Андерса Э. Андерсена кажутся мне хакерскими: одно использует sleep и statistics(), другое создает клоны send_channel и помещает их в канал... мне кажется, что это программная реализация бутылки Клейна. Я, вероятно, буду искать какие-то другие подходы.

person Paweł Lis    schedule 16.12.2020
comment
Хм, в #HERE1 канал отправки закрыт, поэтому петля приема пропускает закрытый канал? - person Matthias Urlichs; 16.12.2020
comment
канал действительно закрыт, когда закрыты все send_channels. В #HERE1 исходный send_channel закрывается, но одной строкой выше #HERE1 мы создаем клон, который не будет закрыт. - person Paweł Lis; 16.12.2020
comment
Глядя на это прямо сейчас, мне, вероятно, не нужно закрывать оригинал и создавать клон (две строки выше #HERE1), я мог бы просто поместить оригинал в канал и позволить первому сканеру закрыть его. #ЗДЕСЬ2 - person Paweł Lis; 16.12.2020