В проекте flink я использую щелчок класса case.
case class click( date: LocalDateTime, stbId:String, channelId :Int)
Этот класс заполнил наборы данных и отлично работал с датой java 8 java.time.LocalDateTime
. После переключения на org.joda (версия 2.9) в среде java 7 вызовы объектов щелчка в наборах данных не выполнялись, как раньше. Доступ к определенным функциям поля даты клика Объект бросил NullPointerExceptions
. Примером этих функций являются getHourOfDay
toString
и т. д. Я смог убедиться, что поле даты класса щелчка не равно нулю. Я подозреваю, что библиотека времени joda плохо взаимодействует с сериализацией крио. См. формат joda DateTime вызывает ошибку нулевого указателя в искре Функции RDD или NPE в искре с Joda DateTime В Flink API есть org.apache.flink.api.java.typeutils.runtime.kryo.Serializers со статическим методом registerJodaTime
. Это кажется актуальным. Я простодушно пытался
import org.apache.flink.api.common._
import org.apache.flink.api.java.typeutils.runtime.kryo._
Serializers.registerJodaTime(new ExecutionConfig)
Этого было недостаточно. Прав ли я в этом? Как использовать java.typeutils.runtime.kryo?
Используемая версия Flink: 0.9.1. скала: 2.10 joda.time 2.9
Дополнение: Вот точный добавленный код, как было предложено (спасибо Фабиану и Роберту).
val env = ExecutionEnvironment.getExecutionEnvironment
//import org.apache.flink.api.common._
import org.apache.flink.api.java.typeutils.runtime.kryo._
Serializers.registerJodaTime(env.getConfig)
В файлах журнала встроенного исполнения я смог найти следующие важные части:
16:44:53,998 INFO org.apache.flink.api.java.ExecutionEnvironment - The job has 2 registered types and 0 default Kryo serializers
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered Kryo types:
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered Kryo with Serializers types:
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered Kryo with Serializer Classes types: Entry{k=class org.joda.time.DateTime, v=class de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer},Entry{k=class org.joda.time.Interval, v=class de.javakaffee.kryoserializers.jodatime.JodaIntervalSerializer}
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered Kryo default Serializers:
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered Kryo default Serializers Classes
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered POJO types:
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Static code analysis mode: DISABLE
16:44:54,545 INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started
16:44:54,560 DEBUG akka.event.EventStream - logger log1-Slf4jLogger started
....
16:44:57,103 DEBUG org.apache.flink.api.java.typeutils.TypeExtractor - class org.joda.time.LocalDateTime does not contain a getter for field iLocalMillis
16:44:57,103 DEBUG org.apache.flink.api.java.typeutils.TypeExtractor - class org.joda.time.LocalDateTime does not contain a setter for field iLocalMillis
16:44:57,103 INFO org.apache.flink.api.java.typeutils.TypeExtractor - class org.joda.time.LocalDateTime is not a valid POJO type
16:44:57,275 DEBUG org.apache.flink.api.scala.ClosureCleaner$ - accessedFields: Map()
16:44:57,369 INFO org.apache.flink.api.java.ExecutionEnvironment - The job has 2 registered types and 0 default Kryo serializers
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered Kryo types:
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered Kryo with Serializers types:
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered Kryo with Serializer Classes types: Entry{k=class org.joda.time.DateTime, v=class de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer},Entry{k=class org.joda.time.Interval, v=class de.javakaffee.kryoserializers.jodatime.JodaIntervalSerializer}
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered Kryo default Serializers:
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered Kryo default Serializers Classes
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered POJO types:
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Static code analysis mode: DISABLE
Тем не менее я стал свидетелем следующего
Exception in thread "main" java.lang.NullPointerException
at org.joda.time.LocalDateTime.isSupported(LocalDateTime.java:625)
at org.joda.time.format.DateTimeFormatterBuilder$PaddedNumber.printTo(DateTimeFormatterBuilder.java:1435)
at org.joda.time.format.DateTimeFormatterBuilder$Composite.printTo(DateTimeFormatterBuilder.java:2474)
at org.joda.time.format.DateTimeFormatter.printTo(DateTimeFormatter.java:655)
at org.joda.time.format.DateTimeFormatter.print(DateTimeFormatter.java:709)
at org.joda.time.LocalDateTime.toString(LocalDateTime.java:2087)
at java.lang.String.valueOf(Unknown Source)
at scala.runtime.StringAdd$.$plus$extension(StringAdd.scala:13)
at myflink.click.toString(Ingestor.scala:20)
...
Serializers.registerJodaTime(new ExecutionConfig)
правильный вызов метода, но она не имеет никаких эффектов, потому что это не конфигурация выполнения из ExecutionEnvironment. - person Robert Metzger   schedule 12.11.2015