Перевести карту потока csv-файла в кортеж

Я пытаюсь сопоставить CSV-файл, уже использованный Flink и созданный Kafka, в Tuple4. В моем CSV-файле 4 столбца, и я хочу сопоставить каждую строку с Tuple4. Проблема в том, что я не знаю, как реализовать функции map () и csv2Tuple.

Вот где я застрял:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

ParameterTool parameterTool = ParameterTool.fromArgs(ARGS);

DataStreamSource<String> myConsumer = env.addSource(new FlinkKafkaConsumer082<>(parameterTool.getRequired("topic"),
            new SimpleStringSchema(), parameterTool.getProperties()));

DataStream<Tuple4<Integer, Integer, Integer, Integer>> streamTuple = myConsumer.map(new csv2Tuple());
public static class csv2Tuple implements MapFunction<...> {public void map(){...}}

Я хотел бы также проанализировать элементы в кортеже из String в Integer.


person Salvador Vigo    schedule 25.08.2018    source источник
comment
Извините за ошибки, теперь редактируется   -  person Salvador Vigo    schedule 25.08.2018


Ответы (1)


Предположим, вы создаете каждую строку файла csv как сообщение Kafka и потребляете его с помощью коннектора Flink Kafka. Вам просто нужно разделить каждое полученное сообщение с помощью , (потому что это файл csv).

DataStream<Tuple4<Integer, Integer, Integer, Integer,>> streamTuple = myConsumer.map(new MapFunction<String, Tuple4<Integer, Integer, Integer, Integer>>() {
            @Override
            public Tuple4<Integer, Integer, Integer, Integer> map(String str) throws Exception {
                String[] temp = str.split(",");
                return new Tuple4<>(
                        Integer.parseInt(temp[0]),
                        Integer.parseInt(temp[1]),
                        Integer.parseInt(temp[2]),
                        Integer.parseInt(temp[3])
                );

            }
        });
person Soheil Pourbafrani    schedule 25.08.2018