Исправление агрегированного представления в потоковых данных

Этот вопрос связан с агрегированными представлениями KSQL или технологией потоковой обработки. Когда мы получаем события, мы применяем групповое предложение для их агрегирования. Теперь наступает событие, которое является исправлением некоторого предыдущего события. Это оставит мое агрегированное представление в несогласованном состоянии. Это не случай прибытия события вне очереди. Например. У меня есть событие (e), которое состоит из идентификатора объекта (t), категории (c) и атрибута количества (q). Следующие события

1) e1 —> t1, c1, q1
2) e2 -> t2, c2, q2
3) e3 -> t3, c1, q3
4) e4 -> t1, c1, q4 correction to e1
5) e5 -> t5, c2, q5

Мое агрегированное представление будет суммировать количество групп по категориям

c1 -> q1 + q3 + q4

c2 -> q2 + q5.

c1 сейчас в несовместимом состоянии. c1 должно быть только q3 + q4.

Есть ли способы решить такие проблемы. Я знаю, что могу сохранить все события в некотором кеше, а затем создать агрегированное представление, но это данные в реальном времени, поэтому все мои представления необходимо обновлять каждый раз.


person JManish    schedule 19.03.2019    source источник
comment
Ну, а как отличить q4 от поправки на q1 или q3? Есть ли какой-то конкретный атрибут, с которым вы сравниваете?   -  person Nishu Tayal    schedule 20.03.2019
comment
Идентификатор базового объекта такой же в случае e1 и e4. Итак, e4 - это последнее и исправленное обновление этого идентификатора объекта.   -  person JManish    schedule 20.03.2019


Ответы (1)


В Kafka Streams вы можете сделать KStream#groupBy()#aggregate()#mapValue(). Агрегат () не будет вычислять агрегацию, но вернет карту значения id->. В mapValue () вы вычисляете агрегирование по всем значениям карты. Таким образом, при поступлении обновлений if заменит старое значение новым значением в Map, а mapValue () правильно пересчитает результат агрегации.

person Matthias J. Sax    schedule 21.03.2019
comment
Не могли бы вы подробнее рассказать об этом. Вы имеете в виду пример вычитателя Stream DSL, где в категории Алиса изменяется с E на A. По моему требованию количество изменяется. Я относительно новичок в этом, можете ли вы указать на пример кода, который я могу посмотреть. Благодарность - person JManish; 21.03.2019
comment
Это немного отличается от упомянутого вами случая, потому что здесь мы используем KStream в качестве ввода, а пример Алисы основан на вводе KTable. Возможно, вы могли бы использовать входную KTable (не уверен). Я не знаю ни одного примера кода для этого. :( - person Matthias J. Sax; 21.03.2019
comment
Я загрузил свои данные в KTable как keyValue (t1, (c1, q1)), (t2, (c2, q2)). Создан новый поток со значением ключа (c1, q1), (c2, q2). Применяли группировку по ключу к категории, а затем использовали агрегатную функцию. Но функция агрегирования выдает агрегированное значение по категории, как и при обновлении базовой KTable. Мой поток KTable # KStream # groupby () # aggregate (). Он возвращает мне ключ как категорию и сумму всех количеств как ценность. Я что-нибудь пропустил, когда вы сказали, что агрегирование не вычисляется. - person JManish; 24.03.2019
comment
В чем ваш ключ и ценность? ключ = t1? В твоей базовой таблице? Кроме того, вы можете сделать KTable#groupBy без предварительного преобразования в поток (если вы сначала выполните согласование с потоком, вы потеряете поведение обновления). - person Matthias J. Sax; 25.03.2019