Как использовать joda.time в flink (или как использовать typeutils.runtime.kryo)

В проекте flink я использую щелчок класса case.

case class click( date: LocalDateTime, stbId:String, channelId :Int)

Этот класс заполнил наборы данных и отлично работал с датой java 8 java.time.LocalDateTime. После переключения на org.joda (версия 2.9) в среде java 7 вызовы объектов щелчка в наборах данных не выполнялись, как раньше. Доступ к определенным функциям поля даты клика Объект бросил NullPointerExceptions. Примером этих функций являются getHourOfDay toString и т. д. Я смог убедиться, что поле даты класса щелчка не равно нулю. Я подозреваю, что библиотека времени joda плохо взаимодействует с сериализацией крио. См. формат joda DateTime вызывает ошибку нулевого указателя в искре Функции RDD или NPE в искре с Joda DateTime В Flink API есть org.apache.flink.api.java.typeutils.runtime.kryo.Serializers со статическим методом registerJodaTime. Это кажется актуальным. Я простодушно пытался

import  org.apache.flink.api.common._
import org.apache.flink.api.java.typeutils.runtime.kryo._
Serializers.registerJodaTime(new ExecutionConfig)

Этого было недостаточно. Прав ли я в этом? Как использовать java.typeutils.runtime.kryo?

Используемая версия Flink: 0.9.1. скала: 2.10 joda.time 2.9

Дополнение: Вот точный добавленный код, как было предложено (спасибо Фабиану и Роберту).

val env = ExecutionEnvironment.getExecutionEnvironment
//import  org.apache.flink.api.common._
import org.apache.flink.api.java.typeutils.runtime.kryo._
Serializers.registerJodaTime(env.getConfig)

В файлах журнала встроенного исполнения я смог найти следующие важные части:

16:44:53,998 INFO  org.apache.flink.api.java.ExecutionEnvironment                - The job has 2 registered types and 0 default Kryo serializers
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo types: 
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo with Serializers types: 
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo with Serializer Classes types: Entry{k=class org.joda.time.DateTime, v=class de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer},Entry{k=class org.joda.time.Interval, v=class de.javakaffee.kryoserializers.jodatime.JodaIntervalSerializer}
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo default Serializers: 
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo default Serializers Classes 
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered POJO types: 
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Static code analysis mode: DISABLE
16:44:54,545 INFO  akka.event.slf4j.Slf4jLogger                                  - Slf4jLogger started
16:44:54,560 DEBUG akka.event.EventStream                                        - logger log1-Slf4jLogger started
....
16:44:57,103 DEBUG org.apache.flink.api.java.typeutils.TypeExtractor             - class org.joda.time.LocalDateTime does not contain a getter for field iLocalMillis
16:44:57,103 DEBUG org.apache.flink.api.java.typeutils.TypeExtractor             - class org.joda.time.LocalDateTime does not contain a setter for field iLocalMillis
16:44:57,103 INFO  org.apache.flink.api.java.typeutils.TypeExtractor                 - class org.joda.time.LocalDateTime is not a valid POJO type
16:44:57,275 DEBUG org.apache.flink.api.scala.ClosureCleaner$                        - accessedFields: Map()
16:44:57,369 INFO  org.apache.flink.api.java.ExecutionEnvironment                - The job has 2 registered types and 0 default Kryo serializers
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo types: 
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo with Serializers types: 
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo with Serializer Classes types: Entry{k=class org.joda.time.DateTime, v=class de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer},Entry{k=class org.joda.time.Interval, v=class de.javakaffee.kryoserializers.jodatime.JodaIntervalSerializer}
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo default Serializers: 
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo default Serializers Classes 
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered POJO types: 
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Static code analysis mode: DISABLE

Тем не менее я стал свидетелем следующего

Exception in thread "main" java.lang.NullPointerException
    at org.joda.time.LocalDateTime.isSupported(LocalDateTime.java:625)
    at org.joda.time.format.DateTimeFormatterBuilder$PaddedNumber.printTo(DateTimeFormatterBuilder.java:1435)
    at org.joda.time.format.DateTimeFormatterBuilder$Composite.printTo(DateTimeFormatterBuilder.java:2474)
    at org.joda.time.format.DateTimeFormatter.printTo(DateTimeFormatter.java:655)
    at org.joda.time.format.DateTimeFormatter.print(DateTimeFormatter.java:709)
    at org.joda.time.LocalDateTime.toString(LocalDateTime.java:2087)
    at java.lang.String.valueOf(Unknown Source)
    at scala.runtime.StringAdd$.$plus$extension(StringAdd.scala:13)
    at myflink.click.toString(Ingestor.scala:20)
    ...

person Spyros Komninos    schedule 11.11.2015    source источник
comment
Привет, эта строка Serializers.registerJodaTime(new ExecutionConfig) правильный вызов метода, но она не имеет никаких эффектов, потому что это не конфигурация выполнения из ExecutionEnvironment.   -  person Robert Metzger    schedule 12.11.2015


Ответы (2)


Flink использует Kryo для типов, которые он не может сериализовать. LocalDateTime является таким классом.

К сожалению, Kryo также не может правильно сериализовать его, поэтому мы должны сообщить Kryo, как это сделать, предоставив ему специализированный сериализатор для этого класса.

  1. Добавьте de.javakaffee:kryo-serializers в качестве зависимости:
<dependency>
    <groupId>de.javakaffee</groupId>
    <artifactId>kryo-serializers</artifactId>
    <version>0.30</version>
</dependency>

(обратите внимание, что добавление этой зависимости может вызвать проблемы с использованием Flink в кластере. Пожалуйста, дайте мне знать)

  1. Зарегистрируйте новый сериализатор с помощью ExecutionEnvironment:
val env = ExecutionEnvironment.getExecutionEnvironment
env.registerTypeWithKryoSerializer(classOf[LocalDateTime], classOf[JodaLocalDateTimeSerializer])

Я надеюсь, что это поможет (я сохраняю старый ответ в качестве ссылки)


Некоторые общие замечания по устранению проблем с Kryo/Serializer во Flink:

При локальном выполнении задания (также должно работать во внешнем интерфейсе ./bin/flink, но тогда вывод, вероятно, находится в каталоге log/), вы должны увидеть что-то вроде:

14:05:52,863 INFO  org.apache.flink.api.java.ExecutionEnvironment                - The job has 15 registered types and 2 default Kryo serializers 
14:05:52,943 INFO  org.apache.flink.runtime.minicluster.FlinkMiniCluster         - Starting FlinkMiniCluster. 
14:05:53,150 INFO  akka.event.slf4j.Slf4jLogger                                  - Slf4jLogger started

С количеством зарегистрированных типов и сериализаторов Kryo выше 0.

С уровнем журнала DEBUG (замените INFO на DEBUG в log4j.properties) вы можете получить еще более подробную информацию о зарегистрированных сериализаторах:

14:10:39,935 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo types: 
14:10:39,935 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo with Serializers types: 
14:10:39,935 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo with Serializer Classes types: 
14:10:39,935 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo default Serializers: 
14:10:39,935 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered Kryo default Serializers Classes 
14:10:39,935 DEBUG org.apache.flink.api.java.ExecutionEnvironment                - Registered POJO types: 
person Robert Metzger    schedule 12.11.2015
comment
Я добавил соответствующий вывод журнала. Нужно читать после Follow Up. - person Spyros Komninos; 15.11.2015
comment
Можете ли вы поделиться полным исходным кодом задания, которое вы выполняете? Или, может быть, минимальный пример для воспроизведения проблемы - person Robert Metzger; 16.11.2015

Вы должны зарегистрировать сериализаторы joda в ExecutionConfig из ExecutionEnvironment:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Serializers.registerJodaTime(env.getConfig());

Надеюсь это поможет.

person Fabian Hueske    schedule 11.11.2015
comment
Это определенно выглядит более многообещающим подходом. Однако попытка фрагмента кода по-прежнему приводит к NPE. Вы подозреваете какую-то другую проблему? - person Spyros Komninos; 12.11.2015