Идеальный способ дополнить KStream поисковыми данными

В моем потоке есть столбец под названием «категория», и у меня есть дополнительные статические метаданные для каждой «категории» в другом магазине, они обновляются раз в пару дней. Как правильно выполнять этот поиск? Есть два варианта с потоками Kafka

  1. Загрузите статические данные вне Kafka Streams и просто используйте KStreams#map() для добавления метаданных. Это возможно, поскольку Kafka Streams - это просто библиотека.

  2. Загрузите метаданные в тему Kafka, загрузите их в KTable и выполните KStreams#leftJoin(), это кажется более естественным и оставляет разделение и т. Д. На Kafka Streams. Однако для этого необходимо, чтобы KTable был загружен всеми значениями. Обратите внимание, что нам нужно будет загрузить все данные поиска, а не только изменения.

    • For example, say initially there was just one category 'c1'. Kafka streams app was stopped gracefully, and restarted again. After the restart, a new category 'c2' was added. My assumption is that, table = KStreamBuilder().table('metadataTopic') would just have the value 'c2', as that was the only thing that changed since the app started for second time. I would want it to have 'c1' and 'c2'.
    • Если у него также есть 'c1', будут ли данные когда-либо удалены из KTable (возможно, установив отправку сообщения key = null?)?

Что из вышеперечисленного является правильным способом поиска метаданных?

Можно ли при перезапуске всегда принудительно читать только один поток с самого начала, чтобы все метаданные можно было загрузить в KTable.

Есть ли другой способ использования магазинов?


person Vignesh Chandramohan    schedule 08.12.2016    source источник
comment
Если один из ответов ниже относится к вашему вопросу, отметьте его как Принято.   -  person Michael G. Noll    schedule 15.01.2020


Ответы (3)


  1. Загрузите статические данные вне Kafka Streams и просто используйте KStreams # map () для добавления метаданных. Это возможно, поскольку Kafka Streams - это просто библиотека.

Это работает. Но обычно люди выбирают следующий вариант, который вы указали, потому что побочные данные для обогащения входного потока обычно не полностью статичны; скорее, он меняется, но несколько нечасто:

  1. Загрузите метаданные в тему Kafka, загрузите их в KTable и выполните KStreams # leftJoin (), это кажется более естественным и оставляет разделение и т. Д. На Kafka Streams. Однако для этого необходимо, чтобы в KTable были загружены все значения. Обратите внимание, что нам нужно будет загрузить все данные поиска, а не только изменения.

Это обычный подход, и я бы рекомендовал придерживаться его, если у вас нет особых причин не делать этого.

Однако для этого необходимо, чтобы в KTable были загружены все значения. Обратите внимание, что нам нужно будет загрузить все данные поиска, а не только изменения.

Думаю, вы также предпочитаете второй вариант, но вас беспокоит, эффективен он или нет.

Короткий ответ: да, KTable будет загружен со всеми (последними) значениями для каждого ключа. Таблица будет содержать все данные поиска, но имейте в виду, что KTable разбивается за кулисами: если, например, ваша входная тема (для таблицы) имеет 3 разделов, то вы можете запустить до 3 экземпляров вашего приложения. , каждый из которых получает 1 раздел таблицы (при условии, что данные равномерно распределены по разделам, тогда каждый раздел / общий ресурс таблицы будет содержать около 1/3 данных таблицы). Так что на практике это, скорее всего, «просто работает». Более подробно поделюсь ниже.

Глобальные KTables: в качестве альтернативы вы можете использовать глобальные KTables вместо (секционированного) варианта с нормальной таблицей. С глобальными таблицами каждый экземпляр вашего приложения имеет полную копию данных таблицы. Это делает глобальные таблицы очень полезными для сценариев присоединения, в том числе для обогащения KStream в соответствии с вашим вопросом.

Можно ли при перезапуске всегда принудительно читать только один поток с самого начала, чтобы все метаданные можно было загрузить в KTable.

Вам не о чем беспокоиться. Проще говоря, если нет доступной локальной «копии» таблицы, Streams API автоматически обеспечит полное чтение данных таблицы с нуля. Если доступна локальная копия, ваше приложение будет повторно использовать эту копию (и обновлять свою локальную копию всякий раз, когда новые данные доступны во входной теме таблицы).

Более подробный ответ с примерами

Представьте себе следующие входные данные (подумайте: поток журнала изменений) для вашего KTable, обратите внимание, как этот вход состоит из 6 сообщений:

(alice, 1) -> (bob, 40) -> (alice, 2) -> (charlie, 600), (alice, 5), (bob, 22)

И вот различные состояния "логического" KTable, которые могут возникнуть в результате этого ввода, где каждое новое полученное входное сообщение (например, (alice, 1)) приведет к новому состоянию таблицы:

Key      Value
--------------
alice   |   1    // (alice, 1) received

 |
 V

Key      Value
--------------
alice   |   1
bob     |  40    // (bob, 40) received

 |
 V

Key      Value
--------------
alice   |   2    // (alice, 2) received
bob     |  40

 |
 V

Key      Value
--------------
alice   |   2
bob     |  40
charlie | 600    // (charlie, 600) received

 |
 V

Key      Value
--------------
alice   |   5    // (alice, 5) received
bob     |  40
charlie | 600

 |
 V

Key      Value
--------------
alice   |   5
bob     |  22    // (bob, 22) received
charlie | 600

Здесь вы можете увидеть, что, хотя входные данные могут иметь много-много сообщений (или "изменений", как вы сказали; здесь у нас 6), количество записей / строк в результирующем KTable (которое постоянно мутации, основанные на вновь полученных входных данных) - это количество уникальных ключей во входных данных (здесь: начиная с 1, увеличиваясь до 3), которое обычно значительно меньше количества сообщений. Итак, если количество сообщений во входных данных равно N, а количество уникальных ключей для этих сообщений равно M, то обычно M << N (M значительно меньше, чем N; плюс, для записи у нас есть инвариант M <= N).

Это первая причина, по которой «это требует, чтобы KTable оставался загруженным со всеми значениями», как правило, не является проблемой, потому что для каждого ключа сохраняется только последнее значение.

Вторая причина, которая помогает, заключается в том, что, как указал Маттиас Дж. Сакс, Kafka Streams использует RocksDB в качестве механизма хранения по умолчанию для таких таблиц (точнее: состояние хранит, что поддерживает таблицу). RocksDB позволяет вам поддерживать таблицы, размер которых превышает доступную основную память / пространство кучи Java вашего приложения, поскольку они могут быть перенесены на локальный диск.

Наконец, третья причина состоит в том, что KTable разбит на разделы. Итак, если ваша входная тема для таблицы (скажем) настроена с 3 разделами, то за кулисами происходит то, что KTable сам разбивается (думаю: сегментирован) таким же образом. В приведенном выше примере вот что вы можете получить, хотя точное «разбиение» зависит от того, как исходные входные данные распределяются по разделам входной темы таблицы:

Логическая таблица KTable (последнее состояние из того, что я показал выше):

Key      Value
--------------
alice   |   5
bob     |  22
charlie | 600

Фактическая KTable, секционированная (при условии, что 3 разделов для входной темы таблицы, плюс ключи = имена пользователей равномерно распределены по разделам):

Key      Value
--------------
alice   |   5    // Assuming that all data for `alice` is in partition 1

Key      Value
--------------
bob     |  22    // ...for `bob` is in partition 2

Key      Value
--------------
charlie | 600    // ...for `charlie` is in partition 3

На практике такое разбиение входных данных - среди прочего - позволяет вам «определять размер» фактических проявлений KTable.

Другой пример:

  • Представьте, что последнее состояние вашей KTable обычно имеет размер 1 ТБ (опять же, приблизительный размер является функцией количества уникальных ключей сообщений во входных данных таблицы, умноженных на средний размер соответствующего значения сообщения).
  • Если входная тема таблицы имеет только 1 раздел, то сама KTable также имеет только 1 раздел размером 1 ТБ. Здесь, поскольку тема ввода имеет только 1 раздел, вы можете запустить свое приложение с экземплярами приложения до 1 (так что параллелизма на самом деле не так уж много, хех).
  • Если входная тема таблицы имеет 500 разделов, то в KTable также есть 500 раздел, размером ~ 2 ГБ каждый (при условии, что данные равномерно распределены по разделам). Здесь вы можете запустить свое приложение, используя до 500 экземпляров приложения. Если бы вы запускали ровно 500 экземпляра, то каждый экземпляр приложения получил бы ровно 1 раздел / сегмент логической KTable, таким образом получив 2 ГБ данных таблицы; если бы вы запускали только 100 экземпляров, то каждый экземпляр получил бы 500 / 100 = 5 разделов / сегментов таблицы, что в итоге привело бы к примерно 2 GB * 5 = 10 GB табличным данным.
person Michael G. Noll    schedule 08.12.2016
comment
Что происходит, когда входной поток имеет несколько разделов, поток метаданных имеет только один раздел и существует несколько экземпляров приложения? Будет ли каждый экземпляр приложения загружать поток метаданных, или один из них загрузит его, а другие каким-то образом получат значение из этого экземпляра? - person Vignesh Chandramohan; 09.12.2016
comment
Большое спасибо за отличный ответ. В случае, если вы хотите присоединиться к этой KTable с потоком, как вы убедитесь, что раздел, который вы получаете из KTable, содержит данные, необходимые для присоединения к потоку? Вы должны убедиться, что у них один и тот же ключ? - person nsanglar; 13.08.2019
comment
Да, соединение требует, чтобы и поток, и таблица имели один и тот же ключ. - person Michael G. Noll; 13.08.2019
comment
Хорошо, это имеет смысл. Теперь, что произойдет, если события в потоке будут содержать несколько полей, каждое из которых требует поиска (или более) в другой KTable? Нам нужно будет выполнить соединение, изменить ключ потока и продолжить? По вашему мнению, будет ли такой подход работать разумно? Большое спасибо за понимание :) - person nsanglar; 13.08.2019
comment
Вы предлагаете один вариант. Другой вариант - прочитать различные таблицы в GlobalKTables, которые вы можете присоединить к своему потоку без необходимости иметь одинаковый ключ с обеих сторон. См. stackoverflow.com/questions/45975755/. - person Michael G. Noll; 14.08.2019

Ваше общее наблюдение верное, и все зависит от того, какие компромиссы для вас важнее. Если у вас мало метаданных, вариант 1 кажется лучшим. Если метаданные большие, вариант 2 - лучший вариант.

Если вы используете map(), вам необходимо иметь полную копию ваших метаданных в каждом экземпляре приложения (поскольку вы не можете точно знать, как Streams разделит ваши KStream данные). Таким образом, если ваши метаданные не помещаются в основную память, использование map() не будет работать легко.

Если вы используете KTable, Streams позаботится о том, чтобы метаданные были правильно сегментированы по всем запущенным экземплярам приложения, так что дублирование данных не требуется. Кроме того, KTable использует RocksDB в качестве механизма хранения состояний и, таким образом, может переноситься на диск.

НАЧАТЬ ИЗМЕНЕНИЕ

О том, что все данные находятся в KTable: если у вас есть две категории для одного и того же ключа, второе значение будет перезаписывать первое значение, если вы читаете данные прямо из темы в KTable через builder.table(...) (семантика журнала изменений). Однако вы можете легко обойти это, прочитав тему как поток записей (т. Е. builder.stream(...) и применив агрегирование для вычисления KTable. Ваша агрегация просто выдаст список всех значений для каждого ключа.

Об удалении: KTable использует семантику журнала изменений и понимает сообщения захоронения для удаления пар ключ-значение. Таким образом, если вы читаете KTable из темы, а тема содержит <key:null> сообщение, текущая запись в KTable с этим ключом будет удалена. Этого труднее достичь, когда KTable является результатом агрегирования, потому что входная запись агрегирования с ключом null или значением null просто игнорируется и не обновляет результат агрегирования.

Обходным путем было бы добавить шаг map() перед агрегированием и ввести значение NULL (т. Е. Определенный пользователем «объект», который представляет надгробие, но не null - в вашем случае вы могли бы назвать его null-category). При агрегировании вы просто возвращаете значение null как результат агрегирования, если входная запись имеет значение null-category. Затем это преобразуется в сообщение-надгробие для вашего KTable и удаляет текущий список категорий для этого ключа.

РЕДАКТИРОВАТЬ КОНЕЦ

И, конечно же, вы всегда можете создать собственное решение с помощью Processor API. Однако, если DSL может дать вам то, в чем вы нуждаетесь, для этого нет веских причин.

person Matthias J. Sax    schedule 08.12.2016
comment
Обновлен вопрос с примерами загрузки полных данных поиска. Возможно, я неправильно понял, что содержит KTable, буду ждать вашего обновления). - person Vignesh Chandramohan; 09.12.2016

Из Kafka 0.10.2.0, выпущенного в феврале 2017 года, концепция GlobalKTable, вероятно, является лучшим вариантом для обогащения потока данными поиска.

https://docs.confluent.io/current/streams/concepts.html#globalktable

person Val Bonn    schedule 16.02.2019