Как наиболее эффективно преобразовать строку Scala DataFrame в класс case?

После того, как я получил в Spark некоторый класс Row, Dataframe или Catalyst, я хочу преобразовать его в класс case в моем коде. Это можно сделать, сопоставив

someRow match {case Row(a:Long,b:String,c:Double) => myCaseClass(a,b,c)}

Но это становится уродливым, когда в строке есть огромное количество столбцов, скажем, дюжина двойных значений, некоторые логические значения и даже иногда ноль.

Я просто хотел бы иметь возможность -извините- привести Row к myCaseClass. Возможно ли это, или у меня уже есть самый экономичный синтаксис?


person arivero    schedule 27.01.2015    source источник
comment
Вероятно, бесформенный (github.com/milessabin/shapeless/wiki/) может помочь уменьшить шаблон, но, вероятно, он не очень любит nulls. Может быть, макросы (если у вас много case-классов)?   -  person Gábor Bakos    schedule 27.01.2015
comment
Макросы никогда не пробовал. Проблема здесь в том, что я верю в стандарты для языков. Я могу представить, что всегда могу использовать свои собственные методы или использовать кого-то еще ... но я предпочитаю попытаться понять, как это делается без каких-либо внешних факторов.   -  person arivero    schedule 27.01.2015
comment
интересно ... возможно, я мог бы создать подкласс myCaseClass от Row?   -  person arivero    schedule 19.07.2015
comment
Это такое разочарование. У меня есть большой сложный класс case, и теперь мне нужно вручную сопоставить с ним каждый столбец, когда я хочу загрузить и поработать с ним. Мне грустно от этого :-(   -  person jbrown    schedule 30.11.2015


Ответы (4)


DataFrame - это просто псевдоним типа Dataset [Row]. Эти операции также называются «нетипизированными преобразованиями» в отличие от «типизированных преобразований», которые идут со строго типизированными наборами данных Scala / Java.

Преобразование из набора данных [строка] в набор данных [человек] очень просто в программе Spark.

val DFtoProcess = SQLContext.sql("SELECT * FROM peoples WHERE name='test'")

На этом этапе Spark преобразует ваши данные в DataFrame = Dataset [Row], коллекцию универсального объекта Row, поскольку ему неизвестен точный тип.

// Create an Encoders for Java class (In my eg. Person is a JAVA class)
// For scala case class you can pass Person without .class reference
val personEncoder = Encoders.bean(Person.class) 

val DStoProcess = DFtoProcess.as[Person](personEncoder)

Теперь Spark преобразует Dataset[Row] -> Dataset[Person] зависящий от типа объект JVM Scala / Java в соответствии с требованиями класса Person.

Пожалуйста, обратитесь к приведенной ниже ссылке, предоставленной databricks для получения дополнительной информации.

https://databricks.com/blog/2016/07/14/a-tale-of-three-apache-spark-apis-rdds-dataframes-and-datasets.html

person Rahul    schedule 20.08.2016
comment
У вас есть только один ответ - но в любом случае он хороший! Я не мог найти никакой информации о том, как создать собственный кодировщик Spark, пока не наткнулся на этот ответ. кстати scala способ Encoders.bean[Person] - person WestCoastProjects; 21.09.2017
comment
Следует отметить, что as небезопасно, так как не проверяет правильность преобразования. Я не понимаю, как они добавили такую ​​уродливую функцию в свой API (ее следует называть unsafeAs - а где безопасный as, который одновременно выполняет фильтрацию или возвращает DataSet[Option[T]]?) - person user239558; 16.04.2018
comment
@javadba - я думаю, это не работает с классами case? Я получаю здесь ошибку Cannot infer type for class Person because it is not bean-compliant. - person Sasgorilla; 07.05.2018
comment
Это, безусловно, самое чистое решение, когда я хочу преобразовать Dataset[Row] в Dataset[Person], но что, если я просто хочу преобразовать один объект Row? Я возвращаюсь к тому, что просто вручную создаю Person из каждого поля в Row. Есть ли способ лучше? - person Sasgorilla; 07.05.2018
comment
@sasgorilla правильный ответ для классов case ниже - вам нужно import spark.implicits._, тогда вы можете просто использовать df.as[Person] - person Mark Butler; 15.08.2018
comment
Encoders.bean api предназначен для java, который не может использовать имплицит. Обратите внимание, что он следует стандартному дизайну java и ожидает, что классы будут изменяемыми pojo :( - person Juh_; 29.03.2021

Насколько я знаю, вы не можете преобразовать строку в класс case, но иногда я выбирал прямой доступ к полям строки, например

map(row => myCaseClass(row.getLong(0), row.getString(1), row.getDouble(2))

Я считаю, что это проще, особенно если конструктору класса case нужны только некоторые поля из строки.

person Glennie Helles Sindholt    schedule 02.09.2015
comment
И вы избегаете проблемы сопоставления java-нулей :-) - person arivero; 03.09.2015
comment
Мне тоже нравится это представление для меньшего набора столбцов, но если набор столбцов больше, что добавляет двусмысленности, я думаю, что предложение @Gianmarios может быть более расширяемым. Мне нужно самому проверить пару вещей. Вернемся к вам по этому поводу. - person Pramit; 12.10.2016
comment
если некоторые поля в случае класса являются общими, это сработает? - person cozyss; 27.02.2018
comment
Спасибо! Это было большим подспорьем. - person bit_flip; 11.04.2019

scala> import spark.implicits._    
scala> val df = Seq((1, "james"), (2, "tony")).toDF("id", "name")
df: org.apache.spark.sql.DataFrame = [id: int, name: string]

scala> case class Student(id: Int, name: String)
defined class Student

scala> df.as[Student].collectAsList
res6: java.util.List[Student] = [Student(1,james), Student(2,tony)]

Здесь spark в spark.implicits._ - это ваш SparkSession. Если вы находитесь внутри REPL, сеанс уже определен как spark, в противном случае вам необходимо изменить имя, чтобы оно соответствовало вашему SparkSession.

person secfree    schedule 25.12.2017
comment
Для Spark 2.1.0 мне пришлось импортировать spark.implicits._, чтобы получить эту работу - красивое, элегантное решение для Scala. - person Darth Jon; 27.02.2018
comment
См. stackoverflow.com/questions/39968707/ для получения дополнительной информации о spark.implicits._ - person Mark Butler; 15.08.2018
comment
Я делаю это вне REPL и во время теста, и там написано Error:(44, 30) not enough arguments for method as: (implicit evidence$2: org.apache.spark.sql.Encoder[Student])org.apache.spark.sql.Dataset[Student]. Unspecified value parameter evidence$2. val rows = df.as[Student].collectAsList() - person NateH06; 26.06.2019

Конечно, вы можете сопоставить объект Row с классом case. Предположим, ваш SchemaType имеет много полей, и вы хотите сопоставить некоторые из них с вашим классом case. Если у вас нет нулевых полей, вы можете просто сделать:

case class MyClass(a: Long, b: String, c: Int, d: String, e: String)

dataframe.map {
  case Row(a: java.math.BigDecimal, 
    b: String, 
    c: Int, 
    _: String,
    _: java.sql.Date, 
    e: java.sql.Date,
    _: java.sql.Timestamp, 
    _: java.sql.Timestamp, 
    _: java.math.BigDecimal, 
    _: String) => MyClass(a = a.longValue(), b = b, c = c, d = d.toString, e = e.toString)
}

Этот подход не работает в случае нулевых значений, а также требует, чтобы вы явно определяли тип каждого отдельного поля. Если вам нужно обрабатывать нулевые значения, вам нужно либо отбросить все строки, содержащие нулевые значения, выполнив

dataframe.na.drop()

Это приведет к удалению записей, даже если пустые поля не те, которые используются в вашем сопоставлении с шаблоном для вашего класса case. Или, если вы хотите справиться с этим, вы можете превратить объект Row в список, а затем использовать шаблон параметров:

case class MyClass(a: Long, b: String, c: Option[Int], d: String, e: String)

dataframe.map(_.toSeq.toList match {
  case List(a: java.math.BigDecimal, 
    b: String, 
    c: Int, 
    _: String,
    _: java.sql.Date, 
    e: java.sql.Date,
    _: java.sql.Timestamp, 
    _: java.sql.Timestamp, 
    _: java.math.BigDecimal, 
    _: String) => MyClass(
      a = a.longValue(), b = b, c = Option(c), d = d.toString, e = e.toString)
}

Проверьте этот проект на github Sparkz (), который вскоре представит множество библиотек для упрощения API-интерфейсов Spark и DataFrame и сделает их более ориентированными на функциональное программирование.

person Gianmario Spacagna    schedule 02.11.2015
comment
Кто автор Sparkz (), о котором вы говорите? - person Chetan Bhasin; 27.11.2015
comment
Предположительно он находится по адресу github.com/gm-spacagna/sparkz, но на данный момент он пуст. - person jbrown; 30.11.2015