Как прикрепить схему к Flink DataStream - на лету?

Я имею дело с потоком мутаций базы данных, то есть с потоком журнала изменений. Я хочу преобразовать значения с помощью SQL-запроса. Мне трудно собрать вместе следующие три концепции RowTypeInfo, Row и DataStream.

ПРИМЕЧАНИЕ: я заранее не знаю схему. Я создаю его на лету, используя данные в объекте Mutation (Mutation - это настраиваемый тип)

В частности, у меня есть такой код.

val execEnv = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv: StreamTableEnvironment = TableEnvironment.getTableEnvironment(execEnv)

// Mutation is a custom type
val mutationStream: DataStream[Mutation] = ...
// toRows returns an object of type org.apache.flink.types.Row
val rowStream:DataStream[Row] = mutationStream.flatMap({mutation => toRows(mutation)})
tableEnv.registerDataStream("spinal_tap_table", rowStream)
tableEnv.sql("select col1 + 2")

ПРИМЕЧАНИЕ. Объект Row позиционирован и не имеет местозаполнителя для имен столбцов. Я не мог найти место, чтобы прикрепить схему к объекту DataStream.

Я хочу передать какую-то структуру, похожую на Row, которая содержит полную информацию {columnName: String, columnValue: Object, columnType: TypeInformation[_]} для запроса.


person user758988    schedule 09.02.2018    source источник


Ответы (1)


В Flink SQL схема таблицы является обязательной, если определено Table. Невозможно выполнять запросы к динамически типизированным записям.

Что касается концепций RowTypeInfo, Row и DataStream:

  • Row - это фактическая запись, содержащая данные
  • RowTypeInfo - описание схемы для Rows. Он содержит имена и TypeInformation для каждого поля Row.
  • DataStream - это логический поток записей. DataStream[Row] - это поток строк. Обратите внимание, что это не фактический поток, а просто концепция API для представления потока в API.
person Fabian Hueske    schedule 09.02.2018
comment
Но фрагмент кода компилируется - не должен ли он не запрашивать схему? Я предполагаю, что мой вопрос в том, где в строке строки я прикрепляю RowTypeInfo? Мутация - это мой настраиваемый тип, и я могу преобразовать его в строку с помощью функции toRows. val rowStream:DataStream[Row] = mutationStream.flatMap({mutation => toRows(mutation)}) - person user758988; 14.02.2018
comment
О, я вижу. Компилятор может проверять только статические типы, но не может просматривать поля строки. В этом разница при использовании Row вместо Tuple, в котором типы полей определены универсальными типами. Вы можете прикрепить RowTypeInfo к любому оператору с помощью returns() метода: in.map(...).returns(Types.ROW(Types.STRING, Types.INT)). - person Fabian Hueske; 14.02.2018
comment
Спасибо за ответ! Методы map и flatMap (операторы?) возвращают объект DataStream, у которого нет метода returns. Я заметил, что интерфейс StreamTransformation имеет метод returns. Как мне продлить и передать его вместо flatmap? Должен ли я вообще это делать? - person user758988; 15.02.2018
comment
Ой, извини. returns() доступен только для Java DataStream API, но вы используете Scala. В Scala вы можете передать TypeInformation как неявное значение, то есть implicit val rowType: TypeInformation[Row] = Types.ROW(...) - person Fabian Hueske; 15.02.2018
comment
Понятно. Версия flatMap для scala возвращает DataStream без returns() метода. Но java возвращает SingleOutputStreamOperator. Похоже, это радует компилятор. new DataStream[Row](mutationStream.javaStream.flatMap(mutationToRows).returns(rowTypeInfo)) - person user758988; 15.02.2018
comment
^^ Только что увидел ваш комментарий! Спасибо Фабиан - person user758988; 15.02.2018