Разница между beam.ParDo и beam.Map в типе вывода?

Я использую Apache-Beam для выполнения некоторых преобразований данных, включая извлечение данных из txt, csv и различных источников данных. Я заметил одну вещь: разницу в результатах при использовании beam.Map и beam.ParDo

В следующем примере:

Я читаю данные csv и в первом случае передаю их DoFn с помощью beam.ParDo, который извлекает первый элемент, который является датой, а затем распечатывает его. Во втором случае я напрямую использую beam.Map, чтобы сделать то же самое, а затем распечатываю его.

class Printer(beam.DoFn):
    def process(self,data_item):
        print data_item

class DateExtractor(beam.DoFn):
    def process(self,data_item):
        return (str(data_item).split(','))[0]

data_from_source = (p
                    | 'ReadMyFile 01' >> ReadFromText('./input/data.csv')
                    | 'Splitter using beam.ParDo 01' >> beam.ParDo(DateExtractor())
                    | 'Printer the data 01' >> beam.ParDo(Printer())
                    )

copy_of_the_data =  (p
                    | 'ReadMyFile 02' >> ReadFromText('./input/data.csv')
                    | 'Splitter using beam.Map 02' >> beam.Map(lambda record: (record.split(','))[0])
                    | 'Printer the data 02' >> beam.ParDo(Printer())
                    )

В двух выходах я заметил следующее:

##With beam.ParDo##
2
0
1
7
-
0
4
-
0
3
2
0
1
7

##With beam.Map##
2017-04-03
2017-04-03
2017-04-10
2017-04-10
2017-04-11
2017-04-12
2017-04-12

Мне это кажется странным. Интересно, проблема в функции печати? Но после использования разных преобразований он показывает те же результаты. Как пример работает:

| 'Group it 01' >> beam.Map(lambda record: (record, 1))

который по-прежнему возвращает ту же проблему:

##With beam.ParDo##
('8', 1)
('2', 1)
('0', 1)
('1', 1)

##With beam.Map##
(u'2017-04-08', 1)
(u'2017-04-08', 1)
(u'2017-04-09', 1)
(u'2017-04-09', 1)

Есть идеи, в чем причина? Что мне не хватает в разнице между beam.Map и beam.ParDo ???


person Soliman    schedule 24.12.2018    source источник


Ответы (1)


Краткий ответ

Вам нужно обернуть возвращаемое значение ParDo в список.

Расширенная версия

ParDos обычно может возвращать любое количество выходов для одного входа, т.е. для одной входной строки вы можете выдать ноль, один или несколько результатов. По этой причине Beam SDK обрабатывает выходные данные ParDo не как отдельный элемент, а как набор элементов.

В вашем случае ParDo испускает одну строку вместо коллекции. Beam Python SDK все еще пытается интерпретировать вывод этого ParDo, как если бы это был набор элементов. И он делает это, интерпретируя полученную строку как набор символов. Из-за этого ваш ParDo теперь эффективно создает поток отдельных символов, а не поток строк.

Что вам нужно сделать, так это обернуть возвращаемое значение в список:

class DateExtractor(beam.DoFn):
    def process(self,data_item):
        return [(str(data_item).split(','))[0]]

Обратите внимание на квадратные скобки. Дополнительные примеры см. В руководстве по программированию.

Map, с другой стороны, можно рассматривать как частный случай ParDo. Ожидается, что Map будет производить ровно один вывод для каждого ввода. Итак, в этом случае вы можете просто вернуть одно значение из лямбда, и оно будет работать, как ожидалось.

И вам, вероятно, не нужно заключать data_item в str. Согласно документам, ReadFromText преобразование производит строки.

person Anton    schedule 27.12.2018