Сериализация Flink ZonedDateTime

Мне приходится работать с часовыми поясами и наносекундным временным разрешением. Поэтому я использую ZonedDateTime. Очевидно, Apache Flink неправильно сериализует ZonedDateTime. Он сериализует часть LocalDateTime, как и ожидалось, однако забывает обработать часовой пояс.

Когда я регистрирую, например, зонированную дату внутри функции карты потока Flink, я всегда получаю что-то вроде

 2018-03-01T04:10:30.773471918null

Принимая во внимание, что при вводе данных я получаю правильную зону

 2018-03-01T04:10:30.773471918-05:00

Нуль относится к зоне. Позже, конечно, я получаю исключение нулевого указателя, так как мне приходится использовать правильное сравнение времени, для которого нужна зона.

Как я могу исправить это проще всего? Спасибо за ответ.


person Daniel    schedule 01.05.2018    source источник
comment
Я использую библиотеку timeforscala для удобства github.com/johanandren/timeforscala, может ли это быть проблемой ?   -  person Daniel    schedule 01.05.2018


Ответы (1)


Я не совсем понимаю, почему он не подхватывает сериализатор. Это решение, по крайней мере, работает: я реализовал сериализатор Kryo для ZonedDateTime.

import com.esotericsoftware.kryo.io.{Input, Output}
import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.markatta.timeforscala.ZonedDateTime

class ZonedDateTimeSerializer extends Serializer[ZonedDateTime] {
  setImmutable(true)

  override def write(kryo: Kryo, out: Output, obj: ZonedDateTime): Unit = {
    ZonedDateTimeSerializer.write(out, obj)
  }

  override def read(kryo: Kryo, in: Input, `type`: Class[ZonedDateTime]): ZonedDateTime = {
    ZonedDateTimeSerializer.read(in)
  }
}

object ZonedDateTimeSerializer {
  def write(out: Output, obj: ZonedDateTime): Unit = {
    LocalDateSerializer.write(out, obj.toLocalDate)
    LocalTimeSerializer.write(out, obj.toLocalTime)
    ZoneIdSerializer.write(out, obj.getZone)
  }

  def  read(in: Input): ZonedDateTime = {
    val date = LocalDateSerializer.read(in)
    val time = LocalTimeSerializer.read(in)
    val zone = ZoneIdSerializer.read(in)
    ZonedDateTime(date, time, zone)
  }
}

Я взял реализацию из новейшей реализации Киро. Затем я зарегистрировал его следующим образом:

    env.getConfig.registerTypeWithKryoSerializer(classOf[ZonedDateTime], classOf[ZonedDateTimeSerializer])

Кажется, это решает проблему. Не уверен, что это связано с тем, что я использую timesforscala, но я хочу использовать эту библиотеку, потому что она добавляет важные дополнения, от которых я зависим. Комментарии приветствуются.

person Daniel    schedule 01.05.2018