- Загрузите статические данные вне Kafka Streams и просто используйте KStreams # map () для добавления метаданных. Это возможно, поскольку Kafka Streams - это просто библиотека.
Это работает. Но обычно люди выбирают следующий вариант, который вы указали, потому что побочные данные для обогащения входного потока обычно не полностью статичны; скорее, он меняется, но несколько нечасто:
- Загрузите метаданные в тему Kafka, загрузите их в KTable и выполните KStreams # leftJoin (), это кажется более естественным и оставляет разделение и т. Д. На Kafka Streams. Однако для этого необходимо, чтобы в KTable были загружены все значения. Обратите внимание, что нам нужно будет загрузить все данные поиска, а не только изменения.
Это обычный подход, и я бы рекомендовал придерживаться его, если у вас нет особых причин не делать этого.
Однако для этого необходимо, чтобы в KTable были загружены все значения. Обратите внимание, что нам нужно будет загрузить все данные поиска, а не только изменения.
Думаю, вы также предпочитаете второй вариант, но вас беспокоит, эффективен он или нет.
Короткий ответ: да, KTable будет загружен со всеми (последними) значениями для каждого ключа. Таблица будет содержать все данные поиска, но имейте в виду, что KTable разбивается за кулисами: если, например, ваша входная тема (для таблицы) имеет 3
разделов, то вы можете запустить до 3
экземпляров вашего приложения. , каждый из которых получает 1
раздел таблицы (при условии, что данные равномерно распределены по разделам, тогда каждый раздел / общий ресурс таблицы будет содержать около 1/3 данных таблицы). Так что на практике это, скорее всего, «просто работает». Более подробно поделюсь ниже.
Глобальные KTables: в качестве альтернативы вы можете использовать глобальные KTables вместо (секционированного) варианта с нормальной таблицей. С глобальными таблицами каждый экземпляр вашего приложения имеет полную копию данных таблицы. Это делает глобальные таблицы очень полезными для сценариев присоединения, в том числе для обогащения KStream в соответствии с вашим вопросом.
Можно ли при перезапуске всегда принудительно читать только один поток с самого начала, чтобы все метаданные можно было загрузить в KTable.
Вам не о чем беспокоиться. Проще говоря, если нет доступной локальной «копии» таблицы, Streams API автоматически обеспечит полное чтение данных таблицы с нуля. Если доступна локальная копия, ваше приложение будет повторно использовать эту копию (и обновлять свою локальную копию всякий раз, когда новые данные доступны во входной теме таблицы).
Более подробный ответ с примерами
Представьте себе следующие входные данные (подумайте: поток журнала изменений) для вашего KTable
, обратите внимание, как этот вход состоит из 6
сообщений:
(alice, 1) -> (bob, 40) -> (alice, 2) -> (charlie, 600), (alice, 5), (bob, 22)
И вот различные состояния "логического" KTable
, которые могут возникнуть в результате этого ввода, где каждое новое полученное входное сообщение (например, (alice, 1)
) приведет к новому состоянию таблицы:
Key Value
--------------
alice | 1 // (alice, 1) received
|
V
Key Value
--------------
alice | 1
bob | 40 // (bob, 40) received
|
V
Key Value
--------------
alice | 2 // (alice, 2) received
bob | 40
|
V
Key Value
--------------
alice | 2
bob | 40
charlie | 600 // (charlie, 600) received
|
V
Key Value
--------------
alice | 5 // (alice, 5) received
bob | 40
charlie | 600
|
V
Key Value
--------------
alice | 5
bob | 22 // (bob, 22) received
charlie | 600
Здесь вы можете увидеть, что, хотя входные данные могут иметь много-много сообщений (или "изменений", как вы сказали; здесь у нас 6
), количество записей / строк в результирующем KTable
(которое постоянно мутации, основанные на вновь полученных входных данных) - это количество уникальных ключей во входных данных (здесь: начиная с 1
, увеличиваясь до 3
), которое обычно значительно меньше количества сообщений. Итак, если количество сообщений во входных данных равно N
, а количество уникальных ключей для этих сообщений равно M
, то обычно M << N
(M
значительно меньше, чем N
; плюс, для записи у нас есть инвариант M <= N
).
Это первая причина, по которой «это требует, чтобы KTable оставался загруженным со всеми значениями», как правило, не является проблемой, потому что для каждого ключа сохраняется только последнее значение.
Вторая причина, которая помогает, заключается в том, что, как указал Маттиас Дж. Сакс, Kafka Streams использует RocksDB в качестве механизма хранения по умолчанию для таких таблиц (точнее: состояние хранит, что поддерживает таблицу). RocksDB позволяет вам поддерживать таблицы, размер которых превышает доступную основную память / пространство кучи Java вашего приложения, поскольку они могут быть перенесены на локальный диск.
Наконец, третья причина состоит в том, что KTable
разбит на разделы. Итак, если ваша входная тема для таблицы (скажем) настроена с 3
разделами, то за кулисами происходит то, что KTable
сам разбивается (думаю: сегментирован) таким же образом. В приведенном выше примере вот что вы можете получить, хотя точное «разбиение» зависит от того, как исходные входные данные распределяются по разделам входной темы таблицы:
Логическая таблица KTable (последнее состояние из того, что я показал выше):
Key Value
--------------
alice | 5
bob | 22
charlie | 600
Фактическая KTable, секционированная (при условии, что 3
разделов для входной темы таблицы, плюс ключи = имена пользователей равномерно распределены по разделам):
Key Value
--------------
alice | 5 // Assuming that all data for `alice` is in partition 1
Key Value
--------------
bob | 22 // ...for `bob` is in partition 2
Key Value
--------------
charlie | 600 // ...for `charlie` is in partition 3
На практике такое разбиение входных данных - среди прочего - позволяет вам «определять размер» фактических проявлений KTable.
Другой пример:
- Представьте, что последнее состояние вашей KTable обычно имеет размер 1 ТБ (опять же, приблизительный размер является функцией количества уникальных ключей сообщений во входных данных таблицы, умноженных на средний размер соответствующего значения сообщения).
- Если входная тема таблицы имеет только
1
раздел, то сама KTable также имеет только 1
раздел размером 1 ТБ. Здесь, поскольку тема ввода имеет только 1
раздел, вы можете запустить свое приложение с экземплярами приложения до 1
(так что параллелизма на самом деле не так уж много, хех).
- Если входная тема таблицы имеет
500
разделов, то в KTable также есть 500
раздел, размером ~ 2 ГБ каждый (при условии, что данные равномерно распределены по разделам). Здесь вы можете запустить свое приложение, используя до 500
экземпляров приложения. Если бы вы запускали ровно 500
экземпляра, то каждый экземпляр приложения получил бы ровно 1
раздел / сегмент логической KTable, таким образом получив 2 ГБ данных таблицы; если бы вы запускали только 100
экземпляров, то каждый экземпляр получил бы 500 / 100 = 5
разделов / сегментов таблицы, что в итоге привело бы к примерно 2 GB * 5 = 10 GB
табличным данным.
person
Michael G. Noll
schedule
08.12.2016