Некоторые советы и рекомендации по работе с большими наборами данных в scala spark

Искра классная! Он масштабируемый и быстрый, особенно когда вы пишете в «родном Spark» и избегаете пользовательских Udf. Но при работе с большими фреймами данных есть несколько советов, которые можно использовать, чтобы избежать ошибок OOM и ускорить все вычисления.

Вот краткий список вещей, которые я узнал из своего личного опыта.

Использование конфигурации, подходящей для задачи

Всегда полезно начинать с правильной настройки.

На мой взгляд, у Spark потрясающая документация, очень рекомендую начать с нее.
В зависимости от того, как вы используете Spark: внутри кластера или в автономном режиме, ваша конфигурация будет отличаться. Я использую Spark в основном в автономном режиме, поэтому вот мои примеры:

1. Драйвер memory и драйвер maxResult:

При работе с большим набором данных необходимо увеличить объем памяти по умолчанию и значение maxResultSize.

val spark = SparkSession.builder
.config("spark.driver.maxResultSize", "{YOUR-VALUE}")
.config("spark.driver.memory", "{YOUR-VALUE}")

2. Тайм-аут трансляции, тайм-аут сети и сердцебиение

Когда вы пытаетесь сохранить большой фрейм данных в базу данных или какое-то ведро, я заметил, что иногда задачи могут завершаться с ошибкой только потому, что пороговые значения времени ожидания по умолчанию слишком малы.

.config("spark.sql.broadcastTimeout", "{YOUR-VALUE}")
.config("spark.sql.debug.maxToStringFields", "{YOUR-VALUE}")
.config("spark.network.timeout", "{YOUR-VALUE}")
.config("spark.executor.heartbeatInterval", "{YOUR-VALUE}")

3. Сбор мусора

Вы можете использовать сборщик мусора, предоставленный JVM

.config("spark.executor.extraJavaOptions",
"-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'")

Использование кэширования в нужном месте:

Кэширование — важная вещь в spark, и, используя его в правильных местах, вы можете серьезно сократить время выполнения. Иногда вы можете подумать о .persist вместо кэширования. Вы можете прочитать больше об этом здесь".

// cache you dataframe after expensive operations
dataFrame
  .select(...)
  .filter(...)
  .groupBy(...)
  .agg(...)
  .cache

Передел перед присоединением:

Соединения — довольно дорогая операция, есть несколько трюков, которые вы можете использовать, но один из них, который я нашел наиболее полезным, — это повторное разбиение перед объединением. Это помогает, потому что к тому времени, когда вы присоединитесь, вы уже можете выполнять некоторые операции с вашими наборами данных, и разделы могут быть искажены. И перекошенные разделы серьезно повлияют на время выполнения объединения.

val leftRepartitioned = left.repartition(256, col("YOUR COLUMN"))
val rightRepartitioned = right.repartition(256, col("YOUR COLUMN"))
val joined = left.join(right, ...)

Передел после groupBy:

То же самое касается groupBy, обычно это очень помогает.

val groupedDataset = foo
  .groupBy("bar")
  .// your aggregations and other operations
  .
  .repartition(256, col("YOUR COLUMN"))

Искажение данных

Чтобы получить от Spark наилучшую производительность, вам нужно обратить внимание на перекос разделов. Об этом написано много замечательных статей (1, 2), поэтому я бы не стал их здесь повторять. Но просто имейте в виду, что иногда переразбиение мешает моей новой работе, если столбец, который вы выбрали для разбиения, перекошен. Если вы не уверены, какую колонку выбрать, вы всегда можете воспользоваться трюком с солью.

пользовательские функции

Отдельно стоит упомянуть слона в комнате — User Defined Functions. Поскольку они дают вам так много свободы, иногда возникает соблазн использовать их чаще, чем они вам действительно нужны.

Каждый раз, когда вы хотите реализовать что-то нестандартное, я рекомендую вам перепроверить коллекции с искровыми функциями по умолчанию из org.apache.spark.sql.functions. Возможно, вы сможете решить свою проблему, используя expr:).

Также рекомендую посмотреть spark-daria. Это коллекция с некоторыми полезными методами, расширяющими возможности Spark.

Удаление кэшированных наборов данных после того, как вы закончите с ними

Если вы кэшировали некоторые фреймы данных с помощью .cache, вы можете вызвать .unpersist, чтобы удалить их из памяти.

val dataFrameCached = dataframe.cache
// some more code
dataFrameCached.unpersist

Или вы можете полностью очистить память, используя

sqlContext.clearCache()

Спасибо за прочтение! Любая обратная связь приветствуется!

LinkedInGitHubСредний