Я имею дело с потоком мутаций базы данных, то есть с потоком журнала изменений. Я хочу преобразовать значения с помощью SQL-запроса. Мне трудно собрать вместе следующие три концепции RowTypeInfo
, Row
и DataStream
.
ПРИМЕЧАНИЕ: я заранее не знаю схему. Я создаю его на лету, используя данные в объекте Mutation
(Mutation
- это настраиваемый тип)
В частности, у меня есть такой код.
val execEnv = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv: StreamTableEnvironment = TableEnvironment.getTableEnvironment(execEnv)
// Mutation is a custom type
val mutationStream: DataStream[Mutation] = ...
// toRows returns an object of type org.apache.flink.types.Row
val rowStream:DataStream[Row] = mutationStream.flatMap({mutation => toRows(mutation)})
tableEnv.registerDataStream("spinal_tap_table", rowStream)
tableEnv.sql("select col1 + 2")
ПРИМЕЧАНИЕ. Объект Row
позиционирован и не имеет местозаполнителя для имен столбцов. Я не мог найти место, чтобы прикрепить схему к объекту DataStream
.
Я хочу передать какую-то структуру, похожую на Row
, которая содержит полную информацию {columnName: String, columnValue: Object, columnType: TypeInformation[_]}
для запроса.