Проблема
У нас есть установка Delta Lake поверх ADLS Gen2 со следующими таблицами:
bronze.DeviceData
: разделены по дате прибытия (Partition_Date
)silver.DeviceData
: разделены по дате и часу события (Partition_Date
иPartition_Hour
)
Мы загружаем большие объемы данных (›600 миллионов записей в день) из концентратора событий в bronze.DeviceData
(только для добавления). Затем мы обрабатываем новые файлы в потоковом режиме и вставляем их в silver.DeviceData
с помощью команды delta MERGE (см. Ниже).
Данные, поступающие в бронзовую таблицу, могут содержать данные из любого раздела в серебре (например, устройство может отправлять исторические данные, которые оно кэширует локально). Однако ›90% данных, поступающих в любой день, поступают с разделов Partition_Date IN (CURRENT_DATE(), CURRENT_DATE() - INTERVAL 1 DAYS, CURRENT_DATE() + INTERVAL 1 DAYS)
. Поэтому для обновления данных у нас есть два следующих искровых задания:
- Быстро: обрабатывает данные из трех разделов дат, указанных выше. Здесь важна задержка, поэтому мы отдаем приоритет этим данным.
- Медленно: обрабатывает все остальное (что угодно, кроме этих трех разделов даты). Задержка не имеет большого значения, но она должна быть в разумные сроки (я бы сказал, не больше недели).
Теперь мы подходим к проблеме: хотя объем данных в медленной работе намного меньше, она выполняется в течение нескольких дней, чтобы обработать медленные бронзовые данные за один день в большом кластере. Причина проста: он должен читать и обновлять множество серебряных разделов (›1000 разделов с датой временами), а поскольку обновления небольшие, но разделы с датой могут быть гигабайтами, эти команды слияния неэффективны.
Более того, со временем эта медленная работа будет становиться все медленнее и медленнее, так как серебряные разделы, которых она касается, будут расти.
Вопросы
- Является ли наша схема разбиения и настройка быстрого / медленного задания Spark хорошим способом решения этой проблемы?
- Что можно сделать, чтобы улучшить эту настройку? Мы хотели бы снизить затраты и время задержки медленной работы и найти способ, чтобы она росла с увеличением количества данных, поступающих в любой день в бронзе, а не с размером серебряной таблицы.
Дополнительная информация
- нам нужна команда MERGE, так как некоторые вышестоящие сервисы могут повторно обрабатывать исторические данные, которые затем также должны обновлять серебряную таблицу
- схема серебряного стола:
CREATE TABLE silver.DeviceData (
DeviceID LONG NOT NULL, -- the ID of the device that sent the data
DataType STRING NOT NULL, -- the type of data it sent
Timestamp TIMESTAMP NOT NULL, -- the timestamp of the data point
Value DOUBLE NOT NULL, -- the value that the device sent
UpdatedTimestamp TIMESTAMP NOT NULL, -- the timestamp when the value arrived in bronze
Partition_Date DATE NOT NULL, -- = TO_DATE(Timestamp)
Partition_Hour INT NOT NULL -- = HOUR(Timestamp)
)
USING DELTA
PARTITIONED BY (Partition_Date, Partition_Hour)
LOCATION '...'
- наша команда MERGE:
val silverTable = DeltaTable.forPath(spark, silverDeltaLakeDirectory)
val batch = ... // the streaming update batch
// the dates and hours that we want to upsert, for partition pruning
// collected from the streaming update batch
val dates = "..."
val hours = "..."
val mergeCondition = s"""
silver.Partition_Date IN ($dates)
AND silver.Partition_Hour IN ($hours)
AND silver.Partition_Date = batch.Partition_Date
AND silver.Partition_Hour = batch.Partition_Hour
AND silver.DeviceID = batch.DeviceID
AND silver.Timestamp = batch.Timestamp
AND silver.DataType = batch.DataType
"""
silverTable.alias("silver")
.merge(batch.alias("batch"), mergeCondition)
// only merge if the event is newer
.whenMatched("batch.UpdatedTimestamp > silver.UpdatedTimestamp").updateAll
.whenNotMatched.insertAll
.execute