В Hadoop объединение/объединение больших равносекционированных наборов данных может быть выполнено без перетасовки и уменьшения фазы, просто используя соединение на стороне карты с CompositeInputFormat.
Попытка выяснить, как это сделать в Spark:
val x = sc.parallelize(Seq(("D", 1), ("C", 2), ("B", 3), ("A", 4))).toDF("k", "v")
.repartition(col("k")).cache()
val y = sc.parallelize(Seq(("F", 5), ("E", 6), ("D", 7), ("C", 8))).toDF("k", "v")
.repartition(col("k")).cache()
val xy = x.join(y, x.col("k") === y.col("k"), "outer")
x.show() y.show() xy.show()
+---+---+ +---+---+ +----+----+----+----+
| k| v| | k| v| | k| v| k| v|
+---+---+ +---+---+ +----+----+----+----+
| A| 6| | C| 12| | A| 4|null|null|
| B| 5| | D| 11| | B| 3|null|null|
| C| 4| | E| 10| | C| 2| C| 8|
| D| 3| | F| 9| | D| 1| D| 7|
| E| 2| | G| 8| |null|null| E| 6|
| F| 1| | H| 7| |null|null| F| 5|
+---+---+ +---+---+ +----+----+----+----+
Все идет нормально. Но когда я проверяю план выполнения, я вижу «ненужные» сорта:
xy.explain
== Physical Plan ==
SortMergeOuterJoin [k#1283], [k#1297], FullOuter, None
:- Sort [k#1283 ASC], false, 0
: +- InMemoryColumnarTableScan [k#1283,v#1284], InMemoryRelation [k#1283,v#1284], true, 10000, StorageLevel(true, true, false, true, 1), TungstenExchange hashpartitioning(k#1283,200), None, None
+- Sort [k#1297 ASC], false, 0
+- InMemoryColumnarTableScan [k#1297,v#1298], InMemoryRelation [k#1297,v#1298], true, 10000, StorageLevel(true, true, false, true, 1), TungstenExchange hashpartitioning(k#1297,200), None, None
Можно ли здесь избежать сортировок?
Изменить
Для справки, эта «функция» в Hadoop доступна с 2007 года: https://issues.apache.org/jira/browse/HADOOP-2085
Обновить
Как указал Леззар, одного repartition() недостаточно для достижения равнораспределенного отсортированного состояния. Я думаю, что теперь за ним должен следовать sortWithinPartitions() Так что это должно сработать:
val x = sc.parallelize(Seq(("F", 1), ("E", 2), ("D", 3), ("C", 4), ("B", 5), ("A", 6))).toDF("k", "v")
.repartition(col("k")).sortWithinPartitions(col("k")).cache()
val y = sc.parallelize(Seq(("H", 7), ("G", 8), ("F", 9), ("E",10), ("D",11), ("C",12))).toDF("k", "v")
.repartition(col("k")).sortWithinPartitions(col("k")).cache()
ху.объясните()
== Physical Plan ==
SortMergeOuterJoin [k#1055], [k#1069], FullOuter, None
:- InMemoryColumnarTableScan [k#1055,v#1056], InMemoryRelation [k#1055,v#1056], true, 10000, StorageLevel(true, true, false, true, 1), Sort [k#1055 ASC], false, 0, None
+- InMemoryColumnarTableScan [k#1069,v#1070], InMemoryRelation [k#1069,v#1070], true, 10000, StorageLevel(true, true, false, true, 1), Sort [k#1069 ASC], false, 0, None
Больше никакой сортировки!