Огромная разница в использовании памяти между dask и dask.distributed

Я пытаюсь использовать dask.delayed для вычисления большой матрицы для использования в последующих вычислениях. Я всегда запускаю код только на одном локальном компьютере. Когда я использую dask планировщик для одной машины, он работает нормально, но немного медленнее. Чтобы получить доступ к дополнительным параметрам и мониторам производительности для улучшения кода, я хотел бы использовать dask.distributed на одной машине. Однако выполнение того же кода с dask.distributed клиентом медленно съедает всю доступную память и дает сбой, ничего не достигая.

Есть ли другой способ решения проблемы, который позволил бы клиенту dask.distributed работать с большей эффективностью памяти?

  • Я прочитал руководство по передовой практике dask.delayed и думаем, что используем его правильно.
  • Я запускал его на локальном ПК с Win 10 (64 ГБ ОЗУ) и виртуальной машине Azure Win Server 2012 (256 ГБ) с тем же результатом.
  • Я пробовал устанавливать чанки вручную.
  • Я пробовал использовать stack.rechunk для оптимизации размеров фрагментов, включая автоматическое разбиение по строкам и столбцам (фрагменты строк, кажется, выполняются намного быстрее в планировщике dask).
  • Я пробовал использовать compute() и persist() (результат тот же).
  • Я пробовал запустить клиент dask.distributed с планировщиком потоков и процессов и настроить количество рабочих. threads перед смертью использует еще больше оперативной памяти.
  • Я попытался настроить dask.distributed с ограничением памяти cluster = distributed.LocalCluster(memory_limit = 8e9) в соответствии с этим ответом, но ограничение памяти игнорируется.
  • Если я уменьшу размер проблемы (nX и nY ниже), dask.distributed клиент выполнит задачу, однако он по-прежнему требует гораздо больше времени и памяти, чем dask планировщик.

В этом примере воспроизводится проблема:

import dask
import distributed
import numpy as np
import dask.array as da

def calcRow(X,Y):
    Tx = np.transpose(X * (X + Y)) # Simplified work
    return (Tx)

# Specify size of (nY x nX) matrix
nX = 1000000 #  Distributed fails with nX >= 1000000 and nY >= 5000
nY = 5000

# Fill with random data
x = np.random.rand(nX,1)
y = np.random.rand(nY,1)

# Setup dask.distributed client.
# Comment out these two lines to use the standard dask scheduler,
# which does work 
client = distributed.Client()
client

# Build the matrix
row = dask.delayed(calcRow, pure=True)   # Build 1 row
makeRows = [row(x, y[ii]) for ii in range(nY)] # Loop for all nY rows
buildMat = [da.from_delayed(makeRow, dtype=float, shape=(10,nX))
            for makeRow in makeRows] # Build matrix
stack = da.vstack(buildMat)
my_matrix = stack.compute() # Calculate the matrix entries

На самом деле мои проблемы намного больше, и calcRow сам по себе является большим, медленным и сложным вычислением, но этапы построения формы и матрицы такие же.

Я понимаю, что лучше всего scatter данные в память перед вызовом compute, но у меня нет функции для разброса, только массив delayed.

Если я закомментирую 2 dask.distributed клиентские строки, приведенный выше пример выполняется за 60 секунд, используя максимум 0,25 ГБ ОЗУ. Но с этими строками код достигает полного использования памяти (64 ГБ) за 3-4 минуты и продолжает работать, пока система не станет нестабильной.

Если я построю матрицу в dask, я смогу запустить dask.distributed клиента и без проблем использовать матрицу в последующих dask.distributed вычислениях. Это просто построение матрицы, которая вызывает проблемы.

Я почти чувствую, что это ошибка, но не уверен, что мой код виноват. Я бы очень ценил предложения, которые могут помочь запустить код или доказать ошибку.

РЕДАКТИРОВАТЬ 1: Я также пробовал применить декоратор к calcRow:

@dask.delayed
def calcRow(X,Y):

и используя:

makeRows = [calcRow(x, y[ii]) for ii in range(nY)]

а что вроде идентично?

РЕДАКТИРОВАТЬ 2: если я запускаю distributed.client с processes=False, он быстрее использует всю системную память, но на самом деле выдает следующее предупреждение, которое может быть диагностическим:

распределенный.worker - ПРЕДУПРЕЖДЕНИЕ - использование памяти велико, но у worker нет данных для сохранения на диск. Возможно, утечка памяти происходит из-за какого-то другого процесса? Память процесса: 40,27 ГБ - ограничение рабочей памяти: 8,00 ГБ


person Nick W.    schedule 26.06.2019    source источник
comment
Для чего вы используете свою матрицу? Если это умножение матрицы на вектор или матрица-матрица, вы можете отложить вычисление до этого момента, поскольку вам никогда не понадобится иметь полную матрицу в памяти.   -  person Juan Carlos Ramirez    schedule 27.06.2019
comment
Это определенно вариант для некоторых проблем @JuanCarlosRamirez. Мы можем позвонить results = da.dot(stack, x).compute() и все готово. Но из-за размера большинства проблем и сложности последующих этапов вычислений (инверсия матрицы) мы сохраняем матрицу на диск с помощью zarr G = da.to_zarr(stack, path_to_zarr) либо для использования в последующих вычислениях в этом прогоне, либо для последующих прогонов. Кажется, что этот вызов запускает неявный compute() вызов, который приводит к той же проблеме, что описана выше.   -  person Nick W.    schedule 27.06.2019