Я продолжаю получать следующую ошибку времени компиляции:
Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
Я только что обновился со Spark v1.6 до v2.0.2, и целая куча кода, использующего DataFrame
, жалуется на эту ошибку. Код, в котором он жалуется, выглядит следующим образом.
def doSomething(data: DataFrame): Unit = {
data.flatMap(row => {
...
})
.reduceByKey(_ + _)
.sortByKey(ascending = false)
}
Предыдущие сообщения SO предлагают
Однако у меня нет классов case, так как я использую DataFrame
, который равен DataSet[Row]
, а также я встроил 2 неявных импорта следующим образом, без какой-либо помощи, чтобы избавиться от этого сообщения.
val sparkSession: SparkSession = ???
val sqlContext: SQLContext = ???
import sparkSession.implicits._
import sqlContext.implicits._
Обратите внимание, что я просмотрел документацию для DataSet и Кодировщик. Документы говорят что-то вроде следующего.
Scala Encoders are generally created automatically through implicits from a SparkSession, or can be explicitly created by calling static methods on Encoders. import spark.implicits._ val ds = Seq(1, 2, 3).toDS() // implicitly provided (spark.implicits.newIntEncoder)
Однако мой метод не имеет доступа к SparkSession
. Кроме того, когда я пробую эту строку import spark.implicits._
, IntelliJ даже не может ее найти. Когда я говорю, что мой DataFrame — это DataSet[Row], я действительно это имею в виду.
Этот вопрос помечен как возможный дубликат, но, пожалуйста, позвольте мне уточнить.
- У меня нет класса case или бизнес-объекта.
- Я использую .flatMap, а другой вопрос использует .map
- неявный импорт не помогает
- передача RowEncoder приводит к ошибке времени компиляции, например.
data.flatMap(row => { ... }, RowEncoder(data.schema))
(слишком много аргументов)
Я читаю другие сообщения и позвольте мне добавить, я думаю, я не знаю, как должен работать этот новый Spark 2.0 Datasets/DataFrame API. В оболочке Spark работает приведенный ниже код. Обратите внимание, что я запускаю искровую оболочку следующим образом $SPARK_HOME/bin/spark-shell --packages com.databricks:spark-csv_2.10:1.5.0
val schema = StructType(Array(
StructField("x1", StringType, true),
StructField("x2", StringType, true),
StructField("x3", StringType, true),
StructField("x4", StringType, true),
StructField("x5", StringType, true)))
val df = sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "true")
.schema(schema)
.load("/Users/jwayne/Downloads/mydata.csv")
df.columns.map(col => {
df.groupBy(col)
.count()
.map(_.getString(0))
.collect()
.toList
})
.toList
Однако, когда я запускаю это как часть тестового модуля, я получаю то же самое, что не могу найти ошибку кодировщика. Почему это работает в оболочке, но не в моих тестовых модулях?
В оболочке я набрал :imports
и :implicits
и поместил их в свои файлы/исходники scala, но это тоже не помогает.