NotSerializableException when running linear regression with Scala 2.12

When I run the following Spark MLlib code in local mode with Scala 2.12.3, it fails with the error below: the lambda is not serializable.

Any pointers would be highly appreciated (switching to Scala 2.11 is not an option for me). Could you tell me what I can do to avoid this problem? Thanks.

import java.io.FileWriter

import org.apache.spark.SparkConf
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.DoubleType
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.TimestampType

import java.util.concurrent.atomic.AtomicBoolean


object MLAnalyzer {

  val conf = new SparkConf()
    .setMaster("local[2]")
    .set("deploy-mode", "client")
    .set("spark.driver.bindAddress", "127.0.0.1")
    .set("spark.broadcast.compress", "false")
    .setAppName("local-spark-kafka-consumer-client")

  val spark = SparkSession
    .builder()
    .config(conf)
    .getOrCreate()

  def main(args: Array[String]): Unit = {
    process()
  }


  def process(): Unit = {


      // training data
      val filePath = "/home/vagrant/Desktop/Workspaces/SparkMachineLearning/sparkML/src/main/resources/train_pooling.csv"
      val modelPath = "file:///home/vagrant/Downloads/medium-articles-master/titanic_spark/training_batch/src/main/resources/poolSessionModelRecent.model"

      val schema = StructType(
        Array(
          StructField("PACKAGE_KEY", StringType),
          StructField("MOST_IDLE", IntegerType),
          StructField("MAX_WAIT", IntegerType),
          StructField("IDLE_COUNT", IntegerType),
          StructField("APPLICATION", StringType),
          StructField("LONGEST_WAIT", IntegerType),
          StructField("TIMEOUTS", IntegerType),
          StructField("LAST_ACCESS", TimestampType),
          StructField("MOST_ACTIVE", IntegerType),
          StructField("MAX_ACTIVE", IntegerType),
          StructField("MAX_IDLE", IntegerType),
          StructField("ACTIVE_COUNT", IntegerType),
          StructField("FACTOR_LOAD", DoubleType)))

      while (true) {
        Thread.sleep(100)
      // read the raw data
      var df_raw = spark
        .read
        .option("header", "true")
        //      .option("inferSchema","true")
        .schema(schema)
        .csv(filePath)

      df_raw = df_raw.drop(df_raw.col("PACKAGE_KEY"))
      df_raw = df_raw.drop(df_raw.col("MOST_IDLE"))
      df_raw = df_raw.drop(df_raw.col("MAX_IDLE"))
      df_raw = df_raw.drop(df_raw.col("MOST_ACTIVE"))
      df_raw = df_raw.drop(df_raw.col("LAST_ACCESS"))
      df_raw = df_raw.drop(df_raw.col("APPLICATION"))
      df_raw = df_raw.drop(df_raw.col("MAX_WAIT"))


      // fill all na values with 0
      val df = df_raw.na.fill(0)
      val packageKeyIndexer = new StringIndexer()
        .setInputCol("PACKAGE_KEY")
        .setOutputCol("PackageIndex")
        .setHandleInvalid("keep")

      // create the feature vector
      val vectorAssembler = new VectorAssembler()
        .setInputCols(Array("IDLE_COUNT", "TIMEOUTS", "ACTIVE_COUNT" /*, "TOTAL_REQUEST_COUNT"*/ ))
        .setOutputCol("features_intermediate")


      import org.apache.spark.ml.feature.StandardScaler
      val scaler = new StandardScaler().setWithMean(true).setWithStd(true).setInputCol("features_intermediate").setOutputCol("features")

      var pipeline: Pipeline = null
      //    if (lr1 == null) {
      val lr =
        new LinearRegression()
          .setMaxIter(100)
          .setRegParam(0.1)
          .setElasticNetParam(0.8)
          //.setFeaturesCol("features")   // setting features column
          .setLabelCol("FACTOR_LOAD") // setting label column
      // create the pipeline with the steps
      pipeline = new Pipeline().setStages(Array( /*genderIndexer, cabinIndexer, embarkedIndexer,*/ vectorAssembler, scaler, lr))


      // create the model following the pipeline steps
      val cvModel = pipeline.fit(df)

      // save the model
      cvModel.write.overwrite.save(modelPath)

      var testschema = StructType(
        Array(
          //        StructField("PACKAGE_KEY", StringType),
          StructField("IDLE_COUNT", IntegerType),
          StructField("TIMEOUTS", IntegerType),
          StructField("ACTIVE_COUNT", IntegerType)))

      val df_raw1 = spark
        .read
        //      .option("header", "true")
        .schema(testschema)
        .csv("/home/vagrant/Desktop/Workspaces/SparkMachineLearning/sparkML/src/main/resources/test_pooling.csv")

      // fill all na values with 0
      val df1 = df_raw1.na.fill(0)

      val evaluator = new RegressionEvaluator().setMetricName("rmse").setLabelCol("prediction")
      var rmse = evaluator.evaluate(cvModel.transform(df1))
      import org.apache.spark.sql.functions._
      import spark.implicits._
      val extracted = cvModel.transform(df1)

      val prediction = extracted.select("prediction").map(r => r(0).asInstanceOf[Double]).collect()
      if (prediction != null && prediction.length > 0) {
        val avg = prediction.sum / prediction.length
        val pw: FileWriter = new FileWriter("/home/vagrant/Desktop/Workspaces/SparkMachineLearning/sparkML/src/main/resources/result.csv")
        pw.append(avg.toString)
        pw.flush()
        pw.close()
        println("completed modelling process")
      } else {
        //do nothing
      }

      }


  }
}

gives me the following error:

Caused by: java.io.NotSerializableException: scala.runtime.LazyRef
Serialization stack:
    - object not serializable (class: scala.runtime.LazyRef, value: LazyRef thunk)
    - element of array (index: 2)
    - array (class [Ljava.lang.Object;, size 3)
    - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
    - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.apache.spark.sql.catalyst.expressions.ScalaUDF, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic org/apache/spark/sql/catalyst/expressions/ScalaUDF.$anonfun$f$2:(Lscala/Function1;Lorg/apache/spark/sql/catalyst/expressions/Expression;Lscala/runtime/LazyRef;Lorg/apache/spark/sql/catalyst/InternalRow;)Ljava/lang/Object;, instantiatedMethodType=(Lorg/apache/spark/sql/catalyst/InternalRow;)Ljava/lang/Object;, numCaptured=3])
    - writeReplace data (class: java.lang.invoke.SerializedLambda)
    - object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF$$Lambda$2280/878458383, org.apache.spark.sql.catalyst.expressions.ScalaUDF$$Lambda$2280/878458383@65af23c0)
    - field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF, name: f, type: interface scala.Function1)
    - object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF, UDF(named_struct(IDLE_COUNT_double_vecAssembler_bc4ee3d99e56, cast(coalesce(IDLE_COUNT#1732, 0) as double), TIMEOUTS_double_vecAssembler_bc4ee3d99e56, cast(coalesce(TIMEOUTS#1735, 0) as double), ACTIVE_COUNT_double_vecAssembler_bc4ee3d99e56, cast(coalesce(ACTIVE_COUNT#1740, 0) as double))))
    - field (class: org.apache.spark.sql.catalyst.expressions.Alias, name: child, type: class org.apache.spark.sql.catalyst.expressions.Expression)
    - object (class org.apache.spark.sql.catalyst.expressions.Alias, UDF(named_struct(IDLE_COUNT_double_vecAssembler_bc4ee3d99e56, cast(coalesce(IDLE_COUNT#1732, 0) as double), TIMEOUTS_double_vecAssembler_bc4ee3d99e56, cast(coalesce(TIMEOUTS#1735, 0) as double), ACTIVE_COUNT_double_vecAssembler_bc4ee3d99e56, cast(coalesce(ACTIVE_COUNT#1740, 0) as double))) AS features_intermediate#1839)
    - element of array (index: 0)

Mozhi, 05.05.2019


Answers (1)


Upgrading to Scala 2.12.8 solved the problem, although I'm not sure about the root cause.
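
For reference, a minimal sbt sketch of what the upgrade might look like. This is an assumption about the build setup (the question doesn't show it), and the Spark version is only illustrative; the point is that scalaVersion is 2.12.8 or later and the Spark artifacts are picked with %% so they match the 2.12 binary version:

// build.sbt (hypothetical; versions are illustrative)
scalaVersion := "2.12.8"  // 2.12.8+ ships a Serializable scala.runtime.LazyRef

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"   % "2.4.3",  // %% appends the _2.12 suffix
  "org.apache.spark" %% "spark-mllib" % "2.4.3"
)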

Mozhi, 05.05.2019
Comment: The upgrade works because of Serializable: if you look at the LazyRef class in versions below 2.12.8, LazyRef does not extend Serializable, whereas from Scala 2.12.8 on it does. - mubeen; 25.01.2020
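
If it helps, here is a small sketch (not from the original comment) that checks which behaviour the scala-library on your runtime classpath actually has; it should print true on 2.12.8+ and false on earlier 2.12.x releases:

// Paste into the Scala REPL or any driver-side code
val lazyRefIsSerializable =
  classOf[java.io.Serializable].isAssignableFrom(classOf[scala.runtime.LazyRef[_]])
println(s"LazyRef serializable: $lazyRefIsSerializable")
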
Comment: I am using 2.12.11 and still get the same error. - umeshfadadu; 11.05.2021
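
If the declared Scala version looks right but the error persists, one possible explanation is that an older scala-library jar is still on the runtime classpath (for example pulled in by a Spark distribution built against a different Scala version), so the old non-serializable LazyRef is the one actually loaded. A small diagnostic sketch, assuming the spark session from the question is in scope:

// Report what is actually loaded at runtime, which may differ from the build file
println(s"scala-library version: ${scala.util.Properties.versionNumberString}")
println(s"Spark version: ${spark.version}")
// Jar that scala.runtime.LazyRef was loaded from (may be null for boot-classpath classes)
println(classOf[scala.runtime.LazyRef[_]].getProtectionDomain.getCodeSource)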