Я читаю документацию по dask.distributed
, и похоже, что я могу отправлять функции в распределенный кластер через client.submit()
.
У меня есть функция some_func
, которая асинхронно захватывает отдельные документы (скажем, текстовый файл), и я хочу взять необработанный документ, захватить все слова, не содержащие гласных, и отправить их обратно в другую базу данных. Этот этап обработки данных является блокирующим.
Предполагая, что существует несколько миллионов документов, а распределенный кластер имеет только 10 узлов с 1 доступным процессом (т.е. он может обрабатывать только 10 документов за раз), как dask.distributed будет обрабатывать поток документов, которые ему необходимо обработать?
Вот пример кода:
client = dask.distributed('tcp://1.2.3.4:8786')
def some_func():
doc = retrieve_next_document_asynchronously()
client.submit(get_vowelless_words, doc)
def get_vowelless_words(doc):
vowelless_words = process(doc)
write_to_database(vowelless_words)
if __name__ == '__main__':
for i in range(1000000):
some_func()
Поскольку обработка документа блокируется, и кластер может обрабатывать только 10 документов одновременно, что произойдет, если 30 других документов будут извлечены, когда кластер занят? Я понимаю, что client.submit()
является асинхронным, и он вернет параллельное будущее, но что произойдет в этом случае? Будет ли он удерживать документ в памяти до тех пор, пока не станут доступны 1/10 ядра, и потенциально может вызвать нехватку памяти на машине, скажем, если ожидает 1000 документов.
Что в этом случае сделает планировщик? ФИФО? Должен ли я как-то изменить код, чтобы он ждал, пока будет доступно ядро, прежде чем извлекать следующий документ? Как этого добиться?