отправка DataStream из сокета виртуальной машины в Kafka и получение в программе Flink хост-ОС: проблема десериализации

Я отправляю поток данных с виртуальной машины в тестовую тему Kafka (работает на хост-ОС с IP-адресом 192.168.0.12), используя приведенный ниже код.

public class WriteToKafka {

    public  static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Use ingestion time => TimeCharacteristic == EventTime + IngestionTimeExtractor
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

        DataStream<JoinedStreamEvent> joinedStreamEventDataStream = env
                .addSource(new JoinedStreamGenerator()).assignTimestampsAndWatermarks(new IngestionTimeExtractor<>());

        Properties properties = new Properties();

        properties.setProperty("bootstrap.servers", "192.168.0.12:9092");
        properties.setProperty("zookeeper.connect", "192.168.0.12:2181");
        properties.setProperty("group.id", "test");

        DataStreamSource<JoinedStreamEvent> stream = env.addSource(new JoinedStreamGenerator());
        stream.addSink(new FlinkKafkaProducer09<JoinedStreamEvent>("test", new TypeInformationSerializationSchema<>(stream.getType(),env.getConfig()), properties));

        env.execute();
    }

JoinedStreamEvent имеет тип DataSream<Tuple3<Integer,Integer,Integer>>, он в основном объединяет 2 потока respirationRateStream и heartRateStream

 public JoinedStreamEvent(Integer patient_id, Integer heartRate, Integer respirationRate) {
        Patient_id = patient_id;
        HeartRate = heartRate;
        RespirationRate = respirationRate;

Существует еще одна программа Flink, которая работает на хост-ОС и пытается прочитать поток данных из kafka. Я использую здесь localhost, так как kafka и zookeper работают на хост-ОС.

public class ReadFromKafka {


    public static void main(String[] args) throws Exception {
        // create execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("zookeeper.connect", "localhost:2181");
        properties.setProperty("group.id", "test");



       DataStream<String> message = env.addSource(new FlinkKafkaConsumer09<String>("test", new SimpleStringSchema(), properties));

       /* DataStream<JoinedStreamEvent> message = env.addSource(new FlinkKafkaConsumer09<JoinedStreamEvent>("test",
                new , properties));*/

        message.print();


        env.execute();


    } //main
} //ReadFromKafka

Я получаю вывод что-то вроде этого

введите здесь описание изображения

Я думаю, мне нужно реализовать десериализатор типа JoinedStreamEvent. Может кто-нибудь подсказать, как мне написать десериализатор для JoinedStreamEvent типа DataSream<Tuple3<Integer, Integer, Integer>>

Пожалуйста, дайте мне знать, если нужно сделать что-то еще.

P.S. - Думал написать вслед за десериализатором, но не думаю, что это правильно

DataStream<JoinedStreamEvent> message = env.addSource(new FlinkKafkaConsumer09<JoinedStreamEvent>("test",
                new TypeInformationSerializationSchema<JoinedStreamEvent>() , properties));

person Amarjit Dhillon    schedule 22.08.2017    source источник
comment
TypeInformationSerializationSchema также должен выполнять эту работу, но вам нужно передать TypeInformation конструктору: TypeInformation.of(JoinedStreamEvent.class)   -  person twalthr    schedule 23.08.2017


Ответы (1)


Я смог получать события VM в том же формате, написав собственный сериализатор и десериализатор как для VM, так и для программы хост-ОС, как указано ниже.

public class JoinSchema implements DeserializationSchema<JoinedStreamEvent> , SerializationSchema<JoinedStreamEvent> {


    @Override
    public JoinedStreamEvent deserialize(byte[] bytes) throws IOException {
        return JoinedStreamEvent.fromstring(new String(bytes));
    }

    @Override
    public boolean isEndOfStream(JoinedStreamEvent joinedStreamEvent) {
        return false;
    }

    @Override
    public TypeInformation<JoinedStreamEvent> getProducedType() {
        return TypeExtractor.getForClass(JoinedStreamEvent.class);
    }

    @Override
    public byte[] serialize(JoinedStreamEvent joinedStreamEvent) {
        return joinedStreamEvent.toString().getBytes();
    }
} //JoinSchema

Обратите внимание, что вам, возможно, придется написать метод fromstring() в вашем методе типа события, как я добавил ниже fromString() Класс JoinedStreamEvent.

public static JoinedStreamEvent fromstring(String line){

        String[] token = line.split(",");

        JoinedStreamEvent joinedStreamEvent = new JoinedStreamEvent();


        Integer val1 = Integer.valueOf(token[0]);
        Integer val2 = Integer.valueOf(token[1]);
        Integer val3 = Integer.valueOf(token[2]);

        return  new JoinedStreamEvent(val1,val2,val3);


    } //fromstring

События были отправлены с виртуальной машины с использованием кода ниже

stream.addSink(new FlinkKafkaProducer09<JoinedStreamEvent>("test", new JoinSchema(), properties));

События были получены с использованием следующего кода

public static void main(String[] args) throws Exception {
    // create execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "localhost:9092");
    properties.setProperty("zookeeper.connect", "localhost:2181");
    properties.setProperty("group.id", "test");


    DataStream<JoinedStreamEvent> message = env.addSource(new FlinkKafkaConsumer09<JoinedStreamEvent>("test",
            new JoinSchema(), properties));

    message.print();



    env.execute();


} //main
person Amarjit Dhillon    schedule 23.08.2017