Порядок SparkSQL DataFrame по разделам

Я использую Spark sql для выполнения запроса по моему набору данных. Результат запроса довольно мал, но все же разделен.

Я хотел бы объединить полученный DataFrame и упорядочить строки по столбцу. Я пытался

DataFrame result = sparkSQLContext.sql("my sql").coalesce(1).orderBy("col1")
result.toJSON().saveAsTextFile("output")

Я тоже пробовал

DataFrame result = sparkSQLContext.sql("my sql").repartition(1).orderBy("col1")
result.toJSON().saveAsTextFile("output")

выходной файл упорядочен по частям (т.е. разделы упорядочены, но фрейм данных не упорядочен в целом). Например, вместо

1, value
2, value
4, value
4, value
5, value
5, value
...

я получил

2, value
4, value
5, value
-----------> partition boundary
1, value
4, value
5, value
  1. Как правильно получить абсолютный порядок результатов моего запроса?
  2. Почему фрейм данных не объединяется в один раздел?

person fo_x86    schedule 31.07.2015    source источник
comment
Как вы знаете, повторное разбиение на разделы - это ленивый процесс, и он не будет выполнен до следующей операции. Я предлагаю вам вставить подсчет количества между упорядочиванием и повторным разбиением, чтобы вы могли убедиться, что повторное разбиение произошло до упорядочения, а не вместе. Сообщите мне результат.   -  person Abdulrahman    schedule 31.07.2015
comment
Я безуспешно пытался добавить count как result = result.coalesce(1); result.count(); result.orderBy("col1") ...   -  person fo_x86    schedule 04.08.2015
comment
Посмотрите этот пост: stackoverflow.com/questions/24371259/   -  person IrishDog    schedule 29.12.2015
comment
@ fo_x86: вы должны использовать объединение или повторное разбиение после преобразования DF в JSON, а затем сохранить как текстовый файл. Это должно решить вашу проблему.   -  person Shankar    schedule 01.06.2017


Ответы (2)


Я хочу упомянуть здесь пару вещей. 1 - исходный код показывает, что оператор orderBy внутренне вызывает api сортировки с глобальным порядком, установленным на true. Таким образом, отсутствие порядка на уровне вывода предполагает, что порядок был потерян при записи в цель. Я хочу сказать, что вызов orderBy всегда требует глобального порядка.

2- Использование радикального объединения, например, форсирование одного раздела в вашем случае, может быть действительно опасным. Я бы рекомендовал вам не делать этого. Исходный код предполагает, что вызов coalesce (1) потенциально может привести к тому, что восходящие преобразования будут использовать один раздел. Это было бы брутальным выступлением.

3- Вы, кажется, ожидаете, что оператор orderBy будет выполняться с одним разделом. Я не думаю, что согласен с этим утверждением. Это сделало бы Spark действительно глупым распределенным фреймворком.

Сообщество, пожалуйста, дайте мне знать, если вы согласны или не согласны с утверждениями.

как вы вообще собираете данные из вывода?

возможно, вывод действительно содержит отсортированные данные, но преобразования / действия, которые вы выполнили для чтения из вывода, ответственны за потерянный порядок.

person JavaPlanet    schedule 20.01.2016

OrderBy создаст новые разделы после вашего объединения. Чтобы получить единственный выходной раздел, измените порядок операций ...

DataFrame result = spark.sql("my sql").orderBy("col1").coalesce(1)
result.write.json("results.json")

Как упоминалось в @JavaPlanet, для действительно больших данных вы не хотите объединяться в один раздел. Это резко снизит уровень параллелизма.

person Doug Bateman    schedule 04.03.2018