Обработка upserts на большом количестве разделов выполняется недостаточно быстро

Проблема

У нас есть установка Delta Lake поверх ADLS Gen2 со следующими таблицами:

  • bronze.DeviceData: разделены по дате прибытия (Partition_Date)
  • silver.DeviceData: разделены по дате и часу события (Partition_Date и Partition_Hour)

Мы загружаем большие объемы данных (›600 миллионов записей в день) из концентратора событий в bronze.DeviceData (только для добавления). Затем мы обрабатываем новые файлы в потоковом режиме и вставляем их в silver.DeviceData с помощью команды delta MERGE (см. Ниже).

Данные, поступающие в бронзовую таблицу, могут содержать данные из любого раздела в серебре (например, устройство может отправлять исторические данные, которые оно кэширует локально). Однако ›90% данных, поступающих в любой день, поступают с разделов Partition_Date IN (CURRENT_DATE(), CURRENT_DATE() - INTERVAL 1 DAYS, CURRENT_DATE() + INTERVAL 1 DAYS). Поэтому для обновления данных у нас есть два следующих искровых задания:

  • Быстро: обрабатывает данные из трех разделов дат, указанных выше. Здесь важна задержка, поэтому мы отдаем приоритет этим данным.
  • Медленно: обрабатывает все остальное (что угодно, кроме этих трех разделов даты). Задержка не имеет большого значения, но она должна быть в разумные сроки (я бы сказал, не больше недели).

Теперь мы подходим к проблеме: хотя объем данных в медленной работе намного меньше, она выполняется в течение нескольких дней, чтобы обработать медленные бронзовые данные за один день в большом кластере. Причина проста: он должен читать и обновлять множество серебряных разделов (›1000 разделов с датой временами), а поскольку обновления небольшие, но разделы с датой могут быть гигабайтами, эти команды слияния неэффективны.

Более того, со временем эта медленная работа будет становиться все медленнее и медленнее, так как серебряные разделы, которых она касается, будут расти.

Вопросы

  1. Является ли наша схема разбиения и настройка быстрого / медленного задания Spark хорошим способом решения этой проблемы?
  2. Что можно сделать, чтобы улучшить эту настройку? Мы хотели бы снизить затраты и время задержки медленной работы и найти способ, чтобы она росла с увеличением количества данных, поступающих в любой день в бронзе, а не с размером серебряной таблицы.

Дополнительная информация

  • нам нужна команда MERGE, так как некоторые вышестоящие сервисы могут повторно обрабатывать исторические данные, которые затем также должны обновлять серебряную таблицу
  • схема серебряного стола:
CREATE TABLE silver.DeviceData (
  DeviceID LONG NOT NULL, -- the ID of the device that sent the data
  DataType STRING NOT NULL, -- the type of data it sent
  Timestamp TIMESTAMP NOT NULL, -- the timestamp of the data point
  Value DOUBLE NOT NULL, -- the value that the device sent
  UpdatedTimestamp TIMESTAMP NOT NULL, -- the timestamp when the value arrived in bronze
  Partition_Date DATE NOT NULL, -- = TO_DATE(Timestamp)
  Partition_Hour INT NOT NULL -- = HOUR(Timestamp)
)
USING DELTA
PARTITIONED BY (Partition_Date, Partition_Hour)
LOCATION '...'
  • наша команда MERGE:
val silverTable = DeltaTable.forPath(spark, silverDeltaLakeDirectory)

val batch = ... // the streaming update batch

// the dates and hours that we want to upsert, for partition pruning
// collected from the streaming update batch
val dates = "..."
val hours = "..."

val mergeCondition = s"""
  silver.Partition_Date IN ($dates)
  AND silver.Partition_Hour IN ($hours)
  AND silver.Partition_Date = batch.Partition_Date
  AND silver.Partition_Hour = batch.Partition_Hour
  AND silver.DeviceID = batch.DeviceID
  AND silver.Timestamp = batch.Timestamp
  AND silver.DataType = batch.DataType
"""

silverTable.alias("silver")
  .merge(batch.alias("batch"), mergeCondition)
  // only merge if the event is newer
  .whenMatched("batch.UpdatedTimestamp > silver.UpdatedTimestamp").updateAll
  .whenNotMatched.insertAll
  .execute

person hbrgnr    schedule 16.03.2021    source источник
comment
вы используете Databricks, или это ваше собственное озеро в дельте?   -  person Alex Ott    schedule 28.03.2021
comment
@AlexOtt Его блоки данных   -  person Steffen Mangold    schedule 29.03.2021


Ответы (1)


В Databricks есть несколько способов оптимизировать производительность операции merge into:

  • Выполните оптимизацию с помощью ZOrder для столбцов, которые являются частью условия соединения. Это может зависеть от конкретной версии DBR, поскольку более старые версии (до 7.6 IIRC) использовали реальный алгоритм ZOrder, который хорошо работает для меньшего количества столбцов, в то время как DBR 7.6+ использует по умолчанию кривые заполнения гильбертова пространства.
  • Используйте файлы меньшего размера - по умолчанию OPTIMIZE создает файлы размером 1 ГБ, которые необходимо перезаписать. Вы можете использовать spark.databricks.delta.optimize.maxFileSize, чтобы установить размер файла в диапазоне от 32 МБ до 64 МБ, чтобы он перезаписывал меньше данных
  • Используйте условия для разделов таблицы (вы уже это делаете)
  • Не используйте автоматическое сжатие, потому что оно не может выполнять ZOrder, а вместо этого выполните явную оптимизацию с помощью ZOrder. Подробнее см. документацию.
  • Настройте индексирование столбцов, чтобы он индексировал только столбцы, необходимые для вашего условия и запросов. Это частично связано со слиянием, но может немного улучшить скорость записи, поскольку статистика не будет собираться для столбцов, которые не используются для запросов.

В этой презентации Spark Summit рассказывается об оптимизации merge into - какие показатели нужно отслеживать , так далее.

Я не уверен на 100%, что вам нужно условие silver.Partition_Date IN ($dates) AND silver.Partition_Hour IN ($hours), потому что вы можете прочитать больше данных, чем требуется, если у вас нет определенных разделов во входящих данных, но для этого потребуется изучить план выполнения. В этой статье базы знаний объясняется, как убедиться, что merge into использует раздел обрезка.

person Alex Ott    schedule 30.03.2021
comment
Используйте файлы меньшего размера - это мне очень интересно, прямо сейчас каждый раздел (Date, Hour) содержит один файл размером 300 МБ, что, как я понимаю, означает, что ZORDER или индексы фильтра bloom ничего не делают, поскольку Spark всегда будет читать в любом случае полный файл. Я обязательно попробую. Сказав это, как вы думаете, имеет ли смысл иметь раздел Hour, если размер каждого раздела составляет всего 300 МБ? - person hbrgnr; 31.03.2021
comment
ваше понимание правильное - если у вас есть только один файл на раздел, то ZOrder, фильтр Блума и другие оптимизации не помогут. Что касается часового разделения - это действительно зависит от того, как часто пакетные данные содержат данные за несколько часов дня по сравнению только с определенными часами дня - в последнем случае может помочь разделение на час. - person Alex Ott; 31.03.2021
comment
Пакетные обновления будут попадать в большинство часовых разделов, поэтому я думаю, что это бесполезно. - person hbrgnr; 31.03.2021
comment
в этом случае да, может быть, это и не потребуется, и комбинация ежедневных разделов с ZOrder и т. д. может помочь лучше ... - person Alex Ott; 31.03.2021