Логика обновления GlobalKTable

Когда выполняются обновления основной темы GlobalKTable, какова логика для всех экземпляров KStream приложений для получения последних данных? Ниже приведены мои последующие вопросы:

  1. Будет ли GlobalKTable заблокирован на уровне записи или на уровне таблицы, когда происходят обновления?
  2. Согласно этому блогу: Проблема задержки Kafka GlobalKTable, может ли задержка увеличиться до 0,5 с ?! Если да, есть ли альтернатива для уменьшения задержки?
  3. Поскольку GlobalKTable по умолчанию использует RocksDB в качестве хранилища состояний, все ли функции RocksDB доступны для использования?

Я понимаю, что GlobalKTable не следует использовать в случаях, когда требуется частое обновление данных поиска. Есть ли какое-либо другое хранилище ключей и значений, которое мы можем использовать для случаев использования, которые могут потребовать обновления данных таблицы - например, Redis?

Я не смог найти много документации о GlobalKTable и его внутреннем устройстве. Доступна ли какая-либо документация?


person guru    schedule 28.05.2020    source источник


Ответы (2)


GlobalKTables - это асинхронные обновления. Следовательно, нет никакой гарантии, что разные экземпляры являются обновлениями.

Кроме того, «глобальный поток» использует выделенного «глобального потребителя», который можно настроить индивидуально для уменьшения задержки: https://docs.confluent.io/current/streams/developer-guide/config-streams.

RocksDB, интегрированный через JNI, и интерфейс JNI не раскрывают все возможности RocksDB. Более того, абстракции «таблицы» «скрывают» RocksDB, поэтому некоторые расширяются. Однако вы можете настроить RocksDB vie rocksdb.config.setter (https://docs.confluent.io/current/streams/developer-guide/config-streams.html#rocksdb-config-setter).

person Matthias J. Sax    schedule 03.06.2020
comment
Понятно. Если мы посмотрим на один экземпляр, является ли обновление асинхронным - в том смысле, что когда поток обрабатывается, будет ли он приостановить выполнение для обновления GlobalKTable или будет продолжаться как есть, и экземпляр получит последние данные асинхронно? - person guru; 04.06.2020
comment
Глобальное состояние / таблица обновляются с использованием выделенного потока, поэтому обработка обычной темы ввода происходит параллельно. - person Matthias J. Sax; 04.06.2020

Файл Документация Javadoc для KStream#join() довольно ясно показывает, что соединения с GlobalKTable происходят только при обработке записей в потоке. Поэтому, отвечая на ваш вопрос, нет никаких автоматических обновлений, которые происходят с лежащими в основе KStreams: новые сообщения должны быть обработаны в них, чтобы они могли видеть обновления.

«Соединение поиска по таблице» означает, что результаты вычисляются только в том случае, если обрабатываются записи KStream. Это выполняется путем поиска совпадающих записей в текущем внутреннем состоянии GlobalKTable. Напротив, обработка входных записей GlobalKTable обновляет только внутреннее состояние GlobalKTable и не создает никаких записей результатов.

  1. Если GlobalKTable материализован как хранилище значений ключей, большинство методов для перебора и изменения реализаций KeyValueStore используют ключевое слово synchronized, чтобы предотвратить вмешательство нескольких потоков, одновременно обновляющих хранилище состояний.

  2. Вы можете уменьшить задержку с помощью хранилища ключей и значений в памяти или с помощью реализации настраиваемого хранилища состояний.

  3. Взаимодействие с хранилищами состояний контролируется через набор интерфейсов в Kafka Streams, например KeyValueStore, поэтому в этом смысле вы не взаимодействуете напрямую с API RocksDB.

person ck1    schedule 31.05.2020
comment
Спасибо @ ck1. У меня есть несколько дополнительных вопросов. - person guru; 01.06.2020
comment
Вопросы по порядку: 1. ключевое слово synchronized находится во всей таблице или в конкретной паре "ключ-значение"? Не могли бы вы указать мне на Javadoc, если таковой имеется? 2. GlobalKTable сам по себе материализован как хранилище значений ключей в памяти, правильно ли я понимаю? Вы предполагаете, что GlobalKTable имеет проблемы с задержкой и что лучше иметь другое хранилище ключей и значений в памяти? 3. Я понимаю, что мы не можем взаимодействовать с RocksDB напрямую, но можем ли мы каким-то образом настроить RocksDB через API Kafka Streams? - person guru; 01.06.2020