Каков наиболее эффективный способ применить многопроцессорность к уникальным категориям записей в кадре данных pandas?

У меня есть большой набор данных (tsv), который выглядит примерно так:

category     lat         lon  
apple        34.578967   120.232453  
apple        34.234646   120.535667  
pear         32.564566   120.453567  
peach        33.564567   121.456445  
apple        34.656757   120.423566  

Общая цель состоит в том, чтобы передать кадр данных, содержащий все записи для одной категории, в DBScan для создания меток кластера, и сделать это для всех категорий с помощью модуля многопроцессорности. Я могу заставить это работать, но в настоящее время я перезагружаю весь набор данных в каждом процессе, чтобы подмножить категорию, потому что я продолжаю получать ошибки при попытке сослаться на весь набор данных как на глобальную переменную. Код выглядит так:

import pandas as pd
from sklearn.cluster import DBSCAN
import multiprocessing as mp

def findClusters(inCat):
    inTSV = r"C:\trees.csv"
    clDF = pd.read_csv(inTSV, sep='\t')
    catDF = clDF[clDF['category'] == 'inCat']
    kms = 0.05
    scaleDist = 0.01*kms
    x = 'lon'
    y = 'lat'
    dbscan = DBSCAN(eps=scaleDist, min_samples=5)
    clusters = dbscan.fit_predict(catDF[[x,y]])
    catDF['cluster'] = clusters
    catDF.to_csv(r"C:\%s.csv" % (inCat))
    del catDF

if __name__ == "__main__":

    inTSV = r"C:\trees.csv"
    df = pd.read_csv(inTSV, sep='\t')

    catList = list(df.category.unique())

    cores = mp.cpu_count()
    pool = mp.Pool(cores - 1)
    pool.map(findClusters, catList)
    pool.close()
    pool.join()

Я знаю, что это не самый эффективный способ сделать это, поскольку я перечитываю, а также записываю в промежуточные файлы. Я хочу запустить кластеризацию каждой категории данных параллельно. Могу ли я создать список фреймов данных (по одному для каждой категории), которые будут поступать в многопроцессорный пул? Как все это будет перехвачено после обработки (завернуто в вызов concat?). Есть ли лучший способ загрузить данные один раз в память, чтобы каждый процесс мог получить к ним доступ, чтобы выделить нужные ему данные категории, как?

Запуск Anaconda, Python 3.5.5

Спасибо за любое понимание.


person user1229108    schedule 20.08.2018    source источник
comment
кроме того, del catDF совершенно бессмысленно.   -  person juanpa.arrivillaga    schedule 20.08.2018


Ответы (1)


Вы можете использовать df.groupby, поэтому обратите внимание:

In [1]: import pandas as pd

In [2]: df = pd.read_clipboard()

In [3]: df
Out[3]:
  category        lat         lon
0    apple  34.578967  120.232453
1    apple  34.234646  120.535667
2     pear  32.564566  120.453567
3    peach  33.564567  121.456445
4    apple  34.656757  120.423566

In [4]: list(df.groupby('category'))
Out[4]:
[('apple',   category        lat         lon
  0    apple  34.578967  120.232453
  1    apple  34.234646  120.535667
  4    apple  34.656757  120.423566),
 ('peach',   category        lat         lon
  3    peach  33.564567  121.456445),
 ('pear',   category        lat         lon
  2     pear  32.564566  120.453567)]

И просто перепишите свою функцию, чтобы ожидать пару, что-то вроде:

def find_clusters(grouped):
    cat, catDF = grouped
    kms = 0.05
    scale_dist = 0.01*kms
    x = 'lon'
    y = 'lat'
    dbscan = DBSCAN(eps=scale_dist, min_samples=5)
    clusters = dbscan.fit_predict(catDF[[x,y]])
    catDF['cluster'] = clusters
    catDF.to_csv(r"C:\%s.csv" % (cat))

Честно говоря, я думаю, что запись в промежуточные файлы — это нормально.

Если нет, вы всегда можете просто сделать:

return catDF

Вместо

catDF.to_csv(r"C:\%s.csv" % (cat))

А потом:

df = pd.concat(pool.map(findClusters, catList))
person juanpa.arrivillaga    schedule 20.08.2018
comment
Хорошо, пока вы выиграли интернет на сегодня! Сократил время обработки с 5,5 часов до 2 минут для 1,1-миллионного файла записи! Очень ценный урок. - person user1229108; 20.08.2018