Некоторые советы и рекомендации по работе с большими наборами данных в scala spark
Искра классная! Он масштабируемый и быстрый, особенно когда вы пишете в «родном Spark» и избегаете пользовательских Udf. Но при работе с большими фреймами данных есть несколько советов, которые можно использовать, чтобы избежать ошибок OOM и ускорить все вычисления.
Вот краткий список вещей, которые я узнал из своего личного опыта.
Использование конфигурации, подходящей для задачи
Всегда полезно начинать с правильной настройки.
На мой взгляд, у Spark потрясающая документация, очень рекомендую начать с нее.
В зависимости от того, как вы используете Spark: внутри кластера или в автономном режиме, ваша конфигурация будет отличаться. Я использую Spark в основном в автономном режиме, поэтому вот мои примеры:
1. Драйвер memory и драйвер maxResult:
При работе с большим набором данных необходимо увеличить объем памяти по умолчанию и значение maxResultSize.
val spark = SparkSession.builder .config("spark.driver.maxResultSize", "{YOUR-VALUE}") .config("spark.driver.memory", "{YOUR-VALUE}")
2. Тайм-аут трансляции, тайм-аут сети и сердцебиение
Когда вы пытаетесь сохранить большой фрейм данных в базу данных или какое-то ведро, я заметил, что иногда задачи могут завершаться с ошибкой только потому, что пороговые значения времени ожидания по умолчанию слишком малы.
.config("spark.sql.broadcastTimeout", "{YOUR-VALUE}") .config("spark.sql.debug.maxToStringFields", "{YOUR-VALUE}") .config("spark.network.timeout", "{YOUR-VALUE}") .config("spark.executor.heartbeatInterval", "{YOUR-VALUE}")
3. Сбор мусора
Вы можете использовать сборщик мусора, предоставленный JVM
.config("spark.executor.extraJavaOptions", "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'")
Использование кэширования в нужном месте:
Кэширование — важная вещь в spark, и, используя его в правильных местах, вы можете серьезно сократить время выполнения. Иногда вы можете подумать о .persist вместо кэширования. Вы можете прочитать больше об этом здесь".
// cache you dataframe after expensive operations dataFrame .select(...) .filter(...) .groupBy(...) .agg(...) .cache
Передел перед присоединением:
Соединения — довольно дорогая операция, есть несколько трюков, которые вы можете использовать, но один из них, который я нашел наиболее полезным, — это повторное разбиение перед объединением. Это помогает, потому что к тому времени, когда вы присоединитесь, вы уже можете выполнять некоторые операции с вашими наборами данных, и разделы могут быть искажены. И перекошенные разделы серьезно повлияют на время выполнения объединения.
val leftRepartitioned = left.repartition(256, col("YOUR COLUMN")) val rightRepartitioned = right.repartition(256, col("YOUR COLUMN")) val joined = left.join(right, ...)
Передел после groupBy:
То же самое касается groupBy, обычно это очень помогает.
val groupedDataset = foo .groupBy("bar") .// your aggregations and other operations . .repartition(256, col("YOUR COLUMN"))
Искажение данных
Чтобы получить от Spark наилучшую производительность, вам нужно обратить внимание на перекос разделов. Об этом написано много замечательных статей (1, 2), поэтому я бы не стал их здесь повторять. Но просто имейте в виду, что иногда переразбиение мешает моей новой работе, если столбец, который вы выбрали для разбиения, перекошен. Если вы не уверены, какую колонку выбрать, вы всегда можете воспользоваться трюком с солью.
пользовательские функции
Отдельно стоит упомянуть слона в комнате — User Defined Functions. Поскольку они дают вам так много свободы, иногда возникает соблазн использовать их чаще, чем они вам действительно нужны.
Каждый раз, когда вы хотите реализовать что-то нестандартное, я рекомендую вам перепроверить коллекции с искровыми функциями по умолчанию из org.apache.spark.sql.functions. Возможно, вы сможете решить свою проблему, используя expr:).
Также рекомендую посмотреть spark-daria. Это коллекция с некоторыми полезными методами, расширяющими возможности Spark.
Удаление кэшированных наборов данных после того, как вы закончите с ними
Если вы кэшировали некоторые фреймы данных с помощью .cache
, вы можете вызвать .unpersist
, чтобы удалить их из памяти.
val dataFrameCached = dataframe.cache
// some more code dataFrameCached.unpersist
Или вы можете полностью очистить память, используя
sqlContext.clearCache()
Спасибо за прочтение! Любая обратная связь приветствуется!