Apache Flink предоставляет множество операций для DataSet. Немного сложно понять, как данные обрабатываются в кластере. Например, WordCount имеет разные инструменты. В чем разница?
Было бы очень полезно, если бы были какие-то документы, поясняющие, каков поток данных для этих агрегатов в кластере.
// get input data
DataSet<String> text = env.fromElements(
"To be, or not to be,--that is the question:--",
"Whether 'tis nobler in the mind to suffer",
"The slings and arrows of outrageous fortune",
"Or to take arms against a sea of troubles,"
);
// WordCount 1
text.flatMap(new LineSplitter()).groupBy(0).sum(1).print();
// WordCount 2
text.flatMap(new LineSplitter()).groupBy(0).aggregate(Aggregations.SUM, 1).print();
// WordCount 3
text.flatMap(new LineSplitter()).groupBy(0)
.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {
return new Tuple2<String, Integer>(t1.f0, t1.f1+t2.f1);
}
}).print();
// WordCount 4
text.flatMap(new LineSplitter()).groupBy(0)
.reduceGroup(new GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
@Override
public void reduce(Iterable<Tuple2<String, Integer>> iterable, Collector<Tuple2<String, Integer>> collector) throws Exception {
int prefixSum = 0;
String key = null;
for (Tuple2<String, Integer> t : iterable) {
prefixSum += t.f1;
key = t.f0;
}
collector.collect(new Tuple2<String, Integer>(key, prefixSum));
}
}).print();
// WordCount 5
text.flatMap(new LineSplitter())
.reduceGroup(new GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
@Override
public void reduce(Iterable<Tuple2<String, Integer>> iterable, Collector<Tuple2<String, Integer>> collector) throws Exception {
HashMap<String, Integer> map = new HashMap<String, Integer>();
for(Tuple2<String, Integer> t : iterable){
if(map.containsKey(t.f0)){
map.replace(t.f0, map.get(t.f0)+t.f1);
} else {
map.put(t.f0, t.f1);
}
}
for(Map.Entry<String, Integer> pair : map.entrySet()){
collector.collect(new Tuple2<String, Integer>(pair.getKey(), pair.getValue()));
}
}
}).print();