Нужна информация о первичном ключе в соединителе Debezium для событий вставки postgres

Я использую коннектор Debezium для postgres с подключением Kafka.

Для события вставки строки, записываемого соединителем в Kafka, мне нужна информация о том, какие столбцы являются первичными ключами, а какие нет. Есть ли способ добиться этого?

Вставка образца события вставки, сгенерированного в Kafka:

  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "bucket_type"
          }
        ],
        "optional": true,
        "name": "postgresconfigdb.config.alert_configs.Value",
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "bucket_type"
          }
        ],
        "optional": true,
        "name": "postgresconfigdb.config.alert_configs.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "version"
          },
          {
            "type": "string",
            "optional": false,
            "field": "connector"
          },
          {
            "type": "string",
            "optional": false,
            "field": "name"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_ms"
          },
          {
            "type": "string",
            "optional": true,
            "name": "io.debezium.data.Enum",
            "version": 1,
            "parameters": {
              "allowed": "true,last,false"
            },
            "default": "false",
            "field": "snapshot"
          },
          {
            "type": "string",
            "optional": false,
            "field": "db"
          },
          {
            "type": "string",
            "optional": false,
            "field": "schema"
          },
          {
            "type": "string",
            "optional": false,
            "field": "table"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "txId"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "lsn"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "xmin"
          }
        ],
        "optional": false,
        "name": "io.debezium.connector.postgresql.Source",
        "field": "source"
      },
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "id"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "total_order"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "data_collection_order"
          }
        ],
        "optional": true,
        "field": "transaction"
      }
    ],
    "optional": false,
    "name": "postgresconfigdb.config.alert_configs.Envelope"
  },
  "payload": {
    "before": null,
    "after": {
      "id": 1100,
      "bucket_type": 10
    },
    "source": {
      "version": "1.2.0.Final",
      "connector": "postgresql",
      "name": "postgresconfigdb",
      "ts_ms": 1599830887858,
      "snapshot": "true",
      "db": "configdb",
      "schema": "config",
      "table": "alert_configs",
      "txId": 2139888,
      "lsn": 379356048,
      "xmin": null
    },
    "op": "r",
    "ts_ms": 1599830887859,
    "transaction": null
  }
}

Здесь столбцами в таблице являются «id» и «bucket_type», значения которых сообщаются в полезной нагрузке json-path-›after.

Существует информация о столбцах, которые не являются нулевыми, в необязательном логическом поле, специфичном для столбца, однако нет информации о том, какие столбцы являются первичными ключами. (идентификатор в данном случае)


person abhinav kumar    schedule 12.09.2020    source источник
comment
Это нормально для этого вручную? Вы можете использовать Kafka Connect SMT для извлечения поля PK в ключ сообщения, а затем использовать его на стороне потребителя.   -  person Iskuskov Alexander    schedule 12.09.2020


Ответы (1)


вы найдете информацию о том, какие поля являются столбцами PK в ключе Kafka.

person Jiri Pechanec    schedule 14.09.2020