KStream несовместимые типы

Я пробовал что-то с частью кодирования, используя строки, и придумал код ниже, пожалуйста, помогите с проблемами синтаксиса с кодом.

KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> textlines = builder.stream("INTOPIC");
KStream<String, String> mstream = textlines
    .foreach(new ForeachAction<String, String>(){
        public void apply(String key, String value){
            String str=value.replace("[","");
            for(int y=0; y<str.length(); y++){
                //System.out.print(str.charAt(y));
            }
            //System.out.print("\n");
        }
    });
mstream.to("OUTTOPIC");

Во втором выражении KStream он показывает несовместимые типы..... Я хочу, чтобы логика замены работала, и она должна быть затронута переменной textlines. Заранее спасибо.


person Sachin Kademane    schedule 05.12.2017    source источник


Ответы (1)


Результатом foreach является void, а не следующее KStream. Если вы хотите преобразовать данные, вам нужно вместо этого использовать map...

Код будет выглядеть следующим образом (особенно если нужно еще и ключ поменять, если нет - см. следующий вариант):

KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> textlines = builder.stream("INTOPIC");
KStream<String, String> mstream = textlines
    .map((key, value) -> new KeyValue<>(key, value.replace("[",""))));
mstream.to("OUTTOPIC");

или вы можете заменить строки 3-4 следующими (это приведет к лучшему выполнению):

KStream<String, String> mstream = textlines
    .mapValues(value -> value.replace("[",""));
person Alex Ott    schedule 05.12.2017
comment
Можете ли вы просто отправить мне отредактированный код выше.. я не понял правильно - person Sachin Kademane; 05.12.2017
comment
Я добавил код, чтобы показать его, но рекомендую, прежде чем переходить к кодированию, прочитать документацию (docs.confluent.io/current/streams/index.html) — Kafka Streams концептуально прост, но лучше сначала изучить основы - person Alex Ott; 05.12.2017
comment
Большое спасибо ... это сработало для меня ... и еще один вопрос: как я могу удалить нуль, который отображается как ключ в KStream «ключ, значение» - person Sachin Kademane; 05.12.2017
comment
Сообщения Kafka всегда имеют ключ, который вы можете просто игнорировать, если он не нужен. - person Alex Ott; 05.12.2017
comment
Я имел в виду логически игнорировать - для вывода данных в Kafka вам нужно иметь ключ, если вы его не укажете - то он указан как NULL - person Alex Ott; 05.12.2017
comment
Да, но отображается нуль, которого не должно быть... как избежать отображения нуля? - person Sachin Kademane; 05.12.2017
comment
Как вы отображаете данные? консоль-потребитель или что-то еще? - person Alex Ott; 05.12.2017
comment
Не помню, что из коробки можно... А может и забыл - person Alex Ott; 05.12.2017
comment
Просто продолжение: всегда лучше использовать mapValues(), если вам не нужен ключ - это, скорее всего, даст вам лучшее выполнение. - person Matthias J. Sax; 06.12.2017
comment
@MatthiasJ.Sax полностью согласен - я использовал обычную карту только для того, чтобы показать прямое сопоставление между вводом и выводом - person Alex Ott; 06.12.2017
comment
Не могли бы вы обновить свой ответ, чтобы указать на это? Не уверен, что люди читают комментарии :) - person Matthias J. Sax; 06.12.2017
comment
В команде потребителя консоли kafka простое создание key.print = false будет отображать только значение... - person Sachin Kademane; 06.12.2017
comment
@MatthiasJ.Sax Просто удалив --property print.key=true, это в моей команде kafka-cosole-consumer работает, чтобы не отображать null на консоли .... но дело в том, что это не только для целей отображения, которые я использую. ... мои выходные данные из потоков kafka должны направляться непосредственно в потоковый реактор, потоковый реактор не может принимать значение null, а затем значение ... так что же может быть решением для этого? - person Sachin Kademane; 07.12.2017
comment
Просто добавьте .filter() перед to() и отфильтруйте записи с null. - person Matthias J. Sax; 07.12.2017