Я пытаюсь использовать dask.delayed
для вычисления большой матрицы для использования в последующих вычислениях. Я всегда запускаю код только на одном локальном компьютере. Когда я использую dask
планировщик для одной машины, он работает нормально, но немного медленнее. Чтобы получить доступ к дополнительным параметрам и мониторам производительности для улучшения кода, я хотел бы использовать dask.distributed
на одной машине. Однако выполнение того же кода с dask.distributed
клиентом медленно съедает всю доступную память и дает сбой, ничего не достигая.
Есть ли другой способ решения проблемы, который позволил бы клиенту dask.distributed
работать с большей эффективностью памяти?
- Я прочитал руководство по передовой практике dask.delayed и думаем, что используем его правильно.
- Я запускал его на локальном ПК с Win 10 (64 ГБ ОЗУ) и виртуальной машине Azure Win Server 2012 (256 ГБ) с тем же результатом.
- Я пробовал устанавливать чанки вручную.
- Я пробовал использовать
stack.rechunk
для оптимизации размеров фрагментов, включая автоматическое разбиение по строкам и столбцам (фрагменты строк, кажется, выполняются намного быстрее в планировщикеdask
). - Я пробовал использовать
compute()
иpersist()
(результат тот же). - Я пробовал запустить клиент
dask.distributed
с планировщиком потоков и процессов и настроить количество рабочих.threads
перед смертью использует еще больше оперативной памяти. - Я попытался настроить
dask.distributed
с ограничением памятиcluster = distributed.LocalCluster(memory_limit = 8e9)
в соответствии с этим ответом, но ограничение памяти игнорируется. - Если я уменьшу размер проблемы (
nX
иnY
ниже),dask.distributed
клиент выполнит задачу, однако он по-прежнему требует гораздо больше времени и памяти, чемdask
планировщик.
В этом примере воспроизводится проблема:
import dask
import distributed
import numpy as np
import dask.array as da
def calcRow(X,Y):
Tx = np.transpose(X * (X + Y)) # Simplified work
return (Tx)
# Specify size of (nY x nX) matrix
nX = 1000000 # Distributed fails with nX >= 1000000 and nY >= 5000
nY = 5000
# Fill with random data
x = np.random.rand(nX,1)
y = np.random.rand(nY,1)
# Setup dask.distributed client.
# Comment out these two lines to use the standard dask scheduler,
# which does work
client = distributed.Client()
client
# Build the matrix
row = dask.delayed(calcRow, pure=True) # Build 1 row
makeRows = [row(x, y[ii]) for ii in range(nY)] # Loop for all nY rows
buildMat = [da.from_delayed(makeRow, dtype=float, shape=(10,nX))
for makeRow in makeRows] # Build matrix
stack = da.vstack(buildMat)
my_matrix = stack.compute() # Calculate the matrix entries
На самом деле мои проблемы намного больше, и calcRow
сам по себе является большим, медленным и сложным вычислением, но этапы построения формы и матрицы такие же.
Я понимаю, что лучше всего scatter
данные в память перед вызовом compute
, но у меня нет функции для разброса, только массив delayed
.
Если я закомментирую 2 dask.distributed
клиентские строки, приведенный выше пример выполняется за 60 секунд, используя максимум 0,25 ГБ ОЗУ. Но с этими строками код достигает полного использования памяти (64 ГБ) за 3-4 минуты и продолжает работать, пока система не станет нестабильной.
Если я построю матрицу в dask
, я смогу запустить dask.distributed
клиента и без проблем использовать матрицу в последующих dask.distributed
вычислениях. Это просто построение матрицы, которая вызывает проблемы.
Я почти чувствую, что это ошибка, но не уверен, что мой код виноват. Я бы очень ценил предложения, которые могут помочь запустить код или доказать ошибку.
РЕДАКТИРОВАТЬ 1: Я также пробовал применить декоратор к calcRow
:
@dask.delayed
def calcRow(X,Y):
и используя:
makeRows = [calcRow(x, y[ii]) for ii in range(nY)]
а что вроде идентично?
РЕДАКТИРОВАТЬ 2: если я запускаю distributed.client
с processes=False
, он быстрее использует всю системную память, но на самом деле выдает следующее предупреждение, которое может быть диагностическим:
распределенный.worker - ПРЕДУПРЕЖДЕНИЕ - использование памяти велико, но у worker нет данных для сохранения на диск. Возможно, утечка памяти происходит из-за какого-то другого процесса? Память процесса: 40,27 ГБ - ограничение рабочей памяти: 8,00 ГБ
results = da.dot(stack, x).compute()
и все готово. Но из-за размера большинства проблем и сложности последующих этапов вычислений (инверсия матрицы) мы сохраняем матрицу на диск с помощью zarrG = da.to_zarr(stack, path_to_zarr)
либо для использования в последующих вычислениях в этом прогоне, либо для последующих прогонов. Кажется, что этот вызов запускает неявныйcompute()
вызов, который приводит к той же проблеме, что описана выше. - person Nick W.   schedule 27.06.2019