У меня есть необработанные потоки из 3 таблиц mysql, 1 основной и двух дочерних таблиц. Я попытался объединить три сырых потока и преобразовал их в один выходной поток. Он работает, если есть какие-либо обновления в родительском потоке, но не запускает вывод, если что-либо изменяется в дочернем потоке.
@StreamListener
public Stream<Long, Output> handleStreams(@Input KStream<Long, Parent> parentStream,
@Input KStream<Long, Child1> child1Stream,
@Input KStream<Long, Child2> child2Stream) {
KTable<Long, Parent> parentTable = convertParent(parentStream);
KTable<Long, ArrayList<Child1>> child1Table = convertChild1(parentStream);
KTable<Long, ArrayList<Child2>> child2Table = convertChild2(parentStream);
parentTable
.leftJoin(child1Table, (parent, child1List) -> new Output(k, v))
.leftJoin(child2Table, (output, child2List) -> output.setChild2List(child2List))
.toStream()
}
Любое новое добавление или обновление в родительском потоке обрабатывается процессором, объединяет его с другим KTable и возвращает его в выходной поток. Но любое добавление или обновление в child1stream или child2stream не запускает выходной поток.
Я думал, что все входные потоки будут отображаться как KTable, они всегда будут хранить изменения, поскольку все они имеют один и тот же ключ, и любое обновление родительских или дочерних таблиц будет подбираться объединениями. Но этого не происходит, может ли кто-нибудь подсказать, чего мне здесь не хватает?
Я уже пробовал объединения KStream-KStream, Stream-KTable, KTable-KTable, ни один из них не работал в случае дочерних обновлений.
-Спасибо