Ошибка в Dataset.filter в Spark SQL

Я хочу отфильтровать набор данных только для того, чтобы содержать запись, которую можно найти в MySQL.

Вот набор данных:

dataset.show()
+---+-----+
| id| name|
+---+-----+
|  1|    a|
|  2|    b|
|  3|    c|
+---+-----+

А вот таблица в MySQL:

+---+-----+
| id| name|
+---+-----+
|  1|    a|
|  3|    c|
|  4|    d|
+---+-----+

Это мой код (работает в искровой оболочке):

import java.util.Properties

case class App(id: Int, name: String)

val data = sc.parallelize(Array((1, "a"), (2, "b"), (3, "c")))
val dataFrame = data.map { case (id, name) => App(id, name) }.toDF
val dataset = dataFrame.as[App]

val url = "jdbc:mysql://ip:port/tbl_name"
val table = "my_tbl_name"
val user = "my_user_name"
val password = "my_password"

val properties = new Properties()
properties.setProperty("user", user)
properties.setProperty("password", password)

dataset.filter((x: App) => 
  0 != sqlContext.read.jdbc(url, table, Array("id = " + x.id.toString), properties).count).show()

Но я получаю "java.lang.NullPointerException"

at org.apache.spark.sql.SQLConf.getConf(SQLConf.scala:638)
    at org.apache.spark.sql.SQLConf.defaultDataSourceName(SQLConf.scala:558)
    at org.apache.spark.sql.DataFrameReader.<init>(DataFrameReader.scala:362)
    at org.apache.spark.sql.SQLContext.read(SQLContext.scala:623)

я тестировал

val x = App(1, "aa")
sqlContext.read.jdbc(url, table, Array("id = " + x.id.toString), properties).count

val y = App(5, "aa")
sqlContext.read.jdbc(url, table, Array("id = " + y.id.toString), properties).count

и я могу получить правильный результат 1 и 0.

В чем проблема с фильтром?


person Xiaoyu Chen    schedule 15.01.2016    source источник


Ответы (1)


В чем проблема с фильтром?

Вы получаете исключение, потому что пытаетесь выполнить действие (count над DataFrame) внутри преобразования (filter). В Spark не поддерживаются ни вложенные действия, ни преобразования.

Правильное решение, как обычно, либо join в совместимых структурах данных, либо поиск с использованием локальной структуры данных, либо запрос непосредственно во внешней системе (без использования структур данных Spark).

person zero323    schedule 15.01.2016