Проблема с часовым поясом сериализации flink json

Я использую JsonRowSerializationSchema для сериализации Flink's Row в JSON. У сериализации временных меток SQL есть проблемы с часовыми поясами.

  val row = new Row(1)
  row.setField(0, new Timestamp(0))

  val tableSchema = TableSchema
    .builder
    .field("c", DataTypes.TIMESTAMP(3).bridgedTo(classOf[Timestamp]))
    .build

  val serializer = JsonRowSerializationSchema.builder()
    .withTypeInfo(tableSchema.toRowType)
    .build()

  println(new String(serializer.serialize(row)))

{c: 1969-12-31T16: 00: 00Z}

Я вижу, что он использует PST (местный часовой пояс) для интерпретации метки времени, но затем выводит UTC (см. Z в выводе)

Если я сделаю TimeZone.setDefault(TimeZone.getTimeZone("UTC")), он напечатает {"c":"1970-01-01T00:00:00Z"}. Мои временные метки созданы для времени UTC, и я хочу, чтобы Flink интерпретировал их как UTC.

Я проверяю реализацию Flink, действуют следующие два метода.

    private JsonNode convertLocalDateTime(ObjectMapper mapper, JsonNode reuse, Object object) {
        return mapper.getNodeFactory()
            .textNode(RFC3339_TIMESTAMP_FORMAT.format((LocalDateTime) object));
    }

    private JsonNode convertTimestamp(ObjectMapper mapper, JsonNode reuse, Object object) {
        Timestamp timestamp = (Timestamp) object;
        return convertLocalDateTime(mapper, reuse, timestamp.toLocalDateTime());
    }

Похоже, что реализация жестко запрограммирована, есть ли способ указать Flink использовать UTC без изменения системного времени?


person lalala    schedule 17.11.2020    source источник


Ответы (1)


java.sql.Timestamp очень проблематичен, потому что он зависит от часового пояса. Вот почему мы заменили его новыми java.time.* классами в новой системе типов Table / SQL.

Мы рекомендуем, чтобы все JVM Flink были настроены в часовом поясе UTC для устаревшей реализации.

Для Table / SQL мы используем новый org.apache.flink.formats.json.JsonRowDataSerializationSchema, но он работает с внутренними структурами данных. Я бы рекомендовал просто скопировать исходный код JsonRowSerializationSchema и реализовать нужный вам формат. Или используйте библиотеку Джексона напрямую, чтобы вообще не иметь дела с TypeInformation.

person twalthr    schedule 19.11.2020