Вопросы по теме 'flink-sql'

Как прикрепить схему к Flink DataStream - на лету?
Я имею дело с потоком мутаций базы данных, то есть с потоком журнала изменений. Я хочу преобразовать значения с помощью SQL-запроса. Мне трудно собрать вместе следующие три концепции RowTypeInfo , Row и DataStream . ПРИМЕЧАНИЕ: я заранее не...
1681 просмотров

Ошибка Apache Flink java.lang.ClassNotFoundException: org.apache.flink.table.sources.TableSource?
Я пишу потоковую службу в Apache Flink. Я в основном выбираю данные из файла CSV с помощью org.apache.flink.table.sources.CsvTableSource. Ниже приведен код для того же: StreamTableEnvironment streamTableEnvironment = TableEnvironment...
3926 просмотров

Как написать SQL для расчета на основе добавочного окна пакетной таблицы
Мое требование состоит в том, чтобы вычислить на основе окна добавочного размера для пакетной таблицы. Например, первое окно имеет 1 строку, второе окно имеет 2 строки (включая 1 строку из 1-го окна и новую строку), затем 3 строки в 3-м окне...
74 просмотров
schedule 30.05.2023

Проблема с Batch Table API в Flink 1.5 - Жалуется на необходимость Streaming API
Я пытаюсь создать пакетно-ориентированное задание Flink с Flink 1.5.0 и хочу использовать API таблиц и SQL для обработки данных. Моя проблема заключается в попытке создать BatchTableEnviroment. Я получаю ошибку компиляции....
133 просмотров
schedule 25.11.2022

Почему Flink SQL использует оценку количества строк в 100 строк для всех таблиц?
Я не был уверен, почему логический план не был правильно оценен в этом примере . Я более подробно изучил базовый код Flink и проверил, что когда кальцит оценивает / оценивает количество строк для запроса в объекте. По какой-то причине он всегда...
238 просмотров

Экспоненциально убывающая скользящая средняя в окне скачкообразного изменения в Flink SQL: время кастинга
Теперь у нас есть SQL с причудливым оконным интерфейсом во Flink, я пытаюсь указать на убывающую скользящую среднюю как «то, что будет возможно в будущих выпусках Flink как для Table API, так и для SQL». из их дорожной карты SQL / предварительной...
222 просмотров

Как преобразовать поток данных с массивом json в поток данных отдельных элементов массива
У меня есть поток данных [ObjectNode], который я читал как десериализованный json из темы kafka. Один из элементов этого ObjectNode - это фактически массив событий. Этот массив имеет разную длину. Входящий поток json выглядит так: {...
677 просмотров
schedule 15.02.2022

Как мы используем конфигурации запросов при использовании клиента SQL в Flink SQL?
Как мы используем конфигурации запросов при использовании клиента SQL в Flink SQL? Таким же образом, как указано в приведенной ниже ссылке для https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/query_configuration.html...
135 просмотров

Apache Flink - обработка дублированных сообщений во время развертывания заданий с ActiveMQ в качестве источника
Учитывая, У меня есть задание Flink, которое читает из ActiveMQ источника и записывает в базу данных mysql - с ключом по идентификатору. Я включил контрольные точки для этой работы каждую секунду. Я указываю контрольные точки на экземпляр...
218 просмотров

Flink: поток данных в таблицу
Вариант использования : прочтите сообщения protobuf от Kafka, десериализуйте их, примените некоторые преобразования (сгладьте некоторые столбцы) и напишите в Dynamodb. К сожалению, Kafka Flink Коннектор поддерживает только форматы csv, json и...
463 просмотров

Ошибка контрольной точки Flink в Kubernetes с FsStateBackend
Я получаю сообщение об ошибке, указанное ниже, при использовании flink в кубернетах с бэкэндом состояния задания FsStateBackend, например: env.setStateBackend(new FsStateBackend("file:///data/flink/checkpoints")) Я устанавливаю это в самом коде....
167 просмотров
schedule 18.07.2022

Исключение NullPointer при попытке доступа или чтения ReadOnly ctx в методе processElement в KeyedBroadCastProcessFunction в Apache Flink
У меня есть интересный сценарий, в котором я работаю над сопоставлением шаблонов во flink, оценивая входящие шаблоны с помощью функции keyedbroadcastprocess, когда я запускаю программу в среде IDE, я получаю исключение нулевого указателя в функции...
438 просмотров

Можно ли запустить простое задание в диспетчере заданий во Flink?
Я написал задание Flink, которое считывает данные из Kafka и записывает в файл hdfs в формате ORC для HIVE (использует 20 исполнителей). Мне нужно выполнить простое задание, которое каждый час вставляет раздел в таблицу HIVE. Можно ли запустить это...
35 просмотров
schedule 12.04.2022

Stream kinesis Analytics ETL Flink - пропускать записи до и после задержки
ИЗМЕНИТЬ: У меня есть требование пропустить записи, созданные до 10 и 20 секунд после того, как произойдет разрыв во входящих данных. (Считается, что разрыв возникает, когда время события1 - время события2> 3 секунды) полученные данные...
91 просмотров

KeyBy против GroupBy в Apache Flink
В чем сходство и различие KeyBy и GroupBy во Flink? Если в программе только для таблиц используется API таблиц / SQL, является ли GroupBy эквивалентом KeyBy?
480 просмотров
schedule 10.01.2024

Контрольная точка во Flink не работает с функцией CoFlatMapFunction
Привет, я пытаюсь выполнить контрольную точку в одном из моих модулей flink, в котором я использую CoFlatMapFunction для объединения с потоками, если я закомментирую контрольную точку CoFlatMapFunction, работает, если раскомментировать снова, она не...
122 просмотров

как извлечь происхождение столбцов и таблиц из flink sql
Я хочу создать систему происхождения для хранилища данных в реальном времени , как я могу извлечь происхождение таблиц и столбцов из flink sql?
89 просмотров
schedule 03.06.2022

Проблема с часовым поясом сериализации flink json
Я использую JsonRowSerializationSchema для сериализации Flink's Row в JSON. У сериализации временных меток SQL есть проблемы с часовыми поясами. val row = new Row(1) row.setField(0, new Timestamp(0)) val tableSchema = TableSchema...
136 просмотров
schedule 27.04.2022

Как я могу использовать коннектор Debezium с Apache Flink
Я пытаюсь создать таблицу API таблицы flink, которая использует функцию источника Debezium, я нашел реализацию этих функций здесь https://github.com/ververica/flink-cdc-connectors и использовал их в моем коде следующим образом: val...
406 просмотров

Почему задание Flink SQL меняется с "Выполняется" на "Завершено" при запросе таблицы базы данных SQL Server в задании Flink SQL?
Когда мы выбираем любую таблицу в Flink SQL CLI, которая является источником для таблицы базы данных SQL Server, почему задание Flink завершается после того, как оно извлекает все записи из таблицы базы данных? Можно ли заставить его работать, чтобы...
46 просмотров
schedule 06.06.2022