Анализ потока данных фреймов данных Dask

У меня есть набор данных, хранящийся в текстовом файле с разделителями табуляции. Файл выглядит следующим образом:

date    time    temperature
2010-01-01  12:00:00    10.0000 
...

где столбец temperature содержит значения в градусах Цельсия (° C). Я вычисляю среднесуточную температуру с помощью Dask. Вот мой код:

from dask.distributed import Client
import dask.dataframe as dd

client = Client("<scheduler URL")
inputDataFrame = dd.read_table("<input file>").drop('time', axis=1)
groupedData = inputDataFrame.groupby('date')
meanDataframe = groupedData.mean()
result = meanDataframe.compute()
result.to_csv('result.out', sep='\t')

client.close()

Чтобы улучшить производительность моей программы, я хотел бы понять поток данных, вызванный фреймами данных Dask.

  1. Как read_table() считывает текстовый файл во фрейм данных? Читает ли клиент весь текстовый файл и отправляет ли данные планировщику, который разделяет данные и отправляет их рабочим? Или каждый рабочий читает разделы данных, с которыми работает, непосредственно из текстового файла?
  2. Когда создается промежуточный кадр данных (например, путем вызова drop()), отправляется ли весь промежуточный кадр данных обратно клиенту, а затем отправляется рабочим для дальнейшей обработки?
  3. Тот же вопрос для групп: где создаются и хранятся данные для группового объекта? Как это происходит между клиентом, планировщиком и работниками?

Причина моего вопроса заключается в том, что если я запускаю аналогичную программу с использованием Pandas, вычисления выполняются примерно в два раза быстрее, и я пытаюсь понять, что вызывает накладные расходы в Dask. Поскольку размер кадра результирующих данных очень мал по сравнению с размером входных данных, я полагаю, что есть некоторые накладные расходы, вызванные перемещением входных и промежуточных данных между клиентом, планировщиком и рабочими.


person Giorgio    schedule 12.10.2018    source источник


Ответы (1)


1) Данные читают рабочие. Клиент действительно читает немного заранее, чтобы выяснить имена и типы столбцов и, при желании, найти разделители строк для разделения файлов. Обратите внимание, что все рабочие должны иметь возможность достичь интересующего файла (ов), для чего может потребоваться некоторая общая файловая система при работе в кластере.

2), 3) Фактически, методы drop, groupby и mean вообще не генерируют промежуточные фреймы данных, они просто накапливают график операций, которые должны быть выполнены (т.е. они ленивы). Вы можете рассчитать время для этих шагов и убедиться, что они быстрые. Во время выполнения делаются промежуточные копии на рабочих, копии для других рабочих по мере необходимости и выбрасываются как можно скорее. Копии в планировщик или клиент никогда не создаются, если только вы явно этого не запросите.

Итак, к корню вашего вопроса: вы можете лучше всего изучить производительность или свою работу, просмотрев панель управления.

Есть много факторов, которые определяют, как быстро все будет развиваться: процессы могут совместно использовать канал ввода-вывода; некоторые задачи не выпускают GIL и поэтому плохо распараллеливаются в потоках; количество групп сильно повлияет на объем перетасовки данных в группы ... плюс есть всегда некоторые накладные расходы для каждой задачи, выполняемой планировщиком.

Поскольку Pandas эффективен, неудивительно, что в случае, когда данные легко помещаются в память, он работает лучше по сравнению с Dask.

person mdurant    schedule 12.10.2018
comment
Большое спасибо. Я уже использую панель управления, но видео на странице, на которую вы указали в ответе, кажется весьма полезным. Кстати, все рабочие в моей установке работают на виртуальных машинах и получают доступ к общему nfs монтированию. - person Giorgio; 13.10.2018