Spark UDF как параметр функции, UDF не входит в область действия функции

У меня есть несколько UDF, которые я хотел бы передать в качестве аргумента функции вместе с фреймами данных.

Одним из способов сделать это может быть создание пользовательской функции внутри функции, но это создаст и уничтожит несколько экземпляров пользовательской функции без ее повторного использования, что может оказаться не лучшим способом решения этой проблемы.

Вот пример кода -

val lkpUDF = udf{(i: Int) => if (i > 0) 1 else 0}

val df =   inputDF1
    .withColumn("new_col", lkpUDF(col("c1")))
val df2 =   inputDF2.
  .withColumn("new_col", lkpUDF(col("c1")))

Вместо того, чтобы делать вышеперечисленное, я бы в идеале хотел сделать что-то вроде этого:

val lkpUDF = udf{(i: Int) => if (i > 0) 1 else 0}

def appendCols(df: DataFrame, lkpUDF: ?): DataFrame = {

    df
      .withColumn("new_col", lkpUDF(col("c1")))

  }
val df = appendCols(inputDF, lkpUDF)

Приведенный выше UDF довольно прост, но в моем случае он может возвращать примитивный тип или определяемый пользователем тип класса case. Любые мысли / указатели будут высоко оценены. Спасибо.


person Yash    schedule 08.02.2017    source источник


Ответы (1)


Ваша функция с соответствующей подписью должна быть такой:

import org.apache.spark.sql.UserDefinedFunction

def appendCols(df: DataFrame, func: UserDefinedFunction): DataFrame = {
    df.withColumn("new_col", func(col("col1")))
}

REPL scala весьма полезен для возврата типа инициализированных значений.

scala> val lkpUDF = udf{(i: Int) => if (i > 0) 1 else 0}
lkpUDF: org.apache.spark.sql.UserDefinedFunction = UserDefinedFunction(<function1>,IntegerType,List(IntegerType))

Кроме того, если сигнатура функции, которую вы передаете в оболочку udf, состоит из возвращаемого типа Any (что будет иметь место, если функция может возвращать либо примитивный, либо определенный пользователем класс case), пользовательская функция не сможет скомпилироваться с такое исключение:

java.lang.UnsupportedOperationException: Schema for type Any is not supported
person septra    schedule 08.02.2017
comment
Спасибо, септра. Вы правы, что я столкнулся с вышеуказанной ошибкой. Но я, кажется, сталкиваюсь с этой ошибкой, даже когда пытаюсь вернуть только класс case. stackoverflow.com/questions/42121649 / - person Yash; 09.02.2017