Приложение для подсчета слов Kafka Streams

Я играю с API потоковой передачи kafka (версия Kakfa: 0.10.2.0), пытаясь заставить простой пример подсчета слов работать: суть приложения Wordcount. Я использую как производителя, так и консоль:

./kafka-console-producer.sh -topic input-topic --broker-list localhost:9092

./kafka-console-consumer.sh --topic output-topic --bootstrap-server localhost:9092 --from-beginning

запустите приложение, и все вроде бы работает нормально, но когда я ввожу некоторые строки в производитель консоли, потребитель вообще ничего не получает. Если я изменю приложение на простой toUppercase на входе, потребитель получит поток (измененный на верхний регистр) в порядке:

//The following code works fine: val uppercasedWithMapValues: KStream[String, String] = textLines.mapValues(_.toUpperCase()) uppercasedWithMapValues.to("output-topic")

Кто-нибудь знает, почему я ничего не получаю на примере подсчета слов? Должен ли я указывать на потребителе какой-либо сериализатор? В моем последнем тесте потребитель консоли обрабатывал сообщения, которые я отправлял через консоль, но не отображал их, см. Вывод ниже:

➜  bin ./kafka-console-consumer.sh \
           --topic output-topic \
           --bootstrap-server localhost:9092 \
           --from-beginning                                                                                
[2017-08-02 07:48:20,187]WARN Error while fetching metadata with correlation id 2 : 
{output-topic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2017-08-02 07:48:20,197] WARN The following subscribed topics are not assigned 
to any members in the group console-consumer-91651 : [output-topic]  
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)

^ CОбработано всего 7 сообщений.


person ardlema    schedule 02.08.2017    source источник


Ответы (1)


KStream работает, потому что не использует кеширование. Для KTable вам нужно немного подождать или установить cache.max.bytes.buffering на 0 (но не в производственном коде!)

person Arek    schedule 16.08.2017
comment
Потрясающий! Это сработало! Большое тебе спасибо! Думаю, мне нужно больше узнать о внутреннем устройстве потоков kafka. Еще раз спасибо @Arek - person ardlema; 16.08.2017
comment
Рад помочь вам @ardlema :) - person Arek; 17.08.2017