Средство чтения файлов CSV Flink не может преобразовать LongType в PojoType

Часть кода, который я пытаюсь выполнить во Flink:

val pages = env.readCsvFile[(Long)]("/home/ppi.csv",
   fieldDelimiter = "\t", includedFields = Array(1))

Я хочу использовать pages для какой-то другой цели, но когда я компилирую, Flink выдает мне сообщение об ошибке, как

Исключение в потоке "main" java.lang.ClassCastException:
org.apache.flink.api.common.typeinfo.IntegerTypeInfo нельзя преобразовать в
org.apache.flink.api.java.typeutils.PojoTypeInfo

Кстати, я использую моментальную версию Flink 0.9. Любая помощь в правильном направлении высоко ценится.


person Vinothkumar Mohanakrishnan    schedule 08.07.2015    source источник
comment
может быть, имеет больше смысла размещать сообщения на форумах flink/списке рассылки/системе отслеживания ошибок, тем более, что это версия моментального снимка. вы пробовали с последним релизом?   -  person hoijui    schedule 08.07.2015
comment
Можете ли вы опубликовать полную трассировку стека?   -  person Robert Metzger    schedule 08.07.2015


Ответы (1)


Если вы читаете из CSV-файла, возвращаемый тип будет кортежем Scala со всеми прочитанными полями. В вашем примере вы читаете только одно поле, которое даст Tuple1. Это то, что вы пытаетесь указать в скобках, окружающих «Длинный»:

readCsvFile[(Long)]

В Scala вы можете указывать кортежи только с двумя или более полями, используя круглые скобки. Поэтому вам нужно написать вместо

readCsvFile[Tuple1[Long]]

Исключение возникает, потому что CSVInputFormat Flink пытается интерпретировать все типы, отличные от Tuple, как типы Pojo.

person Fabian Hueske    schedule 08.07.2015
comment
Привет Фабиан. Спасибо за ваш ответ. мой вопрос: ниже приведен код, который я нашел в руководстве по программированию Flink, там они считывают данные из CSV-файла. //чтение CSV-файла с пятью полями, взяв только два из них val csvInput = env.readCsvFile[(String, Double)]( hdfs:///the/CSV/file, includeFields = Array(0, 3)) / / возьмем первое и четвертое поле. Является ли csvinput кортежем или Pojo? - person Vinothkumar Mohanakrishnan; 08.07.2015
comment
Тип возвращаемого значения в этом примере — Tuple2[String, Double]. Scala предлагает ярлык для указания типов кортежей как (String, Double). Однако это работает только для кортежей с двумя или более полями. - person Fabian Hueske; 08.07.2015
comment
Привет Фабиан. Еще раз спасибо. Есть ли способ преобразовать Tuple1 [Long] в Long в Scala? Потому что я получаю сообщение об ошибке Ожидаемое несоответствие типа: длинное, но фактическое: найден Tuple1[Long]. когда я пытаюсь использовать карту на входе, считанном из файла CSV? Ниже приведен мой фрагмент val pages = env.readCsvFile[Tuple1[(Long)]](/home/vinoth/bigdata/assign10/ppi.csv, fieldDelimiter = \t, includeFields = Array(1)) val pagesWithRanks = pages.map (p(вот ошибка, которую я получаю) => Page(p, 1.0 / pages.count())). Спасибо. - person Vinothkumar Mohanakrishnan; 08.07.2015
comment
Вы можете реализовать функцию карты так, чтобы она принимала Tuple1[Long] как: val pagesWithRanks = pages.map(p => Page(p._1, 1.0 / pages.count()))) - person Fabian Hueske; 08.07.2015
comment
Кстати, вы не можете использовать pages.count() непосредственно в функции карты. Вы должны сначала собрать это в значение, подобное этому: val cnt = pages.count(), а затем использовать cnt в функции карты. - person Fabian Hueske; 08.07.2015