Я получаю сообщение об ошибке десериализации сообщения Avro при попытке использовать сообщение avro из темы и сбрасывать их в базу данных postgres.
Вот моя конфигурация продюсера:
spring Kafka producer: bootstrap-servers: localhost:9092 key-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer properties: schema.registry.url: http://localhost:8081
конфиг приемника-коннектора:
{ "name": "jdbc_source_postgres_avro", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", "connection.url": "jdbc:postgres://localhost:5432/kafka-test", "key.converter": "io.confluent.connect.avro.AvroConverter", "value.converter": "io.confluent.connect.avro.AvroConverter", "key.converter.schema.registry.url": "http://localhost:8081", "value.converter.schema.registry.url": "http://localhost:8081", "connection.user": "postgres", "connection.password": "password", "topics": "docker-avro-topic", "auto.create": "true", "auto.offset.reset": "latest", "name": "jdbc_source_postgres_avro" }
образ докера для подключения:
connect: image: cnfldemos/kafka-connect-datagen:0.1.3-5.3.1 hostname: connect container_name: connect depends_on: - zookeeper - broker - schema-registry ports: - "8083:8083" environment: CONNECT_BOOTSTRAP_SERVERS: 'broker:29092' CONNECT_REST_ADVERTISED_HOST_NAME: connect CONNECT_REST_PORT: 8083 CONNECT_GROUP_ID: compose-connect-group CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081' CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081' CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter" CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter" CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO" CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR" CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1" CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1" CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1" CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
И, наконец, журнал ошибок:
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded
in error handler at
org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
at
org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
at
org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:487)
at
org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
at
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
at
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at
org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266) at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748) Caused by:
org.apache.kafka.connect.errors.DataException: Failed to deserialize
data for topic docker-avro-topic to Avro: at
io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:110)
at
org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:487)
at
org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at
org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)...
13 more Caused by:
**org.apache.kafka.common.errors.SerializationException: Error
deserializing Avro message for id 101 Caused by:
java.net.ConnectException: Connection refused (Connection refused) at
java.net.PlainSocketImpl.socketConnect(Native Method)** at
java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at
java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at
java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) at
java.net.Socket.connect(Socket.java:589) at
sun.net.NetworkClient.doConnect(NetworkClient.java:175) at
sun.net.www.http.HttpClient.openServer(HttpClient.java:463) at
sun.net.www.http.HttpClient.openServer(HttpClient.java:558) at
sun.net.www.http.HttpClient.<init>(HttpClient.java:242) at
sun.net.www.http.HttpClient.New(HttpClient.java:339) at
sun.net.www.http.HttpClient.New(HttpClient.java:357 at
sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1220)
at
sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1156)
at
sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1050)
at
sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:984)
at
sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1564)
at
sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1492)
at
java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)