Для исследовательского проекта я попытался отсортировать элементы в RDD. Я сделал это двумя разными способами.
В первом методе я применил функцию mapPartitions() к СДР, чтобы она сортировала содержимое СДР и предоставляла результирующий СДР, содержащий отсортированный список как единственную запись в СДР. Затем я применил функцию сокращения, которая в основном объединяет отсортированные списки.
Я проводил эти эксперименты на кластере EC2, содержащем 30 узлов. Я настроил его с помощью скрипта spark ec2. Файл данных был сохранен в HDFS.
Во втором подходе я использовал метод sortBy в Spark.
Я выполнил эту операцию с данными переписи населения США (100 МБ), найденными здесь
Одна строка выглядит так
9, Not in universe, 0, 0, Children, 0, Not in universe, Never married, Not in universe or children, Not in universe, White, All other, Female, Not in universe, Not in universe, Children or Armed Forces, 0, 0, 0, Nonfiler, Not in universe, Not in universe, Child <18 never marr not in subfamily, Child under 18 never married, 1758.14, Nonmover, Nonmover, Nonmover, Yes, Not in universe, 0, Both parents present, United-States, United-States, United-States, Native- Born in the United States, 0, Not in universe, 0, 0, 94, - 50000.
Я отсортировал по 25-му значению в CSV. В этой строке это 1758.14.
Я заметил, что sortBy работает хуже, чем другой метод. Это ожидаемый сценарий? Если да, то почему бы mapPartitions() и reduce() не использовать метод сортировки по умолчанию?
Вот моя реализация
public static void sortBy(JavaSparkContext sc){
JavaRDD<String> rdd = sc.textFile("/data.txt",32);
long start = System.currentTimeMillis();
rdd.sortBy(new Function<String, Double>(){
@Override
public Double call(String v1) throws Exception {
// TODO Auto-generated method stub
String [] arr = v1.split(",");
return Double.parseDouble(arr[24]);
}
}, true, 9).collect();
long end = System.currentTimeMillis();
System.out.println("SortBy: " + (end - start));
}
public static void sortList(JavaSparkContext sc){
JavaRDD<String> rdd = sc.textFile("/data.txt",32); //parallelize(l, 8);
long start = System.currentTimeMillis();
JavaRDD<LinkedList<Tuple2<Double, String>>> rdd3 = rdd.mapPartitions(new FlatMapFunction<Iterator<String>, LinkedList<Tuple2<Double, String>>>(){
@Override
public Iterable<LinkedList<Tuple2<Double, String>>> call(Iterator<String> t)
throws Exception {
// TODO Auto-generated method stub
LinkedList<Tuple2<Double, String>> lines = new LinkedList<Tuple2<Double, String>>();
while(t.hasNext()){
String s = t.next();
String arr1[] = s.split(",");
Tuple2<Double, String> t1 = new Tuple2<Double, String>(Double.parseDouble(arr1[24]),s);
lines.add(t1);
}
Collections.sort(lines, new IncomeComparator());
LinkedList<LinkedList<Tuple2<Double, String>>> list = new LinkedList<LinkedList<Tuple2<Double, String>>>();
list.add(lines);
return list;
}
});
rdd3.reduce(new Function2<LinkedList<Tuple2<Double, String>>, LinkedList<Tuple2<Double, String>>, LinkedList<Tuple2<Double, String>>>(){
@Override
public LinkedList<Tuple2<Double, String>> call(
LinkedList<Tuple2<Double, String>> a,
LinkedList<Tuple2<Double, String>> b) throws Exception {
// TODO Auto-generated method stub
LinkedList<Tuple2<Double, String>> result = new LinkedList<Tuple2<Double, String>>();
while (a.size() > 0 && b.size() > 0) {
if (a.getFirst()._1.compareTo(b.getFirst()._1) <= 0)
result.add(a.poll());
else
result.add(b.poll());
}
while (a.size() > 0)
result.add(a.poll());
while (b.size() > 0)
result.add(b.poll());
return result;
}
});
long end = System.currentTimeMillis();
System.out.println("MapPartitions: " + (end - start));
}