Ошибка десериализации сообщения Avro на Sink Connector

Я получаю сообщение об ошибке десериализации сообщения Avro при попытке использовать сообщение avro из темы и сбрасывать их в базу данных postgres.

Вот моя конфигурация продюсера:

spring
  Kafka
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
      properties:
        schema.registry.url: http://localhost:8081

конфиг приемника-коннектора:

{
"name": "jdbc_source_postgres_avro",
"config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:postgres://localhost:5432/kafka-test",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://localhost:8081",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "connection.user": "postgres",
    "connection.password": "password",
    "topics": "docker-avro-topic",
    "auto.create": "true",
    "auto.offset.reset": "latest",
    "name": "jdbc_source_postgres_avro"
}

образ докера для подключения:

connect:
image: cnfldemos/kafka-connect-datagen:0.1.3-5.3.1
hostname: connect
container_name: connect
depends_on:
  - zookeeper
  - broker
  - schema-registry
ports:
  - "8083:8083"
environment:
  CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
  CONNECT_REST_ADVERTISED_HOST_NAME: connect
  CONNECT_REST_PORT: 8083
  CONNECT_GROUP_ID: compose-connect-group
  CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
  CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
  CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
  CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
  CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
  CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
  CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
  CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
  CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
  CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
  CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
  CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
  CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
  CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
  CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'

И, наконец, журнал ошибок:

org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded
in error handler    at
org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
    at
org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at
org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:487)
    at
org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
    at
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
    at
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at
org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)     at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748) Caused by:
org.apache.kafka.connect.errors.DataException: Failed to deserialize
data for topic docker-avro-topic to Avro:   at
io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:110)
    at
org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:487)
    at
org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at
org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)...
13 more Caused by:
**org.apache.kafka.common.errors.SerializationException: Error
deserializing Avro message for id 101 Caused by:
java.net.ConnectException: Connection refused (Connection refused)  at
java.net.PlainSocketImpl.socketConnect(Native Method)**     at
java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    at
java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at
java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)   at
java.net.Socket.connect(Socket.java:589)    at
sun.net.NetworkClient.doConnect(NetworkClient.java:175)     at
sun.net.www.http.HttpClient.openServer(HttpClient.java:463)     at
sun.net.www.http.HttpClient.openServer(HttpClient.java:558)     at
sun.net.www.http.HttpClient.<init>(HttpClient.java:242)     at
sun.net.www.http.HttpClient.New(HttpClient.java:339)    at
sun.net.www.http.HttpClient.New(HttpClient.java:357     at
sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1220)
    at
sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1156)
    at
sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1050)
    at
sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:984)
    at
sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1564)
    at
sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1492)
    at
java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)



Ответы (1)


Вы настроили Kafka Connect для поиска реестра схем, работающего в том же контейнере:

"value.converter.schema.registry.url": "http://localhost:8081",

но это не так, поэтому:

java.net.ConnectException: Connection refused (Connection refused)

Поскольку вы уже указали конфигурацию реестра схем в переменных среды докера Kafka Connect, то, предполагая, что у вас действительно есть реестр схем, работающий в контейнере, который может быть разрешен как schema-registry из контейнера Kafka Connect, вы можете просто удалить строки конфигурации schema.registry.url из фактической конфигурации соединителя.

person Robin Moffatt    schedule 16.01.2020
comment
Я пробовал без schema.registry.url, но не смог сказать, что не может найти/отсутствует schema.registry.url. Таким образом, вместо локального хоста был указан schema-registry:8081, и это сработало!! - person Oviyan; 17.01.2020
comment
Я получаю сообщение об ошибке, говорящее, что java.sql.SQLException: не найдено подходящего драйвера для jdbc:postgres://localhost:5432/ Как установить драйвер postgres в док-контейнер или дать ссылку на мой локальный путь?!! - person Oviyan; 17.01.2020
comment
Не стесняйтесь пометить этот ответ как правильный и начать новый вопрос для вашего нового вопроса :) - person Robin Moffatt; 17.01.2020
comment
Драйвер @Oviyan Postgres уже является частью док-контейнеров Confluent. В любом случае вам следует поискать в Confluent Github CONNECT_PLUGIN_PATH. - person OneCricketeer; 17.01.2020
comment
@cricket_007: у меня уже есть приведенная ниже конфигурация, должен ли я добавить какой-либо другой путь? ПУТЬ К КЛАССУ: /usr/share/java/monitoring-interceptors/monitoring-interceptors-5.3.1.jar CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components - person Oviyan; 17.01.2020
comment
@Oviyan, вы можете запустить контейнер и найти драйвер Postgres в /usr/share/java/kafka-connect-jdbc? - person OneCricketeer; 17.01.2020
comment
@cricket_007:ой! я вижу только папку kafka. Поэтому я переместил файл jar в docker-image root@broker:/usr/share/java/kafka-connect-jdbc#. Я также вижу postgresql-9.4.1212.jar. Тем не менее, я все еще получаю ту же проблему. - person Oviyan; 17.01.2020
comment
@Oviyan Я бы предложил использовать confluentinc/cp-kafka-connect в качестве базового изображения, а не cnfldemos/kafka-connect-datagen:0.1.3-5.3.1 - person OneCricketeer; 17.01.2020