Oracle MERGE переписан на PySpark. Если null - обновить, иначе - вставить

Это мои таблицы:
destination
введите здесь описание изображения
new_data
введите здесь описание изображения

В Oracle SQL я могу сделать это:

MERGE INTO destination d
    USING new_data n
    ON (d.c1 = n.c1 AND d.c2 = n.c2)
  WHEN MATCHED THEN
    UPDATE SET d.d1 = n.d1
         WHERE d.d1 IS NULL
  WHEN NOT MATCHED THEN
    INSERT (c1, c2, d1)
    VALUES (n.c1, n.c2, n.d1);

Затем таблица destination становится такой:
введите здесь описание изображения

Если c1, c2 существуют в destination, а d1 равно null, d1 обновляется.
Если c1, c2 не существуют, вставляются строки.

Есть ли способ сделать то же самое в PySpark?

Это генерирует кадры данных:

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

dCols = ['c1', 'c2', 'd1']
dData = [('a', 'b', 5), 
         ('c', 'd', None)]
destination = spark.createDataFrame(dData, dCols)

nData = [('a', 'b', 1),
         ('c', 'd', 6),
         ('e', 'f', 7),
         ('g', 'h', None)]
new_data = spark.createDataFrame(nData, dCols)

В PySpark есть почти все, что есть в SQL. Но я не нахожу эквивалента для MERGE.


person ZygD    schedule 11.02.2021    source источник


Ответы (2)


В SQL MERGE можно заменить на левое объединение правое соединение ‹=› полное внешнее соединение:

merged = destination.alias("dest").join(new_data.alias("src"), ["c1", "c2"], "full") \
    .selectExpr("c1", "c2", "coalesce(dest.d1, src.d1) as d1")

merged.show()

#+---+---+----+
#| c1| c2|  d1|
#+---+---+----+
#|  e|  f|   7|
#|  g|  h|null|
#|  c|  d|   6|
#|  a|  b|   5|
#+---+---+----+

Однако каждый раз, когда вы выполняете это слияние, вам нужно будет перезаписывать все свои данные в место назначения, поскольку Spark не поддерживает обновления, и это может привести к снижению производительности. Так что, если вам действительно нужно это сделать, я советую вам взглянуть на Delta Lake, который запускает ACID-транзакции. и который поддерживает MERGE синтаксис.

person blackbishop    schedule 11.02.2021

Вы можете выполнить левое соединение и объединить столбцы, используя coalesce

import pyspark.sql.functions as F

result = new_data.alias('t1').join(
    destination.alias('t2'),
    ['c1', 'c2'],
    'full'
).select('c1', 'c2', F.coalesce('t2.d1', 't1.d1').alias('d1'))

result.show()
+---+---+----+
| c1| c2|  d1|
+---+---+----+
|  e|  f|   7|
|  g|  h|null|
|  c|  d|   6|
|  a|  b|   5|
+---+---+----+
person mck    schedule 11.02.2021