сортировка элементов RDD

Для исследовательского проекта я попытался отсортировать элементы в RDD. Я сделал это двумя разными способами.

В первом методе я применил функцию mapPartitions() к СДР, чтобы она сортировала содержимое СДР и предоставляла результирующий СДР, содержащий отсортированный список как единственную запись в СДР. Затем я применил функцию сокращения, которая в основном объединяет отсортированные списки.

Я проводил эти эксперименты на кластере EC2, содержащем 30 узлов. Я настроил его с помощью скрипта spark ec2. Файл данных был сохранен в HDFS.

Во втором подходе я использовал метод sortBy в Spark.

Я выполнил эту операцию с данными переписи населения США (100 МБ), найденными здесь

Одна строка выглядит так

9, Not in universe, 0, 0, Children, 0, Not in universe, Never married, Not in universe or children, Not in universe, White, All other, Female, Not in universe, Not in universe, Children or Armed Forces, 0, 0, 0, Nonfiler, Not in universe, Not in universe, Child <18 never marr not in subfamily, Child under 18 never married, 1758.14, Nonmover, Nonmover, Nonmover, Yes, Not in universe, 0, Both parents present, United-States, United-States, United-States, Native- Born in the United States, 0, Not in universe, 0, 0, 94, - 50000.

Я отсортировал по 25-му значению в CSV. В этой строке это 1758.14.

Я заметил, что sortBy работает хуже, чем другой метод. Это ожидаемый сценарий? Если да, то почему бы mapPartitions() и reduce() не использовать метод сортировки по умолчанию?

Вот моя реализация

public static void sortBy(JavaSparkContext sc){
        JavaRDD<String> rdd = sc.textFile("/data.txt",32);
        long start = System.currentTimeMillis();
        rdd.sortBy(new Function<String, Double>(){

            @Override
                public Double call(String v1) throws Exception {
                      // TODO Auto-generated method stub
                  String [] arr = v1.split(",");
                  return Double.parseDouble(arr[24]);   
                }
        }, true, 9).collect();
        long end = System.currentTimeMillis();
        System.out.println("SortBy: " + (end - start));
  }

public static void sortList(JavaSparkContext sc){
        JavaRDD<String> rdd = sc.textFile("/data.txt",32); //parallelize(l, 8);
        long start = System.currentTimeMillis();
        JavaRDD<LinkedList<Tuple2<Double, String>>> rdd3 = rdd.mapPartitions(new FlatMapFunction<Iterator<String>, LinkedList<Tuple2<Double, String>>>(){

        @Override
        public Iterable<LinkedList<Tuple2<Double, String>>> call(Iterator<String> t)
            throws Exception {
          // TODO Auto-generated method stub
          LinkedList<Tuple2<Double, String>> lines = new LinkedList<Tuple2<Double, String>>();
          while(t.hasNext()){       
            String s = t.next();
            String arr1[] = s.split(",");
            Tuple2<Double, String> t1 = new Tuple2<Double, String>(Double.parseDouble(arr1[24]),s);
            lines.add(t1);
          }
          Collections.sort(lines, new IncomeComparator());
          LinkedList<LinkedList<Tuple2<Double, String>>> list = new LinkedList<LinkedList<Tuple2<Double, String>>>();
          list.add(lines);
          return list;
        }

        });
        rdd3.reduce(new Function2<LinkedList<Tuple2<Double, String>>, LinkedList<Tuple2<Double, String>>, LinkedList<Tuple2<Double, String>>>(){

        @Override
        public LinkedList<Tuple2<Double, String>> call(
                LinkedList<Tuple2<Double, String>> a,
                LinkedList<Tuple2<Double, String>> b) throws Exception {
          // TODO Auto-generated method stub
          LinkedList<Tuple2<Double, String>> result = new LinkedList<Tuple2<Double, String>>();
          while (a.size() > 0 && b.size() > 0) {

            if (a.getFirst()._1.compareTo(b.getFirst()._1) <= 0)
              result.add(a.poll());
            else
              result.add(b.poll());
          }

          while (a.size() > 0)
            result.add(a.poll());

          while (b.size() > 0)
            result.add(b.poll());

          return result;

        }

        });     
        long end = System.currentTimeMillis();
        System.out.println("MapPartitions: " + (end - start));
  }

person RagHaven    schedule 09.06.2015    source источник
comment
Это может быть лучший вопрос для списка рассылки.   -  person Justin Pihony    schedule 09.06.2015


Ответы (1)


Collect() является основным узким местом, так как возвращает все результаты драйверу.
Он создает как обращения ввода-вывода, так и дополнительный сетевой трафик к одному источнику (в данном случае — к драйверу).
Он также блокирует другие операции.

Вместо collect() в первом сегменте кода sortBy() попробуйте выполнить параллельную операцию, например saveAsTextFile(tmp), а затем выполнить обратное чтение с помощью sc.textFile(tmp).

В другом сегменте кода sortBy() используются параллельные API mapPartitions() и reduce(), поэтому вся работа выполняется параллельно.
Казалось бы, это и есть причина различий в сквозное время производительности.

Обратите внимание, что ваши выводы не обязательно означают, что сумма времени выполнения на всех машинах хуже.

person Leet-Falcon    schedule 11.08.2015