Соединитель Mongodb debezium заполняет Rowkey как 0 (ноль) и Key как 0 ноль, даже если данные содержат допустимые значения

У меня есть образец файла Json, который я загрузил в mongo db для отправки в Kafka connect с помощью коннектора debezium. Мой вопрос, как значения столбцов RowKey и id могут быть преобразованы в 0 (нули), несмотря на наличие действительных данных в необработанном входном файле json? Ниже приведены шаги, выполняемые для возникновения этой ошибки.

Шаг 1. Создайте образец файла json

{"id":1,"first_name":"Devinne","last_name":"Stubbe","email":"[email protected]","gender":"Female","club_status":"platinum","comments":"Extended interactive initiative"}
                    {"id":2,"first_name":"Loise","last_name":"Salt","email":"[email protected]","gender":"Female","club_status":"silver","comments":"Optimized neutral standardization"}
                    {"id":3,"first_name":"Benjamen","last_name":"Spittle","email":"[email protected]","gender":"Male","club_status":"platinum","comments":"De-engineered systemic customer loyalty"}
                    {"id":4,"first_name":"Helena","last_name":"Pogosian","email":"[email protected]","gender":"Female","club_status":"gold","comments":"Phased hybrid definition"}
                    {"id":5,"first_name":"Miller","last_name":"Karolewski","email":"[email protected]","gender":"Male","club_status":"platinum","comments":"Monitored systematic software"}

Загрузите эти данные в mongodb в коллекцию customerprofile

rs0:PRIMARY> db.collection.find();
        rs0:PRIMARY> db.customerprofile.find();
        { "_id" : ObjectId("5b6213ba78aae5dc09ef8b5b"), "id" : 1, "first_name" : 
        "Devinne", "last_name" : "Stubbe", "email" : "[email protected]", "gender" : 
       "Female", "club_st
       atus" : "platinum", "comments" : "Extended interactive initiative" }
       { "_id" : ObjectId("5b6213ba78aae5dc09ef8b5c"), "id" : 2, "first_name" : 
       "Loise", "last_name" : "Salt", "email" : "[email protected]", "gender" : 
       "Female", "club_status"
        : "silver", "comments" : "Optimized neutral standardization" }
        { "_id" : ObjectId("5b6213ba78aae5dc09ef8b5d"), "id" : 3, "first_name" : 
        "Benjamen", "last_name" : "Spittle", "email" : "[email protected]", 
        "gender" : "Male"
        , "club_status" : "platinum", "comments" : "De-engineered systemic customer 
        loyalty" }
        { "_id" : ObjectId("5b6213ba78aae5dc09ef8b5e"), "id" : 4, "first_name" : 
        "Helena", "last_name" : "Pogosian", "email" : "hpogosian3@pagesperso- 
       orange.fr", "gender" : "Fe
       male", "club_status" : "gold", "comments" : "Phased hybrid definition" }
       { "_id" : ObjectId("5b6213ba78aae5dc09ef8b5f"), "id" : 5, "first_name" : 
        "Miller", "last_name" : "Karolewski", "email" : "[email protected]", "gender"    
      : "Male", "club_status" : "platinum", "comments" : "Monitored systematic 
       software" }
       { "_id" : ObjectId("5b6213ba78aae5dc09ef8b60"), "id" : 6, "first_name" : 
       "Cammy", "last_name" : "Suche", "email" : "[email protected]", "gender" : 
       "Male", "club_status" : "gold", "comments" : "Inverse client-server alliance" 
        }

Шаг 2: Создайте конфигурацию коннектора mongodb debezium и активируйте коннектор в kafka connect

{
  "name": "mongodb-jsondb-connector",
  "config":{
   "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
   "mongodb.hosts": "rs0/169.254.57.118:27017",
   "mongodb.name": "jsondb",
   "mongodb.members.auto.discover": "false",
   "database.whitelist": "jsondb",
    "transforms": "unwrap",
   "transforms.unwrap.type": 
 "io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope",
    "database.history.kafka.topic": "schema-changes.jsondb",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
   "key.converter.schemas.enable": "false",
   "value.converter": "org.apache.kafka.connect.json.JsonConverter",
   "value.converter.schemas.enable": "false"
 }
}

Шаг 3. Проверьте очередь тем

  ksql> print 'jsondb.jsondb.customerprofile' from beginning;
   Format:JSON
   {"ROWTIME":1533154567091,"ROWKEY":" 

Как вы можете видеть ниже, вывод debezium преобразован столбцом id с objectid, например

 \"id\":\"5b6213ba78aae5dc09ef8b5b\"}","id":"5b6213ba78aae5dc09ef8b5b".  

**** Мой вопрос, как мой столбец идентификатора мог быть обновлен с помощью ключа серийного идентификатора, потому что мы использовали JsonConvetor в качестве преобразователя ключей? ****

 {\"id\":\"5b6213ba78aae5dc09ef8b5b\"}","id":"5b6213ba78aae5dc09ef8b5b","first_name":"Devinne","last_name":"Stubbe","email":"[email protected]","gender":"Female","club_status":"platinum","comments":"Extended interactive initiative"}
    {"ROWTIME":1533154567096,"ROWKEY":" 

  {\"id\":\"5b6213ba78aae5dc09ef8b5c\"}","id":"5b6213ba78aae5dc09ef8b5c","first_name":"Loise","last_name":"Salt","email":"[email protected]","gender":"Female","club_status":"silver","comments":"Optimized neutral standardization"}
    {"ROWTIME":1533154567096,"ROWKEY":" 

 {\"id\":\"5b6213ba78aae5dc09ef8b5d\"}","id":"5b6213ba78aae5dc09ef8b5d","first_name":"Benjamen","last_name":"Spittle","email":"[email protected]","gender":"Male","club_status":"platinum","comments":"De-engineered systemic customer loyalty"}
{"ROWTIME":1533154567097,"ROWKEY":"{\"id\":\"5b6213ba78aae5dc09ef8b5e\"}","id":"5b6213ba78aae5dc09ef8b5e","first_name":"Helena","last_name":"Pogosian","email":"[email protected]","gender":"Female","club_status":"gold","comments":"Phased hybrid definition"}
{"ROWTIME":1533154567097,"ROWKEY":"{\"id\":\"5b6213ba78aae5dc09ef8b5f\"}","id":"5b6213ba78aae5dc09ef8b5f","first_name":"Miller","last_name":"Karolewski","email":"[email protected]","gender":"Male","club_status":"platinum","comments":"Monitored systematic software"}
{"ROWTIME":1533154567099,"ROWKEY":"{\"id\":\"5b6213ba78aae5dc09ef8b60\"}","id":"5b6213ba78aae5dc09ef8b60","first_name":"Cammy","last_name":"Suche","email":"[email protected]","gender":"Male","club_status":"gold","comments":"Inverse client-server alliance"}

Шаг 4. Создайте поток из очереди тем

   CREATE STREAM customers_profile \
    (id integer, first_name string, last_name string, \
     email string, gender string, club_status string, \
    comments string) WITH \ 
    (KAFKA_TOPIC='jsondb.jsondb.customerprofile',VALUE_FORMAT='json'); 

Шаг 5. Здесь поток получает нули (0) от CUSTOMER_REPART

ksql> CREATE STREAM customers_stream WITH 
 (KAFKA_TOPIC='CUSTOMERS_REPART',VALUE_FORMAT='json',PARTITIONS=1) as SELECT * 
 FROM customers_profile PARTITION BY id;   

Шаг 6: Пожалуйста, проверьте вывод этой команды печати, где он показывает нули в значениях столбца ROWKEY и id. Откуда они взялись? Что мне нужно сделать, чтобы убедиться, что столбцы ключей ROWKEY и id показывают правильные значения из необработанных значений файла json, которые были изначально загружены в mongodb?

ksql> SET 'auto.offset.reset' = 'earliest';
        Successfully changed local property 'auto.offset.reset' from 'null' to 
        'earliest'
        ksql> print 'CUSTOMERS_REPART' from beginning;
        Format:JSON

{"ROWTIME": 1533154567091, "ROWKEY": "0", "CLUB_STATUS": "платина", "GENDER": "Женский", "КОММЕНТАРИИ": "Расширенная интерактивная инициатива", "ID": 0, "LAST_NAME" : "Stubbe", "EMAIL": "dstubbe0 @ {" ROWTIME ": 1533154567096," ROWKEY ":" 0 "," CLUB_STATUS ":" silver "," GENDER ":" Female "," COMMENTS ":" Оптимизированный нейтральный стандартизация "," ID ": 0," LAST_NAME ":" Salt "," EMAIL ":" lsalt1 @ appl {"ROWTIME": 1533154567096, "ROWKEY": "0", "CLUB_STATUS": "platinum", "GENDER" ":" Мужской "," КОММЕНТАРИИ ":" Деинженерия системной лояльности клиентов "," ID ": 0," LAST_NAME ":" Плевать "," EMAIL ":" bs ":" Benjamen "} {" ROWTIME ": 1533154567097, "ROWKEY": "0", "CLUB_STATUS": "gold", "GENDER": "Female", "COMMENTS": "Определение фазового гибрида", "ID": 0, "LAST_NAME": "Pogosian", "EMAIL": "hpogosian3 @ pagespe {" ROWTIME ": 1533154567097," ROWKEY ":" 0 "," CLUB_STATUS ":" platinum "," GENDER ":" Male "," COMMENTS ":" Контролируемое систематическое ПО "," ID ": 0," LAST_NAME ":" Karolewski "," EMAIL ":" mkarolews {"ROWTIME": 1533154567099, "ROWKEY": "0", "CLUB_STATUS": "gold", "GENDER": "Male", «КОММЕНТАРИИ»: «Обратный клиент- server alliance "," ID ": 0," LAST_NAME ":" Suche "," EMAIL ":" [email protected] {"ROWTIME": 1533154567104, "ROWKEY": "0", "CLUB_STATUS": "платина" , "GENDER": "Male", "COMMENTS": "Поэтапное унифицированное программное обеспечение", "ID": 0, "LAST_NAME": "Suttling", "EMAIL": "msuttling7 @ baidu. {"ROWTIME": 1533154567104, "ROWKEY": "0", "CLUB_STATUS": "gold", "GENDER": "Female", "COMMENTS": "Глобальное программное обеспечение, ориентированное на качество", "ID": 0, " LAST_NAME ":" Stepney "," EMAIL ":" estepney8 @ we


person Aloha Akbar    schedule 02.08.2018    source источник


Ответы (1)


Что касается вашего первого вопроса, я думаю, вы видите эффект https://github.com/debezium/debezium/blob/master/debezium-connector-mongodb/src/main/java/io/debezium/коннектор/mongodb/transforms/UnwrapFromMongoDbEnvelope.java#L163

Что касается вашего второго вопроса, я думаю, что проблема в id integer, поскольку идентификаторы теперь содержат строки.

person Jiri Pechanec    schedule 13.08.2018