Группа Kafka Streams API По поведению

Я новичок в потоках kafka, и я пытаюсь объединить некоторые потоковые данные в KTable с помощью функции groupBy. Проблема в следующем:

Созданное сообщение представляет собой сообщение json в следующем формате:

{ "current_ts": "2019-12-24 13:16:40.316952",
  "primary_keys": ["ID"],
  "before": null,
  "tokens": {"txid":"3.17.2493", 
             "csn":"64913009"},
  "op_type":"I",
  "after":  { "CODE":"AAAA41",
              "STATUS":"COMPLETED",
              "ID":24},
  "op_ts":"2019-12-24 13:16:40.316941",
  "table":"S_ORDER"} 

Я хочу изолировать поле json "после", а затем создать KTable с "ключом" = "ID" и значением всего json < strong> "после".

Во-первых, я создал KStream, чтобы изолировать "после" json, и он отлично работает.

Блок кода KStream: (Не обращайте внимания на оператор if, потому что «до» и «после» имеют одинаковый формат.)

KStream<String, String> s_order_list = s_order
                .mapValues(value -> {
                    String time;
                    JSONObject json = new JSONObject(value);
                    if (json.getString("op_type").equals("I")) {
                        time = "after";
                    }else {
                        time = "before";
                    }
                    JSONObject json2 = new JSONObject(json.getJSONObject(time).toString());
                    return json2.toString();
                });

Результат, как и ожидалось, следующий:

...
null {"CODE":"AAAA48","STATUS":"SUBMITTED","ID":6}
null {"CODE":"AAAA16","STATUS":"COMPLETED","ID":1}
null {"CODE":"AAAA3","STATUS":"SUBMITTED","ID":25}
null {"CODE":"AAAA29","STATUS":"SUBMITTED","ID":23}
...

После этого я реализую KTable для группировки по «ID» json.

Кодовый блок KTable:

  KTable<String, String> s_table = s_order_list
                .groupBy((key, value) -> {
                    JSONObject json = new JSONObject(value);
                    return json.getString("ID");
                });

И есть ошибка, которую я хочу создать KTable<String, String>, но создаю GroupedStream<Object,String>.

Required type: KTable<String,String>
Provided:KGroupedStream<Object,String>
no instance(s) of type variable(s) KR exist so that KGroupedStream<KR, String> conforms to KTable<String, String>

В заключение, вопрос в том, что такое KGroupedStreams и как правильно реализовать KTable?


person ChrisGav    schedule 31.12.2019    source источник
comment
Почему бы вам просто не начать с того, чтобы указать идентификатор в качестве ключа производителя? (вы используете Дебезиум?)   -  person OneCricketeer    schedule 31.12.2019
comment
Как вы создаете свои апстрим-объекты, т. Е. s_order - кажется, это просто общая проблема.   -  person Matthias J. Sax    schedule 01.01.2020
comment
KStream нельзя преобразовать в KTable. Так почему бы не разделить данные совместно с помощью самой KTable.   -  person Fatema Khuzaima Sagar    schedule 03.01.2020
comment
@ cricket_007 Нет, я не использую Debezium. В проекте, над которым я работаю, я не могу изменять сообщения, публикуемые продюсером.   -  person ChrisGav    schedule 07.01.2020
comment
@ MatthiasJ.Sax s_order - еще один поток, созданный с помощью builder.stream("topic-name"). На самом деле это избыточно, это может быть просто s_order_list, читающий сообщения из темы напрямую.   -  person ChrisGav    schedule 07.01.2020
comment
Вы указали типы исходного потока? Может быть, вам нужно так builder.<keyType, ValueType>("topic-name"); сказать компилятору, чтобы он исправлял типы?   -  person Matthias J. Sax    schedule 08.01.2020


Ответы (1)


После процессора groupBy вы можете использовать процессор с отслеживанием состояния, например aggregate или reduce (который возвращает KTable). Вы можете сделать что-то вроде этого:

KGroupedStream<String, String> s_table = s_order_list
                     .groupBy((key, value) ->
                         new JSONObject(value).getString("ID"),
                         Grouped.with(
                                 Serdes.String(),
                                 Serdes.String())
                     );

KTable<String, StringAggregate> aggregateStrings = s_table.aggregate(
                     (StringAggregate::new),
                     (key, value, aggregate) -> aggregate.addElement(value));

StringAggregate выглядит так:

public class StringAggregate {

    private static List<String> elements = new ArrayList<>();

    public StringAggregate addElement(String element){
        elements.add(element);
        return this;
    }
    //other methods
}
person Javier Gonzalez Benito    schedule 31.12.2019