Kafka Streams не запускает вывод для объединенных потоков?

У меня есть необработанные потоки из 3 таблиц mysql, 1 основной и двух дочерних таблиц. Я попытался объединить три сырых потока и преобразовал их в один выходной поток. Он работает, если есть какие-либо обновления в родительском потоке, но не запускает вывод, если что-либо изменяется в дочернем потоке.

    @StreamListener
    public Stream<Long, Output> handleStreams(@Input KStream<Long, Parent> parentStream,
    @Input KStream<Long, Child1> child1Stream,
    @Input KStream<Long, Child2> child2Stream) {

    KTable<Long, Parent> parentTable = convertParent(parentStream);
    KTable<Long, ArrayList<Child1>> child1Table = convertChild1(parentStream);
    KTable<Long, ArrayList<Child2>> child2Table = convertChild2(parentStream);

    parentTable
           .leftJoin(child1Table, (parent, child1List) -> new Output(k, v))
           .leftJoin(child2Table, (output, child2List) -> output.setChild2List(child2List))
           .toStream()
        }

Любое новое добавление или обновление в родительском потоке обрабатывается процессором, объединяет его с другим KTable и возвращает его в выходной поток. Но любое добавление или обновление в child1stream или child2stream не запускает выходной поток.

Я думал, что все входные потоки будут отображаться как KTable, они всегда будут хранить изменения, поскольку все они имеют один и тот же ключ, и любое обновление родительских или дочерних таблиц будет подбираться объединениями. Но этого не происходит, может ли кто-нибудь подсказать, чего мне здесь не хватает?

Я уже пробовал объединения KStream-KStream, Stream-KTable, KTable-KTable, ни один из них не работал в случае дочерних обновлений.

-Спасибо


person R K    schedule 27.04.2018    source источник


Ответы (2)


Можете ли вы показать, где у вас EnableBinding и интерфейс процессора, к которому вы привязываетесь?

Мне это не кажется правильным:

@StreamListener
    public Stream<Long, Output> handleStreams(@Input KStream<Long, Parent> parentStream,
    @Input KStream<Long, Child1> child1Stream,
    @Input KStream<Long, Child2> child2Stream) {

Вы не указываете привязку для входов. Если у вас несколько входов, вам понадобится что-то вроде этого:

@StreamListener
        public Stream<Long, Output> handleStreams(@Input("input1") KStream<Long, Parent> parentStream,
        @Input("input2") KStream<Long, Child1> child1Stream,
        @Input("input3") KStream<Long, Child2> child2Stream) {

Каждый из этих входов должен быть определен в интерфейсе процессора. См. Пример здесь: https://github.com/spring-cloud/spring-cloud-stream-samples/blob/master/kafka-streams-samples/kafka-streams-table-join/src/main/java/kafka/streams/table/join/KafkaStreamsTableJoin.java#L46

person sobychacko    schedule 27.04.2018
comment
Извините, это всего лишь псевдопредставление проблемы. У меня такой же код, и я следую тому же примеру. Оно работает. Мой вопрос больше о концепции. Любое обновление или вставка в основной поток работает, но если какое-либо изменение только в дочернем потоке не запускает выходной объект. - person R K; 27.04.2018
comment
Как вы получаете необработанные потоки из mysql? Дочерние потоки получают свои данные из тем kafka. Вы уверены, что сообщения в эти темы kafka приходят из mysql? - person sobychacko; 27.04.2018
comment
да, я использую debezium с kafka connect. Я вижу новые сообщения в разделах совокупного хранилища для дочернего потока, но они не отображаются на выходе. - person R K; 27.04.2018
comment
Все еще немного неясно. что вы подразумеваете под выводом в предыдущем комментарии? Вы видите сообщения, передаваемые в child1Stream и child2stream? - person sobychacko; 27.04.2018
comment
да, сообщения передаются в потоки child1stream и child2stream, а также передаются в совокупные хранилища. Поскольку сообщения находятся в совокупных хранилищах (поскольку они являются KTable), я ожидаю, что он будет выполнять соединения, как показано в приведенном выше коде, и должен генерировать поток выходного объекта (который состоит из объектов primary, child1 и child2). - person R K; 27.04.2018
comment
Ok. поскольку вы получаете сообщения, я думаю, что на этом этапе spring-cloud-stream не имеет значения. Он все координирует и сходит с критического пути. Может быть, кто-то из kafka-streams сможет вмешаться? Если вы поделитесь проектом, в котором мы можем воспроизвести проблему, я, возможно, смогу это рассмотреть. - person sobychacko; 27.04.2018

Обратите внимание, как ваши дочерние таблицы создаются из того же потока, что и parentTable:

KTable<Long, ArrayList<Child1>> child1Table = convertChild1(parentStream);
KTable<Long, ArrayList<Child2>> child2Table = convertChild2(parentStream);

Не уверен, что делают методы convertChild1 и convertChild2, но не следует ли им давать в качестве аргумента child1Stream и child2Stream соответственно?

person Michal Borowiecki    schedule 27.04.2018
comment
Простите, вы правы. Это была опечатка при написании вопроса. Также метод преобразования просто группирует по ключу и агрегат. Дочерний поток может иметь несколько записей для внешнего ключа. - person R K; 27.04.2018