Не удалось найти кодировщик для типа, хранящегося в наборе данных, при попытке выполнить flatMap для DataFrame в Spark 2.0

Я продолжаю получать следующую ошибку времени компиляции:

Unable to find encoder for type stored in a Dataset.  
Primitive types (Int, String, etc) and Product types (case classes) 
are supported by importing spark.implicits._  
Support for serializing other types will be added in future releases.

Я только что обновился со Spark v1.6 до v2.0.2, и целая куча кода, использующего DataFrame, жалуется на эту ошибку. Код, в котором он жалуется, выглядит следующим образом.

def doSomething(data: DataFrame): Unit = {
 data.flatMap(row => {
  ...
 })
 .reduceByKey(_ + _)
 .sortByKey(ascending = false)
}

Предыдущие сообщения SO предлагают

Однако у меня нет классов case, так как я использую DataFrame, который равен DataSet[Row], а также я встроил 2 неявных импорта следующим образом, без какой-либо помощи, чтобы избавиться от этого сообщения.

val sparkSession: SparkSession = ???
val sqlContext: SQLContext = ???

import sparkSession.implicits._
import sqlContext.implicits._

Обратите внимание, что я просмотрел документацию для DataSet и Кодировщик. Документы говорят что-то вроде следующего.

Scala

Encoders are generally created automatically through implicits from a 
SparkSession, or can be explicitly created by calling static methods on 
Encoders.

import spark.implicits._

val ds = Seq(1, 2, 3).toDS() // implicitly provided (spark.implicits.newIntEncoder)

Однако мой метод не имеет доступа к SparkSession. Кроме того, когда я пробую эту строку import spark.implicits._, IntelliJ даже не может ее найти. Когда я говорю, что мой DataFrame — это DataSet[Row], я действительно это имею в виду.

Этот вопрос помечен как возможный дубликат, но, пожалуйста, позвольте мне уточнить.

  • У меня нет класса case или бизнес-объекта.
  • Я использую .flatMap, а другой вопрос использует .map
  • неявный импорт не помогает
  • передача RowEncoder приводит к ошибке времени компиляции, например. data.flatMap(row => { ... }, RowEncoder(data.schema)) (слишком много аргументов)

Я читаю другие сообщения и позвольте мне добавить, я думаю, я не знаю, как должен работать этот новый Spark 2.0 Datasets/DataFrame API. В оболочке Spark работает приведенный ниже код. Обратите внимание, что я запускаю искровую оболочку следующим образом $SPARK_HOME/bin/spark-shell --packages com.databricks:spark-csv_2.10:1.5.0

val schema = StructType(Array(
 StructField("x1", StringType, true),
 StructField("x2", StringType, true),
 StructField("x3", StringType, true),
 StructField("x4", StringType, true),
 StructField("x5", StringType, true)))

val df = sqlContext.read
 .format("com.databricks.spark.csv")
 .option("header", "true")
 .schema(schema)
 .load("/Users/jwayne/Downloads/mydata.csv")

df.columns.map(col => {
 df.groupBy(col)
   .count()
   .map(_.getString(0))
   .collect()
   .toList
 })
 .toList

Однако, когда я запускаю это как часть тестового модуля, я получаю то же самое, что не могу найти ошибку кодировщика. Почему это работает в оболочке, но не в моих тестовых модулях?

В оболочке я набрал :imports и :implicits и поместил их в свои файлы/исходники scala, но это тоже не помогает.


person Jane Wayne    schedule 12.12.2016    source источник
comment
Это совсем другая проблема. Это не дубликат. Вы можете убедиться в этом, когда мы запускаем модульные тесты. Пожалуйста, снимите пометку с дубликата.   -  person kris433    schedule 24.11.2017
comment
Это не должно быть помечено как дубликат.   -  person WestCoastProjects    schedule 25.04.2019