Плоская карта на фрейме данных

Каков наилучший способ преформирования flatMap на DataFrame в искре? Поискав вокруг и проведя некоторое тестирование, я придумал два разных подхода. Оба они имеют некоторые недостатки, поэтому я думаю, что должен быть какой-то лучший/более простой способ сделать это.

Первый способ, который я нашел, - сначала преобразовать DataFrame в RDD, а затем обратно:

val map = Map("a" -> List("c","d","e"), "b" -> List("f","g","h"))
val df = List(("a", 1.0), ("b", 2.0)).toDF("x", "y")

val rdd = df.rdd.flatMap{ row =>
  val x = row.getAs[String]("x")
  val x = row.getAs[Double]("y")
  for(v <- map(x)) yield Row(v,y)
}
val df2 = spark.createDataFrame(rdd, df.schema)

Второй подход состоит в том, чтобы создать DataSet перед использованием flatMap (используя те же переменные, что и выше), а затем преобразовать обратно:

val ds = df.as[(String, Double)].flatMap{
  case (x, y) => for(v <- map(x)) yield (v,y)
}.toDF("x", "y")

Оба эти подхода работают достаточно хорошо, когда количество столбцов невелико, однако у меня их намного больше, чем 2 столбца. Есть ли лучший способ решить эту проблему? Желательно таким образом, чтобы не было необходимости в преобразовании.


person Shaido    schedule 16.05.2017    source источник
comment
Возможный дубликат Как использовать Spark SQL DataFrame с flatMap?   -  person stefanobaghino    schedule 16.05.2017


Ответы (1)


Вы можете создать второй dataframe из map RDD:

val mapDF = Map("a" -> List("c","d","e"), "b" -> List("f","g","h")).toList.toDF("key", "value")

Затем выполните join и примените функцию explode:

val joinedDF = df.join(mapDF, df("x") === mapDF("key"), "inner")
  .select("value", "y")
  .withColumn("value", explode($"value"))

И вы получите решение.

joinedDF.show()
person Ramesh Maharjan    schedule 16.05.2017