Сумка Dask из нескольких файлов в фрейм данных Dask со столбцами

Мне дан список имен файлов files, которые содержат данные с разделителями-запятыми, которые необходимо очистить, а также расширить за счет столбцов, содержащих информацию, основанную на именах файлов. Таким образом, я реализовал небольшую функцию read_file, которая занимается как начальной очисткой, так и вычислением дополнительных столбцов. Используя db.from_sequence(files).map(read_file), я сопоставляю функцию чтения со всеми файлами, получая список словарей для каждого.

Однако вместо списка словарей я хочу, чтобы моя сумка содержала каждую отдельную строку входных файлов в виде записи. Впоследствии я хочу сопоставить ключи словарей с именами столбцов в кадре данных dask.

from dask import bag as db

def read_file(filename):
    ret = []
    with open(filename, 'r') as fp:
        ... # reading line of file and storing result in dict
        ret.append({'a': val_a, 'b': val_b, 'c': val_c})

    return ret

from dask import bag as db
files = ['a.txt', 'b.txt', 'c.txt']
my_bag = db.from_sequence(files).map(read_file)
# a,b,c are the keys of the dictionaries returned by read_file
my_df = my_bag.to_dataframe(columns=['a', 'b', 'c'])

Может ли кто-нибудь сообщить мне, что мне нужно изменить, чтобы этот код заработал? Существуют ли другие подходы, которые были бы более подходящими?

Редактировать: я создал три тестовых файла a_20160101.txt, a_20160102.txt, a_20160103.txt. Все они содержат всего несколько строк с одной строкой в ​​каждой.

asdf
sadfsadf
sadf
fsadff
asdf
sadfasd
fa
sf
ads 
f

Раньше у меня была небольшая ошибка в read_file, но теперь вызов my_bag.take(10) после отображения на ридер работает нормально:

([{'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'asdf', 'c': 'XY'}, {'b':    datetime.datetime(2016, 2, 1, 0, 0), 'a': 'sadfsadf', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'sadf', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'fsadff', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'asdf', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'sadfasd', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'fa', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'sf', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'ads', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'f', 'c': 'XY'}],)

Однако my_df = my_bag.to_dataframe(columns=['a', 'b', 'c']), а затем my_df.head(10) по-прежнему поднимают dask.async.AssertionError: 3 columns passed, passed data had 10 columns


person sim    schedule 22.08.2016    source источник
comment
Что плохого в том, чтобы иметь пакет словарей и затем переходить к to_dataframe?   -  person MRocklin    schedule 22.08.2016


Ответы (1)


Возможно, вам нужно позвонить flatten

Ваш набор имен файлов выглядит так:

['a.txt', 
 'b.txt', 
 'c.txt']

После вызова map ваша сумка выглядит так:

[[{'a': 1, 'b': 2, 'c': 3}, {'a': 10, 'b': 20, 'c': 30}],
 [{'a': 1, 'b': 2, 'c': 3}],
 [{'a': 1, 'b': 2, 'c': 3}, {'a': 10, 'b': 20, 'c': 30}]]

Каждый файл был превращен в список диктовок. Теперь ваша сумка похожа на список списков диктовок.

Метод .to_dataframe хочет, чтобы у вас был список диктов. Итак, давайте объединим нашу сумку в единую сплющенную коллекцию диктов.

my_bag = db.from_sequence(files).map(read_file).flatten()

[{'a': 1, 'b': 2, 'c': 3}, {'a': 10, 'b': 20, 'c': 30},
 {'a': 1, 'b': 2, 'c': 3},
 {'a': 1, 'b': 2, 'c': 3}, {'a': 10, 'b': 20, 'c': 30}]
person MRocklin    schedule 22.08.2016
comment
с concat() работает отлично, спасибо! Было бы лучше использовать другой подход, или сглаживание нормально? Если я правильно помню dask-tutorial, пакеты должны подойти для приема/преобразования, не так ли? верный? - person sim; 22.08.2016
comment
Мне это кажется разумным подходом. Вы также можете попробовать dask.delayed. См. примечания по работе с коллекциями. - person MRocklin; 22.08.2016
comment
Просто предупреждаю, что .concat() у меня не сработало, а .flatten() сработало в dask-2.14.0-py_0. b = bag.from_sequence(файлы).map(read_file).flatten() - person Amanda; 08.04.2020