Как я могу использовать коннектор Debezium с Apache Flink

Я пытаюсь создать таблицу API таблицы flink, которая использует функцию источника Debezium, я нашел реализацию этих функций здесь https://github.com/ververica/flink-cdc-connectors и использовал их в моем коде следующим образом:

val debeziumProperties = new Properties()
  debeziumProperties.setProperty("plugin.name", "wal2json")
  debeziumProperties.setProperty("format", "debezium-json")

  val sourceFunction: DebeziumSourceFunction[TestCharge] = PostgreSQLSource.builder()
    .hostname("******")
    .port(5432)
    .database("*****") // monitor all tables under inventory database
    .username("*****")
    .password("*****")
    .debeziumProperties(debeziumProperties)
    .deserializer(new CustomDebeziumDeserializer) // converts SourceRecord to String
    .build()

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val sSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
  val sTableEnv = StreamTableEnvironment.create(env, sSettings)

  val cdcStream: DataStream[TestCharge] = env
    .addSource(sourceFunction)
    .map(x => x)

  sTableEnv.createTemporaryView("historic", cdcStream, 'chargeId, 'email, 'amount, 'cardHash)
  val table: Table = sTableEnv.sqlQuery("SELECT SUM(amount) FROM historic GROUP BY chargeId")

  val reverse = sTableEnv.toRetractStream[Row](table)

  reverse.print()

Я также добавил эту зависимость, как описано в документации:

"com.alibaba.ververica" % "flink-sql-connector-postgres-cdc" % "1.1.0"

Когда я пытаюсь запустить свою работу локально в мини-кластере, она работает нормально, но в кластере Flink, подготовленном в Kubernetes, я получаю следующее исключение:

Caused by: io.debezium.DebeziumException: No implementation of Debezium engine builder was found

Кто-нибудь знает, что может происходить, или мне не хватает зависимости?

Заранее спасибо.


person whiteskull    schedule 20.11.2020    source источник


Ответы (1)


если вы хотите использовать его в TableAPI / SQL, вы можете просто зарегистрировать таблицу с помощью SQL DDL.

sTableEnv.executeSql(
      """
        |CREATE TABLE shipments (
        |  shipment_id INT,
        |  order_id INT,
        |  origin STRING,
        |  destination STRING,
        |  is_arrived BOOLEAN
        |) WITH (
        |  'connector' = 'postgres-cdc',
        |  'hostname' = 'localhost',
        |  'port' = '5432',
        |  'username' = 'postgres',
        |  'password' = 'postgres',
        |  'database-name' = 'postgres',
        |  'schema-name' = 'public',
        |  'table-name' = 'shipments'
        |)
        |""".stripMargin)
// then you can query the table
  val table: Table = sTableEnv.sqlQuery("SELECT SUM(shipment_id) FROM shipments GROUP BY order_id")

Это самый простой способ работы с исходным кодом CDC. Поскольку в настоящее время Table API не поддерживает преобразование потока журнала изменений в Table.

Что касается вашей проблемы, я думаю, что это может быть из-за конфликтов зависимостей. Пожалуйста, проверьте, не зависите ли вы от другой версии <artifactId>debezium-embedded</artifactId>. Если да, удалите его. flink-sql-connector-postgres-cdc уже упаковывает его с версией 1.12.

person Jark Wu    schedule 21.11.2020
comment
Не могли бы вы предоставить ссылку на ваше указанное выше утверждение. Table API не поддерживает преобразование потока журнала изменений в таблицу. - person python_enthusiast; 27.01.2021
comment
@python_enthusiast это новая функция, находящаяся в стадии разработки, надеюсь, она будет выпущена в следующей версии 1.13. См. cwiki.apache.org/confluence/display/FLINK/ - person Jark Wu; 28.01.2021
comment
@JarkWu - мне нужно создать поток данных из источника данных postgres, и мой набор результатов представляет собой объединение трех таблиц. Можете привести пример такого сценария? - person Swapnil Khante; 11.05.2021