Я установил простую агрегацию, усредняющую значения из нескольких потоков вместе, и пытаюсь ее протестировать. Я прожигал много времени и, кажется, не могу понять концепции прямо в моей голове. Мой поток ниже:
// Combine multiple streams together.
KStream<String, IndividualTick> tickerStream =
priceIndexStreamBuilder.stream(exchangeTopics, Consumed.with(...));
// Group by a key & compute average per key
KStream<K, AveragedTick> avgTickerStream = tickStream.selectKey((key,
value) -> value.getK())
.groupByKey(...)
.aggregate(AvgTick::new,
(key, value, aggregate) -> {
aggregate.addTick(value);
return aggregate;
},
Materialized.with(...))
.toStream();
indexTickerStream.to(sinkTopic, Produced.with(...));
Мой тест использует EmbeddedKafka, отправляет кучу записей в темы и находится в заблокированной очереди, ожидая прибытия записей в sinkTopic
.
Мне интересно, как эта агрегация меняется с течением времени, поэтому я хочу утверждать это среднее значение для каждого выходного тикера. Я могу добавить некоторый уровень окон, но пока я постарался сделать его простым.
Когда я запускаю свой тест, я получаю разные результаты. Предположим, у меня есть 10 входных записей в мою топологию:
- Мой агрегатор звонили 10 раз
- Точка останова, которую я помещаю в свой
AverageTick
сериализатор, вызывается разное количество раз. - Я утверждаю значения записей в своих тестах.
Я думаю, это связано с функциональностью кеша, определенной в KIP-63 - записи очень быстро появляются на узле обработки и объединяются / перезаписываются последней записью. (Хотя я не совсем уверен.)
У меня есть модульные тесты, проходящие с ProcessorTopologyTestDriver
, но я пытаюсь написать некоторые приемочные тесты для службы, которая поддерживает эту логику.
Я также пробовал поиграть с моей конфигурацией commit.interval.ms
, а также ставить паузы между публикациями моей входной записи с разной степенью (нестабильного) успеха.
- Имеют ли вообще смысл подобные тесты?
- Как я могу подтвердить правильность этого микросервиса на реальном экземпляре Kafka?
Я чувствую, что делаю здесь что-то концептуально неправильно - я просто не знаю, какой другой подход выбрать.