В чем разница между следующими способами подсчета слов в Apache Flink?

Apache Flink предоставляет множество операций для DataSet. Немного сложно понять, как данные обрабатываются в кластере. Например, WordCount имеет разные инструменты. В чем разница?

Было бы очень полезно, если бы были какие-то документы, поясняющие, каков поток данных для этих агрегатов в кластере.

    // get input data
    DataSet<String> text = env.fromElements(
            "To be, or not to be,--that is the question:--",
            "Whether 'tis nobler in the mind to suffer",
            "The slings and arrows of outrageous fortune",
            "Or to take arms against a sea of troubles,"
            );
    // WordCount 1
    text.flatMap(new LineSplitter()).groupBy(0).sum(1).print();

    // WordCount 2
    text.flatMap(new LineSplitter()).groupBy(0).aggregate(Aggregations.SUM, 1).print();

    // WordCount 3
    text.flatMap(new LineSplitter()).groupBy(0)
            .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {
                    return new Tuple2<String, Integer>(t1.f0, t1.f1+t2.f1);
            }
        }).print();

    // WordCount 4
    text.flatMap(new LineSplitter()).groupBy(0)
            .reduceGroup(new GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
                @Override
                public void reduce(Iterable<Tuple2<String, Integer>> iterable, Collector<Tuple2<String, Integer>> collector) throws Exception {
                    int prefixSum = 0;
                    String key = null;
                    for (Tuple2<String, Integer> t : iterable) {
                        prefixSum += t.f1;
                        key = t.f0;
                    }
                    collector.collect(new Tuple2<String, Integer>(key, prefixSum));
            }
        }).print();

    // WordCount 5
    text.flatMap(new LineSplitter())
        .reduceGroup(new GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
            @Override
            public void reduce(Iterable<Tuple2<String, Integer>> iterable, Collector<Tuple2<String, Integer>> collector) throws Exception {
                HashMap<String, Integer> map = new HashMap<String, Integer>();
                for(Tuple2<String, Integer> t : iterable){
                    if(map.containsKey(t.f0)){
                        map.replace(t.f0, map.get(t.f0)+t.f1);
                    } else {
                        map.put(t.f0, t.f1);
                    }
                }
                for(Map.Entry<String, Integer> pair : map.entrySet()){
                    collector.collect(new Tuple2<String, Integer>(pair.getKey(), pair.getValue()));
                }
            }
        }).print();

person Jun    schedule 24.10.2015    source источник


Ответы (1)


За исключением WordCount 5, все программы выполняются очень похоже на обычную программу MapReduce WordCount (перетасовка на основе хэшей и группировка на основе сортировки).

  • WordCount 1 — синтаксический сахар для WordCount 2.
  • WordCount 2 внутренне выполняется с GroupReduceFunction, аналогичным тому, что используется в WordCount 4. Единственное отличие состоит в том, что внутренний GroupReduceFunction реализует интерфейс Combinable для поддержки частичного агрегирования.
  • WordCount 3 использует ReduceFunction, который выполняется аналогично GroupReduceFunction. Однако из-за другого интерфейса ReduceFunction всегда можно комбинировать (без необходимости отдельного метода combine).
  • WordCount 4 выполняется так же, как обычная программа MapReduce: перемешивание с хэш-секционированием и группировка на основе сортировки. Поскольку GroupReduceFunction не реализует интерфейс Combinable, эта программа выполняется без локальной предварительной агрегации и, следовательно, менее эффективна, чем предыдущие три программы.
  • WordCount 5 очень неэффективен и не должен использоваться, потому что GroupReduceFunction не может выполняться параллельно. Поскольку вызова groupBy() нет, все данные отправляются в один и тот же Reducer и обрабатываются как одна большая группа. Во-первых, это будет медленно, потому что выполняется в одном потоке и ограничено пропускной способностью сети одной машины. Во-вторых, эта программа может легко выйти из строя, если количество отдельных ключей станет слишком большим, потому что группировка выполняется с использованием памяти HashMap.
person Fabian Hueske    schedule 25.10.2015