Предположим, у меня есть файл формы (по одному событию в строке):
Source,Timestamp
aa,2014-05-02 22:12:11
bb,2014-05-02 22:22:11
И я хотел бы суммировать количество событий, сгруппированных по источникам с непрерывным временным окном в 5 минут. Как мне это сделать с Flink?
Что у меня сейчас есть:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStreamSource<Event> stream = env.fromCollection(new EventFileReader(new File("path/to/file")), Event.class);
stream
.keyBy("getSource()")
.timeWindow(Time.minutes(5))
.sum("getTimestamp()");
env.execute();
public class Event {
private final String source;
private final long timestamp;
public Event(String source, long timestamp) {
this.source = source;
this.timestamp = timestamp;
}
public String getSource() {
return source;
}
public long getTimestamp() {
return timestamp;
}
}
Мне не хватает двух вещей. Во-первых, это не удается и говорит, что класс Event
не является POJO. Во-вторых, я не умею считать количество событий в окне. Сейчас использую .sum("getTimestamp()")
, но уверен, что это не так. Есть предположения?