Я отправляю поток данных с виртуальной машины в тестовую тему Kafka (работает на хост-ОС с IP-адресом 192.168.0.12), используя приведенный ниже код.
public class WriteToKafka {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Use ingestion time => TimeCharacteristic == EventTime + IngestionTimeExtractor
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
DataStream<JoinedStreamEvent> joinedStreamEventDataStream = env
.addSource(new JoinedStreamGenerator()).assignTimestampsAndWatermarks(new IngestionTimeExtractor<>());
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "192.168.0.12:9092");
properties.setProperty("zookeeper.connect", "192.168.0.12:2181");
properties.setProperty("group.id", "test");
DataStreamSource<JoinedStreamEvent> stream = env.addSource(new JoinedStreamGenerator());
stream.addSink(new FlinkKafkaProducer09<JoinedStreamEvent>("test", new TypeInformationSerializationSchema<>(stream.getType(),env.getConfig()), properties));
env.execute();
}
JoinedStreamEvent
имеет тип DataSream<Tuple3<Integer,Integer,Integer>>
, он в основном объединяет 2 потока respirationRateStream
и heartRateStream
public JoinedStreamEvent(Integer patient_id, Integer heartRate, Integer respirationRate) {
Patient_id = patient_id;
HeartRate = heartRate;
RespirationRate = respirationRate;
Существует еще одна программа Flink, которая работает на хост-ОС и пытается прочитать поток данных из kafka. Я использую здесь localhost, так как kafka и zookeper работают на хост-ОС.
public class ReadFromKafka {
public static void main(String[] args) throws Exception {
// create execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");
DataStream<String> message = env.addSource(new FlinkKafkaConsumer09<String>("test", new SimpleStringSchema(), properties));
/* DataStream<JoinedStreamEvent> message = env.addSource(new FlinkKafkaConsumer09<JoinedStreamEvent>("test",
new , properties));*/
message.print();
env.execute();
} //main
} //ReadFromKafka
Я получаю вывод что-то вроде этого
Я думаю, мне нужно реализовать десериализатор типа JoinedStreamEvent
. Может кто-нибудь подсказать, как мне написать десериализатор для JoinedStreamEvent
типа DataSream<Tuple3<Integer, Integer, Integer>>
Пожалуйста, дайте мне знать, если нужно сделать что-то еще.
P.S. - Думал написать вслед за десериализатором, но не думаю, что это правильно
DataStream<JoinedStreamEvent> message = env.addSource(new FlinkKafkaConsumer09<JoinedStreamEvent>("test",
new TypeInformationSerializationSchema<JoinedStreamEvent>() , properties));
TypeInformationSerializationSchema
также должен выполнять эту работу, но вам нужно передатьTypeInformation
конструктору:TypeInformation.of(JoinedStreamEvent.class)
- person twalthr   schedule 23.08.2017