Flink: поток данных в таблицу

Вариант использования: прочтите сообщения protobuf от Kafka, десериализуйте их, примените некоторые преобразования (сгладьте некоторые столбцы) и напишите в Dynamodb.

К сожалению, Kafka Flink Коннектор поддерживает только форматы csv, json и avro. Итак, мне пришлось использовать API более низкого уровня (поток данных).

Проблема: если я могу создать таблицу из объекта потока данных, я могу принять запрос для выполнения в этой таблице. Это сделало бы часть трансформации бесшовной и универсальной. Можно ли выполнить SQL-запрос к объекту потока данных?


person Nitin Pandey    schedule 28.04.2020    source источник


Ответы (1)


Если у вас есть DataStream объектов, то вы можете просто зарегистрировать данный DataStream как таблицу, используя StreamTableEnvironment.

Это будет выглядеть примерно так:

val myStream = ...
val env: StreamExecutionEnvironment = configureFlinkEnv(StreamExecutionEnvironment.getExecutionEnvironment)
val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
tEnv.registerDataStream("myTable", myStream, [Field expressions])

Затем вы сможете запросить динамическую таблицу, созданную из вашего DataStream.

person Dominik Wosiński    schedule 28.04.2020