Очистка государственного хранилища приложения

Я хочу очистить хранилище состояний конкретного экземпляра внутри приложения потока kafka. Например, предположим, что если я поддерживаю список 5 максимальных значений в состоянии, я хочу очищать его ежечасно. Есть ли способ сделать это - не останавливая приложение?


person MustakMu    schedule 15.12.2017    source источник


Ответы (1)


Кажется возможным.

Например, вы можете зарегистрировать Punctuation расписание, которое запускается один раз в зависимости от времени настенных часов, которое удаляет все записи из магазина.

https://docs.confluent.io/current/streams/developer-guide/processor-api.html#defining-a-stream-processor.

person Matthias J. Sax    schedule 15.12.2017
comment
Я .. Это должно решить мою проблему .. Я постараюсь реализовать это и вернусь к вам ... Спасибо - person MustakMu; 16.12.2017
comment
Я использую агрегатную функцию с хранилищем состояний .. Есть ли способ очистить хранилище состояний агрегатной функции? .. - person MustakMu; 16.12.2017
comment
Для DSL это немного сложнее, но не невозможно. Вам нужно добавить пользовательский Processor через process() и прикрепить внутреннее хранилище aggregation() к этому пользовательскому процессору, чтобы он мог получить к нему доступ. Затем вы можете прописать знаки препинания в своем пользовательском Processor. Вы можете получить всю необходимую информацию через Topology#describe(). Должен признать, это своего рода взлом ... - person Matthias J. Sax; 16.12.2017
comment
Хорошо .. Я попробую реализовать это ... Разве мы не должны рассматривать очистку хранилища состояний в потоковой обработке как одну из функций в следующем выпуске? Просто мысль.. - person MustakMu; 18.12.2017
comment
Некоторые люди отправляют запрос на TTL, но и TTL, и очистка магазина сомнительны на уровне DSL. А для Processor API это можно реализовать. Таким образом, нет конкретной дорожной карты по добавлению этого банкомата. - person Matthias J. Sax; 18.12.2017