Я новичок в потоках kafka, и я пытаюсь объединить некоторые потоковые данные в KTable с помощью функции groupBy. Проблема в следующем:
Созданное сообщение представляет собой сообщение json в следующем формате:
{ "current_ts": "2019-12-24 13:16:40.316952",
"primary_keys": ["ID"],
"before": null,
"tokens": {"txid":"3.17.2493",
"csn":"64913009"},
"op_type":"I",
"after": { "CODE":"AAAA41",
"STATUS":"COMPLETED",
"ID":24},
"op_ts":"2019-12-24 13:16:40.316941",
"table":"S_ORDER"}
Я хочу изолировать поле json "после", а затем создать KTable с "ключом" = "ID" и значением всего json < strong> "после".
Во-первых, я создал KStream, чтобы изолировать "после" json, и он отлично работает.
Блок кода KStream: (Не обращайте внимания на оператор if, потому что «до» и «после» имеют одинаковый формат.)
KStream<String, String> s_order_list = s_order
.mapValues(value -> {
String time;
JSONObject json = new JSONObject(value);
if (json.getString("op_type").equals("I")) {
time = "after";
}else {
time = "before";
}
JSONObject json2 = new JSONObject(json.getJSONObject(time).toString());
return json2.toString();
});
Результат, как и ожидалось, следующий:
...
null {"CODE":"AAAA48","STATUS":"SUBMITTED","ID":6}
null {"CODE":"AAAA16","STATUS":"COMPLETED","ID":1}
null {"CODE":"AAAA3","STATUS":"SUBMITTED","ID":25}
null {"CODE":"AAAA29","STATUS":"SUBMITTED","ID":23}
...
После этого я реализую KTable для группировки по «ID» json.
Кодовый блок KTable:
KTable<String, String> s_table = s_order_list
.groupBy((key, value) -> {
JSONObject json = new JSONObject(value);
return json.getString("ID");
});
И есть ошибка, которую я хочу создать KTable<String, String>
, но создаю GroupedStream<Object,String>
.
Required type: KTable<String,String>
Provided:KGroupedStream<Object,String>
no instance(s) of type variable(s) KR exist so that KGroupedStream<KR, String> conforms to KTable<String, String>
В заключение, вопрос в том, что такое KGroupedStreams и как правильно реализовать KTable?
s_order
- кажется, это просто общая проблема. - person Matthias J. Sax   schedule 01.01.2020builder.stream("topic-name")
. На самом деле это избыточно, это может быть просто s_order_list, читающий сообщения из темы напрямую. - person ChrisGav   schedule 07.01.2020builder.<keyType, ValueType>("topic-name");
сказать компилятору, чтобы он исправлял типы? - person Matthias J. Sax   schedule 08.01.2020