Я использую Kafka Streams для обработки данных временных рядов. Одним из вариантов использования является почасовое агрегирование данных для каждого датчика (идентификатор датчика - это ключ сообщения в теме test
).
Я написал конвейер, который группируется по ключу (ID датчика), а затем считает показания каждый час.
Проблема в том, что в теме test
есть несколько повторяющихся сообщений (тот же идентификатор датчика и временная метка). Я хочу рассмотреть только последнее сообщение.
Есть ли что-нибудь в Streams DSL API для этого?
meterDataStream
.groupByKey()
.count(
TimeWindows
.of(TimeUnit.HOURS.toMillis(1))
.until(TimeUnit.HOURS.toMillis(1)),
"counts")
.foreach((key, value) => {
val start = epochMillistoDate(key.window().start())
val end = epochMillistoDate(key.window().end())
logger.info(s"$start - $end\t->$value")
})