Как обрабатывать сложные данные в ArrayType с помощью функций Spark

Есть источник данных json. Вот пример одной строки:

{
  "PrimaryAcctNumber": "account1",
  "AdditionalData": [
    {
      "Addrs": [
        "an address for account1",
        "the longest address in the address list for account1",
        "another address for account1"
      ],
      "AccountNumber": "Account1",
      "IP": 2368971684
    },
    {
      "Addrs": [
        "an address for account2",
        "the longest address in the address list for account2",
        "another address for account2"
      ],
      "AccountNumber": "Account2",
      "IP": 9864766814
    }
  ]
}

Итак, при загрузке для Spark DataFrame схема выглядит так:

root
 |-- PrimaryAcctNumber: string (nullable = true)
 |-- AdditionalData: array (nullable = true)
 |    |-- element: struct (containsNull = true)

Я хочу использовать Spark для создания нового столбца с именем LongestAddressOfPrimaryAccount на основе столбца AdditionalData (ArrayType[StructType]), используя следующую логику:

  • Iterate AdditionalData
    • If AccountNumber property equals PrimaryAcctNumber of the row, the value of LongestAddressOfPrimaryAccount will be the longest string in Addrs array
    • Если ни одно свойство AccountNumber не равно PrimaryAcctNumber, значение будет N / A

Итак, для данной строки выше ожидаемый результат:

{
  "PrimaryAcctNumber": "account1",
  "AdditionalData": [
    {
      "Addrs": [
        "an address for account1",
        "the longest address in the address list for account1",
        "another address for account1"
      ],
      "AccountNumber": "Account1",
      "IP": 2368971684
    },
    {
      "Addrs": [
        "an address for account2",
        "the longest address in the address list for account2",
        "another address for account2"
      ],
      "AccountNumber": "Account2",
      "IP": 9864766814
    }
  ],
  "LongestAddressOfPrimaryAccount": "the longest address in the address list for account1"
}

Можно использовать UDF или функцию карты. Но это не лучшая практика для Spark.

Можно ли просто использовать функции Spark? Что-то вроде:

sourceDdf.withColumn("LongestAddressOfPrimaryAccount", coalesce(
  longest(
    get_field(iterate_array_for_match($"AdditionalData", "AccountNumber", $"PrimaryAcctNumber"), "Addrs")
  )
  , lit("N/A")))

person DeepNightTwo    schedule 08.03.2018    source источник


Ответы (1)


Вам нужно будет написать udf функцию для вашего требования, если у вас есть Spark версии 2.2 или ниже, поскольку встроенные функции будут более сложными и медленными ( медленнее в том смысле, что вам придется комбинировать больше встроенных функций), чем при использовании функции udf. И мне не известно о такой встроенной функции, которая могла бы напрямую удовлетворить ваши требования.

Команда Databricks работает над Вложенные данные с использованием функций высшего порядка в SQL, и они будут в следующих выпусках.

До тех пор вам придется написать udf функцию, если вы не хотите, чтобы ваша работа была сложной.

person Ramesh Maharjan    schedule 08.03.2018
comment
Я точно использую последнюю версию выпуска Spark. Спасибо за руководство. - person DeepNightTwo; 08.03.2018