Как подсчитать по свойствам и временному окну с Apache Flink?

Предположим, у меня есть файл формы (по одному событию в строке):

Source,Timestamp aa,2014-05-02 22:12:11 bb,2014-05-02 22:22:11

И я хотел бы суммировать количество событий, сгруппированных по источникам с непрерывным временным окном в 5 минут. Как мне это сделать с Flink?

Что у меня сейчас есть:

   final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    DataStreamSource<Event> stream = env.fromCollection(new EventFileReader(new File("path/to/file")), Event.class);

    stream
        .keyBy("getSource()")
        .timeWindow(Time.minutes(5))
        .sum("getTimestamp()");     

    env.execute();

public class Event {
    private final String source;
    private final long timestamp;

    public Event(String source, long timestamp) {
        this.source = source;
        this.timestamp = timestamp;
    }

    public String getSource() {
        return source;
    }

    public long getTimestamp() {
        return timestamp;
    }
}

Мне не хватает двух вещей. Во-первых, это не удается и говорит, что класс Event не является POJO. Во-вторых, я не умею считать количество событий в окне. Сейчас использую .sum("getTimestamp()"), но уверен, что это не так. Есть предположения?


person Johnny    schedule 21.04.2016    source источник


Ответы (1)


Я бы рекомендовал использовать функцию fold для агрегирования окон. Следующий фрагмент кода должен помочь:

public class Job {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        DataStream<Event> stream = env.fromElements(new Event("a", 1), new Event("b", 2), new Event("a", 2)).assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Event>() {
            @Nullable
            @Override
            public Watermark checkAndGetNextWatermark(Event event, long l) {
                return new Watermark(l);
            }

            @Override
            public long extractTimestamp(Event event, long l) {
                return event.getTimestamp();
            }
        });

        DataStream<Tuple2<String, Integer>> count = stream.keyBy(new KeySelector<Event, String>() {
                @Override
                public String getKey(Event event) throws Exception {
                    return event.getSource();
                }
            })
            .timeWindow(Time.minutes(5))
            .fold(Tuple2.of("", 0), new FoldFunction<Event, Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> fold(Tuple2<String, Integer> acc, Event o) throws Exception {
                    return Tuple2.of(o.getSource(), acc.f1 + 1);
                }
            });

        count.print();

        env.execute();
    }

    public static class Event {
        private final String source;
        private final long timestamp;

        public Event(String source, long timestamp) {
            this.source = source;
            this.timestamp = timestamp;
        }

        public String getSource() {
            return source;
        }

        public long getTimestamp() {
            return timestamp;
        }
    }
}
person Till Rohrmann    schedule 21.04.2016