Агрегация таймсерий Kafka Streams

Я использую Kafka Streams для обработки данных временных рядов. Одним из вариантов использования является почасовое агрегирование данных для каждого датчика (идентификатор датчика - это ключ сообщения в теме test).

Я написал конвейер, который группируется по ключу (ID датчика), а затем считает показания каждый час.

Проблема в том, что в теме test есть несколько повторяющихся сообщений (тот же идентификатор датчика и временная метка). Я хочу рассмотреть только последнее сообщение.

Есть ли что-нибудь в Streams DSL API для этого?

  meterDataStream
   .groupByKey()
   .count(
     TimeWindows
       .of(TimeUnit.HOURS.toMillis(1))
       .until(TimeUnit.HOURS.toMillis(1)), 
     "counts")
   .foreach((key, value) => {
     val start = epochMillistoDate(key.window().start())
     val end   = epochMillistoDate(key.window().end())
     logger.info(s"$start - $end\t->$value")
   })

person Sebastian    schedule 13.12.2017    source источник


Ответы (1)


Для этого вам нужно будет создать собственный оператор дедупликации.

meterDateStream
    .transform(/*write your own deduplicator*/)
    .groupByKey()....

Дедупликатор (т. Е. Transformer) должен иметь прикрепленное хранилище состояний, и вы можете проверить знаки препинания. Ознакомьтесь с документацией для получения более подробной информации:

person Matthias J. Sax    schedule 13.12.2017