Есть источник данных json. Вот пример одной строки:
{
"PrimaryAcctNumber": "account1",
"AdditionalData": [
{
"Addrs": [
"an address for account1",
"the longest address in the address list for account1",
"another address for account1"
],
"AccountNumber": "Account1",
"IP": 2368971684
},
{
"Addrs": [
"an address for account2",
"the longest address in the address list for account2",
"another address for account2"
],
"AccountNumber": "Account2",
"IP": 9864766814
}
]
}
Итак, при загрузке для Spark DataFrame схема выглядит так:
root
|-- PrimaryAcctNumber: string (nullable = true)
|-- AdditionalData: array (nullable = true)
| |-- element: struct (containsNull = true)
Я хочу использовать Spark для создания нового столбца с именем LongestAddressOfPrimaryAccount
на основе столбца AdditionalData (ArrayType[StructType])
, используя следующую логику:
- Iterate AdditionalData
- If
AccountNumber
property equalsPrimaryAcctNumber
of the row, the value ofLongestAddressOfPrimaryAccount
will be the longest string inAddrs
array - Если ни одно свойство
AccountNumber
не равноPrimaryAcctNumber
, значение будет N / A
- If
Итак, для данной строки выше ожидаемый результат:
{
"PrimaryAcctNumber": "account1",
"AdditionalData": [
{
"Addrs": [
"an address for account1",
"the longest address in the address list for account1",
"another address for account1"
],
"AccountNumber": "Account1",
"IP": 2368971684
},
{
"Addrs": [
"an address for account2",
"the longest address in the address list for account2",
"another address for account2"
],
"AccountNumber": "Account2",
"IP": 9864766814
}
],
"LongestAddressOfPrimaryAccount": "the longest address in the address list for account1"
}
Можно использовать UDF или функцию карты. Но это не лучшая практика для Spark.
Можно ли просто использовать функции Spark? Что-то вроде:
sourceDdf.withColumn("LongestAddressOfPrimaryAccount", coalesce(
longest(
get_field(iterate_array_for_match($"AdditionalData", "AccountNumber", $"PrimaryAcctNumber"), "Addrs")
)
, lit("N/A")))