Распределение Dask с помощью асинхронного параллелизма в реальном времени

Я читаю документацию по dask.distributed, и похоже, что я могу отправлять функции в распределенный кластер через client.submit().

У меня есть функция some_func, которая асинхронно захватывает отдельные документы (скажем, текстовый файл), и я хочу взять необработанный документ, захватить все слова, не содержащие гласных, и отправить их обратно в другую базу данных. Этот этап обработки данных является блокирующим.

Предполагая, что существует несколько миллионов документов, а распределенный кластер имеет только 10 узлов с 1 доступным процессом (т.е. он может обрабатывать только 10 документов за раз), как dask.distributed будет обрабатывать поток документов, которые ему необходимо обработать?

Вот пример кода:

client = dask.distributed('tcp://1.2.3.4:8786')

def some_func():
    doc = retrieve_next_document_asynchronously() 
    client.submit(get_vowelless_words, doc)

def get_vowelless_words(doc):
    vowelless_words = process(doc)
    write_to_database(vowelless_words)

if __name__ == '__main__':
    for i in range(1000000):
        some_func()

Поскольку обработка документа блокируется, и кластер может обрабатывать только 10 документов одновременно, что произойдет, если 30 других документов будут извлечены, когда кластер занят? Я понимаю, что client.submit() является асинхронным, и он вернет параллельное будущее, но что произойдет в этом случае? Будет ли он удерживать документ в памяти до тех пор, пока не станут доступны 1/10 ядра, и потенциально может вызвать нехватку памяти на машине, скажем, если ожидает 1000 документов.

Что в этом случае сделает планировщик? ФИФО? Должен ли я как-то изменить код, чтобы он ждал, пока будет доступно ядро, прежде чем извлекать следующий документ? Как этого добиться?


person slaw    schedule 04.10.2018    source источник


Ответы (2)


Чтобы использовать очереди с dask, ниже приведен модифицированный пример использования очередей dask с распределенным кластером (на основе документация):

#!/usr/bin/env python

import distributed
from queue import Queue
from threading import Thread

client = distributed.Client('tcp://1.2.3.4:8786')
nprocs = len(client.ncores())

def increment(x):
    return x+1

def double(x):
    return 2*x

input_q = Queue(maxsize=nprocs)
remote_q = client.scatter(input_q)
remote_q.maxsize = nprocs
inc_q = client.map(increment, remote_q)
inc_q.maxsize = nprocs
double_q = client.map(double, inc_q)
double_q.maxsize = nprocs
result_q = client.gather(double_q)

def load_data(q):
    i = 0
    while True:
        q.put(i)
        i += 1

load_thread = Thread(target=load_data, args=(input_q,))
load_thread.start()

while True:
    size = result_q.qsize()
    item = result_q.get()
    print(item, size)

В этом случае мы явно ограничиваем максимальный размер каждой очереди, чтобы он был равен количеству доступных распределенных процессов. В противном случае цикл while перегрузит кластер. Конечно, вы можете настроить maxsize так, чтобы он был кратен количеству доступных процессов. Для простых функций, таких как инкремент и удвоение, я обнаружил, что maxsize = 10*nprocs по-прежнему разумно, но это наверняка будет ограничено количеством времени, которое требуется для запуска вашей пользовательской функции.

person slaw    schedule 06.10.2018

Когда вы вызываете submit, все аргументы сериализуются и немедленно отправляются в планировщик. Альтернативой может быть получение документов и их обработка в кластере (это предполагает, что документы глобально видны всем работникам).

for fn in filenames:
    doc = client.submit(retrieve_doc, fn)
    process = client.submit(process_doc, doc)
    fire_and_forget(process)

Если документы доступны только на вашем клиентском компьютере, и вы хотите ограничить поток, вы можете рассмотреть возможность использования dask Queues или итератора as_completed.

person MRocklin    schedule 05.10.2018
comment
dask,distributed совместим с asyncio.queue? Элементы, которые я помещаю в очередь, представляют собой ответы http, поступающие от aiohttp. Хотя HTTP-запросы являются асинхронными, я думаю, что ответ put на queue.queue является синхронным. С ответами aiohttp, которые возвращают большой файл json, отправка этого ответа dask-worker (через q.put(response['json'])) выполняется медленно и блокирует. Есть ли способ сделать его асинхронным? - person slaw; 14.10.2018
comment
Вы можете запустить dask в асинхронном режиме. Он работает в цикле событий торнадо (который на сегодняшний день также является циклом событий asyncio) См. распределенный.dask.org/en/latest/asynchronous.html - person MRocklin; 15.10.2018