Я использую Kafka Connect в течение последних нескольких месяцев, и недавно я включил исходный плагин ActiveMQ, чтобы читать некоторые сообщения темы JMS, которые содержат файл json внутри, помещать их в тему kafka, а затем создавать поток / таблицу в Ksqldb, который использует в качестве столбцов некоторые ключи, которые есть в файле json. Дело в том, что плагин вставляет сообщение JMS в виде текста с двойными кавычками, поэтому оно не распознается должным образом в Ksqldb. Я пробовал разные вещи в конфигурации, чтобы исправить это, но пока ничего не помогло. Я также хочу использовать форматирование json, а не Avro в kafka connect (реестр схемы тоже не работает). В целях тестирования я также попытался отправить JMS-сообщения, указав содержимое заголовка как application / json, и все равно не повезло.
Вот как выглядит мой плагин ActiveMQ
"config": {"connector.class":"ActiveMQSourceConnector", "tasks.max":"1", "kafka.topic":"activemq", "activemq.url":"tcp://localhost:61616","activemq.username":"admin","activemq.password":"admin","jms.destination.name":"topic.2","jms.destination.type":"topic","jms.message.format":"json","jms.message.converter":"org.apache.kafka.connect.json.JsonConverter","confluent.license":"","confluent.topic.bootstrap.servers":"localhost:9092"}}
и вот как выглядит моя конфигурация подключения Kafka
bootstrap.servers=localhost:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
offset.flush.interval.ms=10000
plugin.path=/opt/kafka_2.13-2.5.0/plugins
Также вот пример того, как Kafka потребляет сообщения
{
"messageID": "ID:plato-46377-1596636746117-4:4:1:1:1",
"text": "{\"widget\": { \"debug\": \"on\", \"window\": { \"title\": \"Sample Konfabulator Widget\", \"name\": \"main_window\", \"width\": 500, \"height\": 500 }, \"image\": { \"src\": \"Images/Sun.png\", \"name\": \"sun1\", \"hOffset\": 250, \"vOffset\": 250, \"alignment\": \"center\" }, \"text\": { \"data\": \"Click Here\", \"size\": 36, \"style\": \"bold\", \"name\": \"text1\", \"hOffset\": 250, \"vOffset\": 100, \"alignment\": \"center\", \"onMouseUp\": \"sun1.opacity = 39\"} }}\n"
}
Если потребуется какая-либо другая информация, пожалуйста, дайте мне знать. Любая помощь будет принята с благодарностью.
ОБНОВЛЕНИЕ. В конечном счете, лучшим решением было бы каким-то образом настроить коннектор, чтобы не экранировать кавычки в полезной нагрузке. Также, к сожалению, экранированные кавычки генерируются из самого activeMQ и не являются частью исходного сообщения.
Итак, сообщение будет выглядеть так
{
"messageID": "ID:plato-46377-1596636746117-4:4:1:1:1",
"text": {
"widget": {
"debug": "on",
"window": {
"title": "Sample Konfabulator Widget",
"name": "main_window",
"width": 500,
"height": 500
},
"image": {
"src": "Images/Sun.png",
"name": "sun1",
"hOffset": 250,
"vOffset": 250,
"alignment":
"center"
}
}