Мне дан список имен файлов files
, которые содержат данные с разделителями-запятыми, которые необходимо очистить, а также расширить за счет столбцов, содержащих информацию, основанную на именах файлов. Таким образом, я реализовал небольшую функцию read_file
, которая занимается как начальной очисткой, так и вычислением дополнительных столбцов. Используя db.from_sequence(files).map(read_file)
, я сопоставляю функцию чтения со всеми файлами, получая список словарей для каждого.
Однако вместо списка словарей я хочу, чтобы моя сумка содержала каждую отдельную строку входных файлов в виде записи. Впоследствии я хочу сопоставить ключи словарей с именами столбцов в кадре данных dask.
from dask import bag as db
def read_file(filename):
ret = []
with open(filename, 'r') as fp:
... # reading line of file and storing result in dict
ret.append({'a': val_a, 'b': val_b, 'c': val_c})
return ret
from dask import bag as db
files = ['a.txt', 'b.txt', 'c.txt']
my_bag = db.from_sequence(files).map(read_file)
# a,b,c are the keys of the dictionaries returned by read_file
my_df = my_bag.to_dataframe(columns=['a', 'b', 'c'])
Может ли кто-нибудь сообщить мне, что мне нужно изменить, чтобы этот код заработал? Существуют ли другие подходы, которые были бы более подходящими?
Редактировать: я создал три тестовых файла a_20160101.txt
, a_20160102.txt
, a_20160103.txt
. Все они содержат всего несколько строк с одной строкой в каждой.
asdf
sadfsadf
sadf
fsadff
asdf
sadfasd
fa
sf
ads
f
Раньше у меня была небольшая ошибка в read_file
, но теперь вызов my_bag.take(10)
после отображения на ридер работает нормально:
([{'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'asdf', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'sadfsadf', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'sadf', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'fsadff', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'asdf', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'sadfasd', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'fa', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'sf', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'ads', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'f', 'c': 'XY'}],)
Однако my_df = my_bag.to_dataframe(columns=['a', 'b', 'c'])
, а затем my_df.head(10)
по-прежнему поднимают dask.async.AssertionError: 3 columns passed, passed data had 10 columns