Использование моего собственного драйвера Cassandra для записи результатов агрегации

Я пытаюсь создать простое приложение, которое записывает в Cassandra просмотры каждой веб-страницы на моем сайте. Я хочу писать каждые 5 минут накопительные просмотры страниц с начала логического часа.

Мой код для этого выглядит примерно так:

KTable<Windowed<String>, Long> hourlyPageViewsCounts = keyedPageViews
            .groupByKey()
            .count(TimeWindows.of(TimeUnit.MINUTES.toMillis(60)), "HourlyPageViewsAgg")

Где я также установил интервал фиксации на 5 минут, установив свойство COMMIT_INTERVAL_MS_CONFIG. Насколько я понимаю, это должно агрегироваться в течение полного часа и выводить промежуточное состояние накопления каждые 5 минут.

Теперь у меня два вопроса:

  1. Учитывая, что у меня есть собственный драйвер Cassandra, как мне записать 5-минутные промежуточные результаты агрегации в Cassandra? Пытался использовать foreach, но это не работает.

  2. Мне нужна запись только после 5 минут агрегации, а не при каждом обновлении. Является ли это возможным? Читать здесь предполагает, что это возможно без использования низкоуровневого API, которого я стараюсь избегать, поскольку это кажется достаточно простой задачей, которую можно выполнить с помощью API более высокого уровня.


person idoda    schedule 07.09.2017    source источник


Ответы (1)


Фиксация и создание/запись вывода — это две разные концепции в Kafka Streams API. В API Kafka Streams выходные данные создаются непрерывно, а фиксации используются для «отметки прогресса» (т. е. для фиксации смещений потребителей, включая сброс всех хранилищ и буферизованных записей производителя).

Вы можете прочитать этот пост в блоге для получения более подробной информации: https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/

1) Для записи в Casandra рекомендуется записать результат вашего приложения обратно в тему (через #to("topic-name")) и использовать Kafka Connect для передачи данных в Casandra.

Сравните: запросы внешней системы во время обработки Kafka Stream

2) Использование низкоуровневого API - единственный способ (как вы уже указали), если вы хотите иметь строгие 5-минутные интервалы. Обратите внимание, что следующий выпуск (Kafka 1.0) будет включать в себя пунктуацию настенных часов, что должно облегчить вам достижение вашей цели.

person Matthias J. Sax    schedule 08.09.2017
comment
Привет, спасибо за комментарий. Я бы использовал Kafka connect, но мне нужен некоторый мониторинг (счетчики, таймеры), и, насколько я понимаю, для этого мне нужно написать свой собственный коннектор. - person idoda; 10.09.2017