Чтение сообщения JMS как json, а не текста с помощью коннектора источника Kafka ActiveMQ

Я использую Kafka Connect в течение последних нескольких месяцев, и недавно я включил исходный плагин ActiveMQ, чтобы читать некоторые сообщения темы JMS, которые содержат файл json внутри, помещать их в тему kafka, а затем создавать поток / таблицу в Ksqldb, который использует в качестве столбцов некоторые ключи, которые есть в файле json. Дело в том, что плагин вставляет сообщение JMS в виде текста с двойными кавычками, поэтому оно не распознается должным образом в Ksqldb. Я пробовал разные вещи в конфигурации, чтобы исправить это, но пока ничего не помогло. Я также хочу использовать форматирование json, а не Avro в kafka connect (реестр схемы тоже не работает). В целях тестирования я также попытался отправить JMS-сообщения, указав содержимое заголовка как application / json, и все равно не повезло.

Вот как выглядит мой плагин ActiveMQ

 "config": {"connector.class":"ActiveMQSourceConnector", "tasks.max":"1", "kafka.topic":"activemq", "activemq.url":"tcp://localhost:61616","activemq.username":"admin","activemq.password":"admin","jms.destination.name":"topic.2","jms.destination.type":"topic","jms.message.format":"json","jms.message.converter":"org.apache.kafka.connect.json.JsonConverter","confluent.license":"","confluent.topic.bootstrap.servers":"localhost:9092"}}

и вот как выглядит моя конфигурация подключения Kafka

bootstrap.servers=localhost:9092

group.id=connect-cluster

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1


config.storage.topic=connect-configs
config.storage.replication.factor=1

status.storage.topic=connect-status
status.storage.replication.factor=1

offset.flush.interval.ms=10000

plugin.path=/opt/kafka_2.13-2.5.0/plugins

Также вот пример того, как Kafka потребляет сообщения

{
  "messageID": "ID:plato-46377-1596636746117-4:4:1:1:1",
  "text": "{\"widget\": {     \"debug\": \"on\",    \"window\": {        \"title\": \"Sample Konfabulator Widget\",        \"name\": \"main_window\",        \"width\": 500,        \"height\": 500    },    \"image\": {        \"src\": \"Images/Sun.png\",        \"name\": \"sun1\",        \"hOffset\": 250,        \"vOffset\": 250,        \"alignment\": \"center\"    },    \"text\": {        \"data\": \"Click Here\",        \"size\": 36,        \"style\": \"bold\",        \"name\": \"text1\",        \"hOffset\": 250,        \"vOffset\": 100,        \"alignment\": \"center\",        \"onMouseUp\": \"sun1.opacity = 39\"} }}\n"
}

Если потребуется какая-либо другая информация, пожалуйста, дайте мне знать. Любая помощь будет принята с благодарностью.

ОБНОВЛЕНИЕ. В конечном счете, лучшим решением было бы каким-то образом настроить коннектор, чтобы не экранировать кавычки в полезной нагрузке. Также, к сожалению, экранированные кавычки генерируются из самого activeMQ и не являются частью исходного сообщения.

Итак, сообщение будет выглядеть так

{
  "messageID": "ID:plato-46377-1596636746117-4:4:1:1:1",
  "text": {
   "widget": {
    "debug": "on",
    "window": {
      "title": "Sample Konfabulator Widget",
      "name": "main_window",
      "width": 500,
      "height": 500
    },
    "image": {
     "src": "Images/Sun.png",
     "name": "sun1",
     "hOffset": 250,
     "vOffset": 250,
     "alignment": 
     "center"
    }

}

person Eien1no1Yami    schedule 06.08.2020    source источник
comment
Вы можете продвигать свой вопрос в Slack сообщества Confluent. Есть канал #connect. Возможно, разместите ссылку на свой вопрос о SO с просьбой о помощи   -  person Andrew Coates    schedule 06.08.2020


Ответы (1)


Добро пожаловать, Elen1no1Yami!

Мне кажется, проблема в том, что поле text сообщения представляет собой строку, содержащую полезную нагрузку JSON, которая вас интересует, но эта полезная нагрузка имеет двойные кавычки, экранированные с помощью \ char.

Я предполагаю, что данные в самом ActiveMQ не имеют символа \, но было бы хорошо, если бы вы могли это прояснить.

Я вижу подходы к решению этой проблемы:

  1. иметь возможность настроить коннектор, чтобы НЕ экранировать кавычки в полезной нагрузке. Чтобы сообщение выглядело больше так:
{
  "messageID": "ID:plato-46377-1596636746117-4:4:1:1:1",
  "text": {
   "widget": {
    "debug": "on",
    "window": {
      "title": "Sample Konfabulator Widget",
      "name": "main_window",
      "width": 500,
      "height": 500
    },
    "image": {
     "src": "Images/Sun.png",
     "name": "sun1",
     "hOffset": 250,
     "vOffset": 250,
     "alignment": 
     "center"
    },
    ... etc
}
  1. или каким-то образом попросите ksqlDB обработать сообщение, поскольку оно все еще имеет доступ к JSON в поле text.

Обобщает ли это то, что вы ищете? Если да, обновите свой вопрос, чтобы отразить это. (Хорошо включать такие подробности в свой вопрос, чтобы было понятно, о чем вы спрашиваете.

Что до ответа ...

  1. Я не эксперт по подключению, поэтому не могу комментировать и не вижу ничего в деталях конфигурация коннектора, которая может позволить вам изменять содержимое text. Другие, кто знает больше о Connect, могут больше помочь.

  2. Чтобы получить доступ к встроенному / экранированному JSON в ksqlDB, вам сначала нужно удалить экранирование. См. Ниже способы сделать это с помощью ksqlDB.

Использование ksqlDB для доступа к экранированному JSON

Прежде чем мы сможем получить доступ к документу JSON в text, мы должны удалить экранирование.

Я могу думать о двух способах развития моей головы:

Напишите собственный UDF

Лучшим способом было бы написать пользовательский UDF unescape_json, который может удалить экранирование.

-- Import raw stream with value as simple STRING containing all the payload
CREATE STREAM RAW (
  message STRING
 ) WITH (
  KAFKA_TOPIC=<something>,
  VALUE_FORMAT='KAFKA'
 );


-- Use custom UDF to process this and write it back as a properly formatted JSON document:
CREATE STREAM JSONIFIED AS
  SELECT MY_CUSTOM_UDF(message) FROM RAW;

При правильном написании пользовательский подход UDF не будет страдать от потенциальных проблем с повреждением данных, от которых страдает решение на основе REPLACE.

Использование REPLACE для удаления экранирования

ПРИМЕЧАНИЕ: это хрупкое решение: замена символов может совпадать и заменять то, чего не следует, в зависимости от содержания вашего сообщения!

Давайте поработаем с более простыми тестовыми данными, чтобы объяснить, что нужно, например, мы хотим преобразовать:

{
 "messageID": "ID:plato-46377-1596636746117-4:4:1:1:1",
 "text": "{\"widget\": 10}"
}

To:

{
 "messageID": "ID:plato-46377-1596636746117-4:4:1:1:1",
 "text": {"widget": 10}
}

Для этого требуются три вещи:

  1. Заменить проем "text": "{ на "text": {
  2. Заменить все \" на ".
  3. Заменить закрытие }" на }

Мы можем использовать REPLACE. функцию для этого или REGEXP_REPLACE функция:

-- Import raw stream with value as simple STRING containing all the payload
CREATE STREAM RAW (
  message STRING
 ) WITH (
  KAFKA_TOPIC=<something>,
  VALUE_FORMAT='KAFKA'
 );


-- Use REPLACE to remove reformat:
CREATE STREAM JSONIFIED AS 
  SELECT 
    REPLACE(
      REPLACE(
        REPLACE(message, 
          '"text": "{', '"text": {'), 
          '\"', '"'), 
          '"}', '}')
  FROM RAW;

Конечно, это решение потенциально может повредить ваши данные, если они содержат какие-либо условия поиска: "text": "{, \" или "} где-либо еще в ваших данных, например

{
 "messageID": "ID:plato-46377-1596636746117-4:4:1:1:1",
 "text": "{\"widget\": \"hello \\\"} world\"}"
}

Было бы неправильно преобразовано в

{
 "messageID": "ID:plato-46377-1596636746117-4:4:1:1:1",
 "text": {"widget": "hello \\}world"}
}

Вот почему предпочтительнее использовать пользовательский UDF.

После того, как вы исправили содержимое своего ввода (и записали его в новую тему), вы можете импортировать свои данные как обычно:

CREATE STREAM DATA (
   messageId STRING,
   text STRUCT<Widget INT>
 ) WITH (
   kafka_topic='JSONIFIED',
   value_format='JSON'
 );
person Andrew Coates    schedule 06.08.2020
comment
Спасибо за ответ. Я обновил свой вопрос, чтобы лучше понять, чего я хочу. Решение 1) было бы наиболее предпочтительным, но я также не знаю, как это сделать. Я тоже искал на странице конфигурации коннектора, но, к сожалению, не нашел ничего полезного. Если нет другого пути, попробую решение 2). - person Eien1no1Yami; 06.08.2020