Слияние равносекционированных фреймов данных в Spark

В Hadoop объединение/объединение больших равносекционированных наборов данных может быть выполнено без перетасовки и уменьшения фазы, просто используя соединение на стороне карты с CompositeInputFormat.

Попытка выяснить, как это сделать в Spark:

val x = sc.parallelize(Seq(("D", 1), ("C", 2), ("B", 3), ("A", 4))).toDF("k", "v")
    .repartition(col("k")).cache()
val y = sc.parallelize(Seq(("F", 5), ("E", 6), ("D", 7), ("C", 8))).toDF("k", "v")
    .repartition(col("k")).cache()

val xy = x.join(y, x.col("k") === y.col("k"), "outer")

x.show()    y.show()    xy.show()

+---+---+   +---+---+   +----+----+----+----+
|  k|  v|   |  k|  v|   |   k|   v|   k|   v|
+---+---+   +---+---+   +----+----+----+----+
|  A|  6|   |  C| 12|   |   A|   4|null|null|
|  B|  5|   |  D| 11|   |   B|   3|null|null|
|  C|  4|   |  E| 10|   |   C|   2|   C|   8|
|  D|  3|   |  F|  9|   |   D|   1|   D|   7|
|  E|  2|   |  G|  8|   |null|null|   E|   6|
|  F|  1|   |  H|  7|   |null|null|   F|   5|
+---+---+   +---+---+   +----+----+----+----+

Все идет нормально. Но когда я проверяю план выполнения, я вижу «ненужные» сорта:

xy.explain

== Physical Plan ==
SortMergeOuterJoin [k#1283], [k#1297], FullOuter, None
:- Sort [k#1283 ASC], false, 0
:  +- InMemoryColumnarTableScan [k#1283,v#1284], InMemoryRelation [k#1283,v#1284], true, 10000, StorageLevel(true, true, false, true, 1), TungstenExchange hashpartitioning(k#1283,200), None, None
+- Sort [k#1297 ASC], false, 0
   +- InMemoryColumnarTableScan [k#1297,v#1298], InMemoryRelation [k#1297,v#1298], true, 10000, StorageLevel(true, true, false, true, 1), TungstenExchange hashpartitioning(k#1297,200), None, None

Можно ли здесь избежать сортировок?

Изменить

Для справки, эта «функция» в Hadoop доступна с 2007 года: https://issues.apache.org/jira/browse/HADOOP-2085

Обновить

Как указал Леззар, одного repartition() недостаточно для достижения равнораспределенного отсортированного состояния. Я думаю, что теперь за ним должен следовать sortWithinPartitions() Так что это должно сработать:

val x = sc.parallelize(Seq(("F", 1), ("E", 2), ("D", 3), ("C", 4), ("B", 5), ("A", 6))).toDF("k", "v")
    .repartition(col("k")).sortWithinPartitions(col("k")).cache()
val y = sc.parallelize(Seq(("H", 7), ("G", 8), ("F", 9), ("E",10), ("D",11), ("C",12))).toDF("k", "v")
    .repartition(col("k")).sortWithinPartitions(col("k")).cache()

ху.объясните()

== Physical Plan ==
SortMergeOuterJoin [k#1055], [k#1069], FullOuter, None
:- InMemoryColumnarTableScan [k#1055,v#1056], InMemoryRelation [k#1055,v#1056], true, 10000, StorageLevel(true, true, false, true, 1), Sort [k#1055 ASC], false, 0, None
+- InMemoryColumnarTableScan [k#1069,v#1070], InMemoryRelation [k#1069,v#1070], true, 10000, StorageLevel(true, true, false, true, 1), Sort [k#1069 ASC], false, 0, None

Больше никакой сортировки!


person yurgis    schedule 10.03.2016    source источник


Ответы (2)


Почему вы говорите ненужную сортировку? Соединение слиянием требует сортировки данных. И, ИМХО, нет лучшей стратегии, чем объединение слиянием для выполнения полного внешнего соединения, за исключением случаев, когда один из ваших кадров данных достаточно мал для трансляции.

person Lezzar Walid    schedule 16.03.2016
comment
Когда вы объединяете два отсортированных набора, сортировать не нужно. Это в значительной степени O (n). Кроме того, для равнораспределенных наборов слияние может выполняться локально для каждого раздела. - person yurgis; 16.03.2016
comment
Да, но когда вы сортировали свои данные? вы только перераспределили его, используя свой ключ соединения, но никогда не выполняли никаких действий по сортировке. Таким образом, искра не может знать, что ваши данные отсортированы. - person Lezzar Walid; 17.03.2016
comment
насколько я понимаю, перераспределение - это сортировка данных в разделе, попробуйте команду x.show() - person yurgis; 17.03.2016
comment
перераспределение не обязательно сортирует данные. Он только распределяет его по узлам в соответствии с ключом. Даже если по какой-либо случайности данные будут найдены отсортированными после переразбиения, механизм spark никогда не узнает об этом, потому что нет никакого шага сортировки, который гарантирует spark, что данные отсортированы. - person Lezzar Walid; 17.03.2016
comment
Вы правы в том смысле, что перераспределение само по себе не сортирует разделы. Мое предположение было неверным. Согласно документу, эта функция работает так же, как DISTRIBUTE BY в улье. И чтобы гарантировать сортировку, за DISTRIBUTE BY должна следовать SORT BY, чтобы достичь как равномерного, так и отсортированного состояния для каждого раздела. - person yurgis; 17.03.2016

Подобно объединению на стороне карты в Hadoop, Spark имеет широковещательное соединение, которое передает данные таблицы всем рабочим процессам точно так же, как это делает распределенный кеш в Hadoop mapreduce. Пожалуйста, ознакомьтесь с документацией по искре или найдите один раз хеш-соединение искровой трансляции. Spark автоматически позаботится об этом, в отличие от улья. Так что не стоит об этом беспокоиться.

Однако для этого вам нужно будет понять несколько параметров.

-> spark.sql.autoBroadcastJoinThreshold, размер, ниже которого spark автоматически транслирует таблицу.

Вы можете попробовать приведенный ниже код, чтобы понять присоединение к широковещанию, а также вы можете обратиться к искровой документации для присоединения к широковещательному вещанию или найти более подробную информацию в Google.

Пример кода, чтобы попробовать:

val sqlContext = new HiveContext(sc);
1) sqlContext.sql("CREATE TABLE IF NOT EXISTS tab3 (key INT, value STRING)")

2) sqlContext.sql("INSERT INTO tab4 select 1,\"srini\" from sr23");
(I have created other table to just insert a record into table. As hive only support insert into select, i have used this trick to have some data.) You can skip this step as well, as you just want to see the physical plan.

------ You can also use any Hive table that is already created instead.. I am just trying to simulate the hive table thats it. --- 

3) val srini_df1 = sqlContext.sql("ANALYZE TABLE tab4 COMPUTE STATISTICS NOSCAN");

4) val df2 = sc.parallelize(Seq((5,"F"), (6,"E"), (7,"sri"), (1,"test"))).toDF("key", "value")

5) val join_df = sqlContext.sql("SELECT * FROM tab5").join(df2,"key");

6) join_df.explain
16/03/15 22:40:09 INFO storage.MemoryStore: ensureFreeSpace(530360) called with curMem=238151, maxMem=555755765
16/03/15 22:40:09 INFO storage.MemoryStore: Block broadcast_23 stored as values in memory (estimated size 517.9 KB, free 529.3 MB)
16/03/15 22:40:09 INFO storage.MemoryStore: ensureFreeSpace(42660) called with curMem=768511, maxMem=555755765
16/03/15 22:40:09 INFO storage.MemoryStore: Block broadcast_23_piece0 stored as bytes in memory (estimated size 41.7 KB, free 529.2 MB)
16/03/15 22:40:09 INFO storage.BlockManagerInfo: Added broadcast_23_piece0 in memory on localhost:63721 (size: 41.7 KB, free: 529.9 MB)
16/03/15 22:40:09 INFO spark.SparkContext: Created broadcast 23 from explain at <console>:28
== Physical Plan ==
Project [key#374,value#375,value#370]
 BroadcastHashJoin [key#374], [key#369], BuildLeft
  HiveTableScan [key#374,value#375], (MetastoreRelation default, tab5, None)
  Project [_1#367 AS key#369,_2#368 AS value#370]
   Scan PhysicalRDD[_1#367,_2#368]
person Srini    schedule 16.03.2016
comment
Спасибо, что посмотрели, но мой вопрос касается двух БОЛЬШИХ наборов. Широковещательное соединение просто не имеет смысла, если только одна из присоединяемых частей не достаточно мала. - person yurgis; 16.03.2016