Свести DataFrame в Scala с разными типами данных внутри

Как вы, возможно, знаете, DataFrame может содержать поля сложного типа, такие как структуры (StructType) или массивы (ArrayType). Вам может понадобиться, как в моем случае, сопоставить все данные DataFrame с таблицей Hive с полями простого типа (String, Integer...). Я долго боролся с этой проблемой и, наконец, нашел решение, которым хочу поделиться. Кроме того, я уверен, что его можно улучшить, поэтому не стесняйтесь отвечать со своими предложениями.

Он основан на этом потоке, но также работает для элементов ArrayType. , а не только StructType. Это хвостовая рекурсивная функция, которая получает DataFrame и возвращает его сплющенным.

def flattenDf(df: DataFrame): DataFrame = {
  var end = false
  var i = 0
  val fields = df.schema.fields
  val fieldNames = fields.map(f => f.name)
  val fieldsNumber = fields.length

  while (!end) {
    val field = fields(i)
    val fieldName = field.name

    field.dataType match {
      case st: StructType =>
        val childFieldNames = st.fieldNames.map(n => fieldName + "." + n)
        val newFieldNames = fieldNames.filter(_ != fieldName) ++ childFieldNames
        val newDf = df.selectExpr(newFieldNames: _*)
        return flattenDf(newDf)
      case at: ArrayType =>
        val fieldNamesExcludingArray = fieldNames.filter(_ != fieldName)
        val fieldNamesAndExplode = fieldNamesExcludingArray ++ Array(s"explode($fieldName) as a")
        val fieldNamesToSelect = fieldNamesExcludingArray ++ Array("a.*")
        val explodedDf = df.selectExpr(fieldNamesAndExplode: _*)
        val explodedAndSelectedDf = explodedDf.selectExpr(fieldNamesToSelect: _*)
        return flattenDf(explodedAndSelectedDf)
      case _ => Unit
    }

    i += 1
    end = i >= fieldsNumber
  }
  df
}

person Torcuete    schedule 26.07.2017    source источник
comment
Для начала val fieldNames = df.schema.fieldNames :D   -  person philantrovert    schedule 26.07.2017
comment
@RameshMaharjan попробуйте .. работает абсолютно нормально.   -  person Venkat    schedule 20.07.2018


Ответы (1)


val df = Seq(("1", (2, (3, 4)),Seq(1,2))).toDF()

df.printSchema

root
 |-- _1: string (nullable = true)
 |-- _2: struct (nullable = true)
 |    |-- _1: integer (nullable = false)
 |    |-- _2: struct (nullable = true)
 |    |    |-- _1: integer (nullable = false)
 |    |    |-- _2: integer (nullable = false)
 |-- _3: array (nullable = true)
 |    |-- element: integer (containsNull = false)


def flattenSchema(schema: StructType, fieldName: String = null) : Array[Column] = {
   schema.fields.flatMap(f => {
     val cols = if (fieldName == null) f.name else (fieldName + "." + f.name)
     f.dataType match {
       case structType: StructType => fattenSchema(structType, cols)
       case arrayType: ArrayType => Array(explode(col(cols)))
       case _ => Array(col(cols))
     }
   })
 }

df.select(flattenSchema(df.schema): _*).printSchema

root
 |-- _1: string (nullable = true)
 |-- _1: integer (nullable = true)
 |-- _1: integer (nullable = true)
 |-- _2: integer (nullable = true)
 |-- col: integer (nullable = false)
person silentshadow    schedule 22.08.2018