Самый эффективный способ параллельной загрузки множества файлов в Spark?

[Disclaimer: While this question is somewhat specific, I think it circles a very generic issue with Hadoop/Spark.]

Мне нужно обработать большой набор данных (~ 14 ТБ) в Spark. Не агрегирует, в основном фильтрует. Учитывая ~ 30 тыс. файлов (250 файлов частей в месяц в течение 10 лет, каждая часть ~ 200 МБ), я хотел бы загрузить их в RDD/DataFrame и отфильтровать элементы на основе некоторых произвольных фильтров.

Чтобы сделать список файлов эффективным (я нахожусь в google dataproc/cloud storage, поэтому драйвер, выполняющий подстановочные знаки, был очень последовательным и очень медленным), я предварительно вычисляю RDD имен файлов, а затем загружаю их в RDD ( Я использую avro, но тип файла не должен иметь значения), например.

#returns an array of files to load
files = sc.textFile('/list/of/files/').collect()  

#load the files into a dataframe
documents = sqlContext.read.format('com.databricks.spark.avro').load(files)

Когда я это делаю, даже в кластере с 50 рабочими процессами кажется, что только один исполнитель выполняет работу по чтению файлов. Я экспериментировал с трансляцией списка файлов и прочитал дюжину разных подходов, но, похоже, не могу решить проблему.

Итак, есть ли эффективный способ создать очень большой фрейм данных из нескольких файлов? Как лучше всего использовать всю потенциальную вычислительную мощность при создании этого RDD?

Этот подход очень хорошо работает на небольших наборах, но при таком размере я вижу большое количество симптомов, таких как длительные процессы без обратной связи. Есть ли какой-то кладезь знаний — помимо @zero323 :-) — по оптимизации искры в таком масштабе?


person joshua.ewer    schedule 29.06.2016    source источник
comment
Рассматривали ли вы кэширование и контрольные точки? Как насчет настройки фрагментов разделов? Это очень широкий вопрос, на который нужно ответить.   -  person eliasah    schedule 30.06.2016
comment
Да, я понимаю, что это очень широко; это то, чего я боялся, но я не уверен, куда идти дальше. На этом этапе могут помочь даже общие указатели Google, чтобы я мог найти правильные триггерные слова для исследования. Пытаясь не быть одним из тех, пришлите мне код, пожалуйста, вопросы.   -  person joshua.ewer    schedule 30.06.2016
comment
Вам нужно быть более конкретным в отношении производительности и конфигурации кластера и приложений. (я не говорю, что это не интересный вопрос)   -  person eliasah    schedule 30.06.2016
comment
Основная сложность этого процесса заключается в том, что мне нужно создать окно row_number для строк, сгруппированных по определенному ключу. Представьте себе что-то вроде «Для каждого идентификатора получить первую строку, отсортированную по номеру версии в порядке убывания». Самый простой способ сделать это — поместить все в один большой DataFrame, а затем применить окно. Я думаю, что решение для резервного копирования состоит в том, чтобы разбить его на несколько проходов, проверить данные, а затем выполнить последний проход для всех промежуточных выходных данных. Мне просто интересно, если сделать это одним выстрелом, это даже реалистичный подход.   -  person joshua.ewer    schedule 30.06.2016
comment
Ну, это кажется чем-то совершенно другим, чем обычная фильтрация. Почему бы вам не разбить его на верхнюю N строку для каждого файла, объединить и снова начать N? Сложность снизится на логарифмический М-фактор.   -  person eliasah    schedule 30.06.2016
comment
Но опять же, это всего лишь подход, который может быть далек от оптимального, но, учитывая имеющуюся у нас информацию, трудно сказать...   -  person eliasah    schedule 30.06.2016
comment
На самом деле это кажется отличной идеей. Это стоит изучить больше. Однако огромное, подавляющее большинство идентификаторов имеют только одну запись, но это кажется концептуально надежным подходом. Я буду обновлять это и добавлять любую информацию здесь или в ответах для потомков. На самом деле не так много доступной информации относительно такого масштаба.   -  person joshua.ewer    schedule 30.06.2016
comment
Давайте продолжим обсуждение в чате.   -  person joshua.ewer    schedule 30.06.2016
comment
похоже, что только один исполнитель выполняет работу по чтению файлов. - это интересно. Как вы это наблюдаете? Это предполагает некоторую странность местоположения данных, если это правда.   -  person zero323    schedule 30.06.2016
comment
@zero323 закинул в этот чат картинку с моей искровой статистикой   -  person joshua.ewer    schedule 06.07.2016
comment
Вы когда-нибудь находили решение этой проблемы? Я хочу сделать то же самое..   -  person Thomas    schedule 27.07.2018


Ответы (1)


Список 30 000 файлов не должен быть проблемой для GCS — даже если один запрос списка GCS, содержащий до 500 файлов за раз, будет занимать 1 секунду каждый, все 30 000 файлов будут перечислены примерно через минуту. Могут быть некоторые крайние случаи с некоторыми шаблонами глобусов, которые замедляют работу, но недавно были проведены оптимизации в коннекторе GCS реализация подстановки, которая может помочь.

Вот почему вам должно быть достаточно просто полагаться на API Spark по умолчанию с подстановкой:

val df = sqlContext.read.avro("gs://<BUCKET>/path/to/files/")
person Igor Dvorzhak    schedule 17.03.2019