Как запустить кластер dask.distributed в одном потоке?

Как запустить полный кластер Dask.distributed в одном потоке? Я хочу использовать это для отладки или профилирования.

Примечание. Это часто задаваемый вопрос. Я добавляю вопрос и ответ здесь в Stack Overflow только для повторного использования в будущем.


person MRocklin    schedule 26.05.2017    source источник


Ответы (1)


Локальный планировщик

Если вы можете обойтись API-интерфейсом одномашинного планировщика (просто вычислите), вы можете использовать однопоточный планировщик.

x.compute(scheduler='single-threaded')

Распределенный планировщик — одна машина

Если вы хотите запустить кластер dask.distributed на одной машине, вы можете запустить клиент без аргументов.

from dask.distributed import Client
client = Client()  # Starts local cluster
x.compute()

Это использует много потоков, но работает на одной машине

Распределенный планировщик — один процесс

В качестве альтернативы, если вы хотите запустить все в одном процессе, вы можете использовать ключевое слово processes=False

from dask.distributed import Client
client = Client(processes=False)  # Starts local cluster
x.compute()

Вся связь и управление происходят в одном потоке, хотя вычисления происходят в отдельном пуле потоков.

Распределенный планировщик — один поток

Чтобы запустить управление, связь и вычисления в одном потоке, вам нужно создать Tornado concurrent.futures Executor. Осторожно, этот Tornado API может быть закрытым.

from dask.distributed import Scheduler, Worker, Client
from tornado.concurrent import DummyExecutor
from tornado.ioloop import IOLoop
import threading

loop = IOLoop()
e = DummyExecutor()
s = Scheduler(loop=loop)
s.start()
w = Worker(s.address, loop=loop, executor=e)
loop.add_callback(w._start)

async def f():
    async with Client(s.address, start=False) as c:
        future = c.submit(threading.get_ident)
        result = await future
        return result

>>> threading.get_ident() == loop.run_sync(f)
True
person MRocklin    schedule 26.05.2017
comment
Вероятно, невежественный вопрос, но есть ли способ сделать последний подход на основе DummyExecutor синхронным способом? - person bnaul; 07.09.2018
comment
Нет, если вы хотите, чтобы был активен только один поток. Чтобы поток, в котором вы вызываете отправку, ожидание материала и т. д., был активен в то же время, когда потоки выполняют работу, вам необходимо задействовать стили асинхронного программирования. - person MRocklin; 07.09.2018
comment
В целях отладки у меня есть фиктивный класс, который реализует основные примитивы (submit, map и т. д.) и блоки вычислений; поэтому submit оценивает функцию и возвращает будущее со статусом 'finished', но сразу возникают ошибки. Может быть, это настолько отличается, что не вписывается в обычную структуру Client, но приятно иметь полную трассировку стека и иметь возможность % отладки в IPython. - person bnaul; 08.09.2018