Я пытаюсь создать простое приложение, которое записывает в Cassandra просмотры каждой веб-страницы на моем сайте. Я хочу писать каждые 5 минут накопительные просмотры страниц с начала логического часа.
Мой код для этого выглядит примерно так:
KTable<Windowed<String>, Long> hourlyPageViewsCounts = keyedPageViews
.groupByKey()
.count(TimeWindows.of(TimeUnit.MINUTES.toMillis(60)), "HourlyPageViewsAgg")
Где я также установил интервал фиксации на 5 минут, установив свойство COMMIT_INTERVAL_MS_CONFIG
. Насколько я понимаю, это должно агрегироваться в течение полного часа и выводить промежуточное состояние накопления каждые 5 минут.
Теперь у меня два вопроса:
Учитывая, что у меня есть собственный драйвер Cassandra, как мне записать 5-минутные промежуточные результаты агрегации в Cassandra? Пытался использовать foreach, но это не работает.
Мне нужна запись только после 5 минут агрегации, а не при каждом обновлении. Является ли это возможным? Читать здесь предполагает, что это возможно без использования низкоуровневого API, которого я стараюсь избегать, поскольку это кажется достаточно простой задачей, которую можно выполнить с помощью API более высокого уровня.