Как мне заставить dask вычислить список отложенных результатов или результатов на основе dask-контейнера?

У меня есть тривиально распараллеливаемая задача независимого вычисления результатов для многих таблиц, разбитых на множество файлов. Я могу создавать списки отложенных или dask.dataframe (а также пробовал, например, dict), и я не могу получить все результаты для вычисления (я могу получить отдельные результаты из словаря стиля графа dask, используя .get(), но опять же могу ' t легко вычислить все результаты). Вот минимальный пример:

>>> df = dd.from_pandas(pd.DataFrame({'a': [1,2]}), npartitions=1)
>>> numbers = [df['a'].mean() for _ in range(2)]
>>> dd.compute(numbers)
([<dask.dataframe.core.Scalar at 0x7f91d1523978>,
  <dask.dataframe.core.Scalar at 0x7f91d1523a58>],)

Сходным образом:

>>> from dask import delayed
>>> @delayed
... def mean(data):
...     sum(data) / len(data)
>>> delayed_numbers = [mean([1,2]) for _ in range(2)]
>>> dask.compute(delayed_numbers)
([Delayed('mean-0e0a0dea-fa92-470d-b06e-b639fbaacae3'),
  Delayed('mean-89f2e361-03b6-4279-bef7-572ceac76324')],)

Я хотел бы получить [3, 3], чего я и ожидал на основе отложенные коллекции документов.

Для моей реальной проблемы я действительно хотел бы вычислить по таблицам в файле HDF5, но, учитывая, что я могу заставить это работать с dask.get(), я почти уверен, что уже правильно указываю свой шаг отложенного / dask фрейма данных.

Мне было бы интересно решение, которое напрямую приводит к созданию словаря, но я также могу просто вернуть список кортежей (ключ, значение) в dict(), что, вероятно, не сильно снижает производительность.


person Dav Clark    schedule 24.05.2016    source источник
comment
Документы, исправленные в github.com/dask/dask/commit/   -  person MRocklin    schedule 24.05.2016


Ответы (1)


Compute принимает множество коллекций в качестве отдельных аргументов. Попробуйте изложить свои аргументы следующим образом:

In [1]: import dask.dataframe as dd

In [2]: import pandas as pd

In [3]: df = dd.from_pandas(pd.DataFrame({'a': [1,2]}), npartitions=1)

In [4]: numbers = [df['a'].mean() for _ in range(2)]

In [5]: dd.compute(*numbers)  # note the *
Out[5]: (1.5, 1.5)

Или, что может быть более распространено:

In [6]: dd.compute(df.a.mean(), df.a.std())
Out[6]: (1.5, 0.707107)
person MRocklin    schedule 24.05.2016
comment
Хорошо, это работает, за исключением того, что для моего полномасштабного примера он довольно медленный (и IO, и CPU сильно недоиспользуются, и я вижу только один поток ... и dask.multiprocessing.get выдает некоторые исключения). Но это уже отдельный вопрос. - person Dav Clark; 24.05.2016
comment
многопроцессорность должна будет перемещать данные между процессами. Я рекомендую использовать threadaded.get (должен быть по умолчанию) или использовать распределенный планировщик на одной машине (что позволяет лучше избегать перемещения данных) http; // Распределенный.readthedocs.org - person MRocklin; 24.05.2016
comment
есть ли способ вызвать visualize () в списке аргументов с разделенными знаками, или это можно сделать только для каждого элемента в списке? - person user4446237; 28.12.2020