читать процесс и объединять фреймворк pandas параллельно с dask

Я пытаюсь прочитать и обработать параллельно список файлов csv и объединить вывод в один pandas dataframe для дальнейшей обработки.

Мой рабочий процесс состоит из 3 шагов:

  • создать серию фреймов данных pandas, прочитав список файлов csv (все с одинаковой структурой)

    def loadcsv(filename): df = pd.read_csv(filename) return df

  • для каждого фрейма данных создайте новый столбец, обработав 2 существующих столбца

    def makegeom(a,b): return 'Point(%s %s)' % (a,b)

    def applygeom(df): df['Geom']= df.apply(lambda row: makegeom(row['Easting'], row['Northing']), axis=1) return df

  • объединить все фреймы данных в один фрейм данных

    frames = [] for i in csvtest: df = applygeom(loadcsv(i)) frames.append(df) mergedresult1 = pd.concat(frames)

В своем рабочем процессе я использую pandas (каждый файл csv (15) имеет более >> 2 * 10 ^ 6 точек данных), поэтому для завершения требуется время. Я думаю, что этот вид рабочего процесса должен использовать преимущества некоторой параллельной обработки (по крайней мере, для шагов read_csv и apply), поэтому я попытался выполнить dask, но не смог использовать его должным образом. В своей попытке я не добился улучшения скорости.

Я сделал простой блокнот, чтобы воспроизвести то, что я делаю:

https://gist.github.com/epifanio/72a48ca970a4291b293851ad29eadb50

Мой вопрос ... каков правильный способ использования dask для выполнения моего варианта использования?


person epifanio    schedule 04.11.2016    source источник


Ответы (2)


Панды

В Pandas я бы использовал метод apply

In [1]: import pandas as pd

In [2]: df = pd.DataFrame({'a': [1, 2, 3], 'b': [3, 2, 1]})

In [3]: def makegeom(row):
   ...:      a, b = row
   ...:      return 'Point(%s %s)' % (a, b)
   ...: 

In [4]: df.apply(makegeom, axis=1)
Out[4]: 
0    Point(1 3)
1    Point(2 2)
2    Point(3 1)
dtype: object

Dask.dataframe

В dask.dataframe вы можете сделать то же самое

In [5]: import dask.dataframe as dd

In [6]: ddf = dd.from_pandas(df, npartitions=2)

In [7]: ddf.apply(makegeom, axis=1).compute()
Out[7]: 
0    Point(1 3)
1    Point(2 2)
2    Point(3 1)

Добавить новую серию

В любом случае вы можете добавить новую серию в фрейм данных.

df['geom'] = df[['a', 'b']].apply(makegeom)

Создавать

Если у вас есть данные CSV, я бы использовал функцию dask.dataframe.read_csv

ddf = dd.read_csv('filenames.*.csv')

Если у вас есть другие типы данных, я бы использовал dask.delayed

person MRocklin    schedule 04.11.2016
comment
Ваша функция makegeom связана с GIL. Вам следует прочитать dask.readthedocs.io/en/latest/scheduler-choice.html, чтобы узнать, как выбрать хороший планировщик для вашей ситуации. - person MRocklin; 04.11.2016
comment
Я все еще работаю над своей проблемой. Я изменил функцию makegeom, заменив apply пользовательским кодом numpy (который работает намного быстрее). Сейчас я работаю над новым блокнотом. Мой план состоит в том, чтобы сначала немного узнать об очередях и «общих объектах» между процессами, а затем понять, как использовать dask с distribute. - person epifanio; 08.11.2016

Тем временем я нашел другие способы (альтернативные Dask), на мой взгляд, относительно более простые, для параллельного выполнения функции func над фреймом данных pandas. В обоих случаях я воспользовался преимуществом метода numpy.array_split.

Один использует комбинацию python multiprocessing.Pool, numpy.array_split и _ 5_ и будет работать следующим образом:

import numpy as np

def func(array):
    # do some computation on the given array
    pass

def parallelize_dataframe(df, func, n_cores=72):
    df_split = np.array_split(df, n_cores)
    pool = Pool(n_cores)
    df = pd.concat(pool.map(func, df_split))
    pool.close()
    pool.join()
    return df

Другой - использование мощного, но простого кластера ray (что весьма полезно, если вы может запускать код на нескольких машинах):

# connect to a ray cluster
# 

import ray

ray.init(address="auto", redis_password="5241590000000000")

import numpy as np


@ray.remote
def func(df):
    # do some computation on the given dataframe
    pass

df_split = np.array_split(df, 288)
result = pd.concat(ray.get([func.remote(i) for i in df_split]))

Вышеупомянутые методы довольно хорошо работают для простых методов func, где вычисления могут выполняться с помощью numpy, а возвращаемый продукт может быть объединен обратно во фрейм данных pandas - для методов, которые выполняют более простые манипуляции с файлами, я также нашел полезными parmap.map, но это не по теме для этого ТАК вопрос.

person epifanio    schedule 18.09.2020