Тестирование приложений KafkaStreams

Я установил простую агрегацию, усредняющую значения из нескольких потоков вместе, и пытаюсь ее протестировать. Я прожигал много времени и, кажется, не могу понять концепции прямо в моей голове. Мой поток ниже:

// Combine multiple streams together.
KStream<String, IndividualTick> tickerStream = 
priceIndexStreamBuilder.stream(exchangeTopics, Consumed.with(...));

// Group by a key & compute average per key
KStream<K, AveragedTick> avgTickerStream = tickStream.selectKey((key, 
value) -> value.getK())
            .groupByKey(...)
            .aggregate(AvgTick::new,
                    (key, value, aggregate) -> {
                        aggregate.addTick(value);
                        return aggregate;
                    },
                    Materialized.with(...))
            .toStream();

indexTickerStream.to(sinkTopic, Produced.with(...));

Мой тест использует EmbeddedKafka, отправляет кучу записей в темы и находится в заблокированной очереди, ожидая прибытия записей в sinkTopic.

Мне интересно, как эта агрегация меняется с течением времени, поэтому я хочу утверждать это среднее значение для каждого выходного тикера. Я могу добавить некоторый уровень окон, но пока я постарался сделать его простым.

Когда я запускаю свой тест, я получаю разные результаты. Предположим, у меня есть 10 входных записей в мою топологию:

  • Мой агрегатор звонили 10 раз
  • Точка останова, которую я помещаю в свой AverageTick сериализатор, вызывается разное количество раз.
  • Я утверждаю значения записей в своих тестах.

Я думаю, это связано с функциональностью кеша, определенной в KIP-63 - записи очень быстро появляются на узле обработки и объединяются / перезаписываются последней записью. (Хотя я не совсем уверен.)

У меня есть модульные тесты, проходящие с ProcessorTopologyTestDriver, но я пытаюсь написать некоторые приемочные тесты для службы, которая поддерживает эту логику.

Я также пробовал поиграть с моей конфигурацией commit.interval.ms, а также ставить паузы между публикациями моей входной записи с разной степенью (нестабильного) успеха.

  • Имеют ли вообще смысл подобные тесты?
  • Как я могу подтвердить правильность этого микросервиса на реальном экземпляре Kafka?

Я чувствую, что делаю здесь что-то концептуально неправильно - я просто не знаю, какой другой подход выбрать.


person jaker    schedule 03.02.2018    source источник


Ответы (1)


Ваше наблюдение верно. Кэширование усложняет тестирование, поскольку вводит недетерминизм.

Чтобы написать полезный тест, у вас есть два варианта:

  • отключить кеширование, установив размер кеша равным нулю (таким образом, все выходные записи, включая все промежуточные, будут детерминированными)
  • проверять только последнюю запись результата для каждого ключа (этот последний результат всегда должен быть одинаковым, независимо от кеширования для фиксированных входных данных)

Кстати: в грядущей версии 1.1 Kafka добавит общедоступный тестовый пакет, и мы планируем добавить больше: https://cwiki.apache.org/confluence/display/KAFKA/KIP-247%3A+Add+public+test+utils+for+Kafka+Streams

person Matthias J. Sax    schedule 03.02.2018
comment
Я буду следить за этим КИП. Еще один способ, который я рассмотрел, - это отправить входную запись и дождаться получения выходной записи перед отправкой другой. - person jaker; 04.02.2018
comment
Это тоже должно сработать. Но это то же самое, что отключить кеш. - person Matthias J. Sax; 04.02.2018