Я пытаюсь создать таблицу API таблицы flink, которая использует функцию источника Debezium, я нашел реализацию этих функций здесь https://github.com/ververica/flink-cdc-connectors и использовал их в моем коде следующим образом:
val debeziumProperties = new Properties()
debeziumProperties.setProperty("plugin.name", "wal2json")
debeziumProperties.setProperty("format", "debezium-json")
val sourceFunction: DebeziumSourceFunction[TestCharge] = PostgreSQLSource.builder()
.hostname("******")
.port(5432)
.database("*****") // monitor all tables under inventory database
.username("*****")
.password("*****")
.debeziumProperties(debeziumProperties)
.deserializer(new CustomDebeziumDeserializer) // converts SourceRecord to String
.build()
val env = StreamExecutionEnvironment.getExecutionEnvironment
val sSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val sTableEnv = StreamTableEnvironment.create(env, sSettings)
val cdcStream: DataStream[TestCharge] = env
.addSource(sourceFunction)
.map(x => x)
sTableEnv.createTemporaryView("historic", cdcStream, 'chargeId, 'email, 'amount, 'cardHash)
val table: Table = sTableEnv.sqlQuery("SELECT SUM(amount) FROM historic GROUP BY chargeId")
val reverse = sTableEnv.toRetractStream[Row](table)
reverse.print()
Я также добавил эту зависимость, как описано в документации:
"com.alibaba.ververica" % "flink-sql-connector-postgres-cdc" % "1.1.0"
Когда я пытаюсь запустить свою работу локально в мини-кластере, она работает нормально, но в кластере Flink, подготовленном в Kubernetes, я получаю следующее исключение:
Caused by: io.debezium.DebeziumException: No implementation of Debezium engine builder was found
Кто-нибудь знает, что может происходить, или мне не хватает зависимости?
Заранее спасибо.