Запрос данных Cassandra с использованием Spark SQL в Scala

Я пытаюсь запросить данные Cassandra с помощью Spark SQL в Scala.

    import com.datastax.spark.connector._  
    import org.apache.spark.SparkContext 
    import org.apache.spark.SparkContext._
    import org.apache.spark.SparkConf

    val conf= new SparkConf(true)
    .set("spark.cassandra.connection.host","**.*.**.***")
    .set("spark.cassandra.auth.username","****")
    .set("spark.cassandra.auth.password","****")   
    val sc = new SparkContext(conf)

    import org.apache.spark.sql._
    val sqlContext = new SQLContext(sc)
    sqlContext.sql("SELECT * FROM energydata.demodata")  

И это выдает ошибку:

org.apache.spark.sql.AnalysisException: таблица или представление не найдены: _2 _._ 3_; строка 1 поз. 14; 'Project [*] + -' UnresolvedRelation _4 _._ 5_

на org.apache.spark.sql.catalyst.analysis.package $ AnalysisErrorAt.failAnalysis (package.scala: 42) на org.apache.spark.sql.catalyst.analysis.CheckAnalysis $$ anonfun $ checkAnalysis $ 1.apply (CheckAnalysis. scala: 82) на org.apache.spark.sql.catalyst.analysis.CheckAnalysis $$ anonfun $ checkAnalysis $ 1.apply (CheckAnalysis.scala: 78) на org.apache.spark.sql.catalyst.trees.TreeNode.achUp ( TreeNode.scala: 127) на org.apache.spark.sql.catalyst.trees.TreeNode $$ anonfun $ foreachUp $ 1.apply (Tre eNode.scala: 126) на org.apache.spark.sql.catalyst.trees.TreeNode $$ anonfun $ foreachUp $ 1.применить (Tre eNode.scala: 126) в scala.collection.immutable.List.foreach (List.scala: 381) в org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp ( TreeNode.scala: 126) на org.apache.spark.sql.catalyst.analysis.CheckAnalysis $ class.checkAnalysis (Ch eckAnalysis.scala: 78) на org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis (Analyzer .scal a: 91) в org.apache.spark.sql.execu. tion.QueryExecution.assertAnalyzed (QueryExecution .scala: 52) в org.apache.spark.sql.Dataset $ .ofRows (Dataset.scala: 66) в org.apache.spark.sql.SparkSession.sql (SparkSession.scala: 623 ) в org.apache.spark.sql.SQLContext.sql (SQLContext.scala: 691) ... 54 исключено

Я просто хочу читать данные таблицы, не нарушая таблицу cassandra. Я попробовал это решение, указанное ​​здесь, чтобы добавить hive-site.xml файл в spark/conf. Но когда я добавляю это в spark/conf, кажется, что искра не работает должным образом.

at org.apache.spark.sql.SparkSession$Builder$$anonfun$getOrCreate$5.appl
y(SparkSession.scala:938)
        at org.apache.spark.sql.SparkSession$Builder$$anonfun$getOrCreate$5.appl
y(SparkSession.scala:938)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.sca
la:99)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.sca
la:99)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala
:230)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
        at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.sc
ala:938)
        at org.apache.spark.repl.Main$.createSparkSession(Main.scala:97)
        at $line3.$read$$iw$$iw.<init>(<console>:15)
        at $line3.$read$$iw.<init>(<console>:42)
        at $line3.$read.<init>(<console>:44)
        at $line3.$read$.<init>(<console>:48)
        at $line3.$read$.<clinit>(<console>)
        at $line3.$eval$.$print$lzycompute(<console>:7)
        at $line3.$eval$.$print(<console>:6)
        at $line3.$eval.$print(<console>)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.lang.reflect.Method.invoke(Unknown Source)
        at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:786)

        at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1047
)
        at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunR
eq$1.apply(IMain.scala:638)
        at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunR
eq$1.apply(IMain.scala:637)
        at scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaCla
ssLoader.scala:31)
        at scala.reflect.internal.util.AbstractFileClassLoader.asContext(Abstrac
tFileClassLoader.scala:19)
        at scala.tools.nsc.interpreter.IMain$WrappedRequest.loadAndRunReq(IMain.
scala:637)
        at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:569)
        at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:565)
        at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:8
07)
        at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:681)
        at scala.tools.nsc.interpreter.ILoop.processLine(ILoop.scala:395)
        at org.apache.spark.repl.SparkILoop$$anonfun$initializeSpark$1.apply$mcV
$sp(SparkILoop.scala:38)
        at org.apache.spark.repl.SparkILoop$$anonfun$initializeSpark$1.apply(Spa
rkILoop.scala:37)
        at org.apache.spark.repl.SparkILoop$$anonfun$initializeSpark$1.apply(Spa
rkILoop.scala:37)
        at scala.tools.nsc.interpreter.IMain.beQuietDuring(IMain.scala:214)
        at org.apache.spark.repl.SparkILoop.initializeSpark(SparkILoop.scala:37)

        at org.apache.spark.repl.SparkILoop.loadFiles(SparkILoop.scala:98)
        at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILo
op.scala:920)
        at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scal
a:909)
        at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scal
a:909)
        at scala.reflect.internal.util.ScalaClassLoader$.savingContextLoader(Sca
laClassLoader.scala:97)
        at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:909)
        at org.apache.spark.repl.Main$.doMain(Main.scala:70)
        at org.apache.spark.repl.Main$.main(Main.scala:53)
        at org.apache.spark.repl.Main.main(Main.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.lang.reflect.Method.invoke(Unknown Source)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSub
mit$$runMain(SparkSubmit.scala:755)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:18
0)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.h
ive.ql.metadata.SessionHiveMetaStoreClient
        at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStore
Utils.java:1523)
        at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.<init>(Retry
ingMetaStoreClient.java:86)
        at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(Ret
ryingMetaStoreClient.java:132)
        at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(Ret
ryingMetaStoreClient.java:104)
        at org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.ja
va:3005)
        at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:3024)
        at org.apache.hadoop.hive.ql.metadata.Hive.getAllDatabases(Hive.java:123
4)
        ... 87 more
Caused by: java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)

        at sun.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)

        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Sou
rce)
        at java.lang.reflect.Constructor.newInstance(Unknown Source)
        at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStore
Utils.java:1521)
        ... 93 more
Caused by: java.lang.IllegalArgumentException: java.net.URISyntaxException: Rela
tive path in absolute URI: file:$%7Btest.warehouse.dir%7D
        at org.apache.hadoop.fs.Path.initialize(Path.java:205)
        at org.apache.hadoop.fs.Path.<init>(Path.java:196)
        at org.apache.hadoop.hive.metastore.Warehouse.getDnsPath(Warehouse.java:
141)
        at org.apache.hadoop.hive.metastore.Warehouse.getDnsPath(Warehouse.java:
146)
        at org.apache.hadoop.hive.metastore.Warehouse.getWhRoot(Warehouse.java:1
59)
        at org.apache.hadoop.hive.metastore.Warehouse.getDefaultDatabasePath(War
ehouse.java:177)
        at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.createDefau
ltDB_core(HiveMetaStore.java:600)
        at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.createDefau
ltDB(HiveMetaStore.java:620)
        at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.init(HiveMe
taStore.java:461)
        at org.apache.hadoop.hive.metastore.RetryingHMSHandler.<init>(RetryingHM
SHandler.java:66)
        at org.apache.hadoop.hive.metastore.RetryingHMSHandler.getProxy(Retrying
HMSHandler.java:72)
        at org.apache.hadoop.hive.metastore.HiveMetaStore.newRetryingHMSHandler(
HiveMetaStore.java:5762)
        at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.<init>(HiveMetaS
toreClient.java:199)
        at org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient.<init>(
SessionHiveMetaStoreClient.java:74)
        ... 98 more
Caused by: java.net.URISyntaxException: Relative path in absolute URI: file:$%7B
test.warehouse.dir%7D
        at java.net.URI.checkPath(Unknown Source)
        at java.net.URI.<init>(Unknown Source)
        at org.apache.hadoop.fs.Path.initialize(Path.java:202)
        ... 111 more
17/07/26 11:40:06 WARN ObjectStore: Failed to get database default, returning No
SuchObjectException
java.lang.IllegalArgumentException: Error while instantiating 'org.apache.spark.
sql.hive.HiveSessionStateBuilder':
  at org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$insta
ntiateSessionState(SparkSession.scala:1053)
  at org.apache.spark.sql.SparkSession$$anonfun$sessionState$2.apply(SparkSessio
n.scala:130)
  at org.apache.spark.sql.SparkSession$$anonfun$sessionState$2.apply(SparkSessio
n.scala:130)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scal
a:129)
  at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:126)
  at org.apache.spark.sql.SparkSession$Builder$$anonfun$getOrCreate$5.apply(Spar
kSession.scala:938)
  at org.apache.spark.sql.SparkSession$Builder$$anonfun$getOrCreate$5.apply(Spar
kSession.scala:938)
  at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)

  at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)

  at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
  at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
  at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
  at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:93
8)
  at org.apache.spark.repl.Main$.createSparkSession(Main.scala:97)
  ... 47 elided
Caused by: org.apache.spark.sql.AnalysisException: java.lang.RuntimeException: j
ava.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metad
ata.SessionHiveMetaStoreClient;
  at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalo
g.scala:106)
  at org.apache.spark.sql.hive.HiveExternalCatalog.databaseExists(HiveExternalCa
talog.scala:193)
  at org.apache.spark.sql.internal.SharedState.externalCatalog$lzycompute(Shared
State.scala:105)
  at org.apache.spark.sql.internal.SharedState.externalCatalog(SharedState.scala
:93)
  at org.apache.spark.sql.hive.HiveSessionStateBuilder.externalCatalog(HiveSessi
onStateBuilder.scala:39)
  at org.apache.spark.sql.hive.HiveSessionStateBuilder.catalog$lzycompute(HiveSe
ssionStateBuilder.scala:54)
  at org.apache.spark.sql.hive.HiveSessionStateBuilder.catalog(HiveSessionStateB
uilder.scala:52)
  at org.apache.spark.sql.hive.HiveSessionStateBuilder.catalog(HiveSessionStateB
uilder.scala:35)
  at org.apache.spark.sql.internal.BaseSessionStateBuilder.build(BaseSessionStat
eBuilder.scala:289)
  at org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$insta
ntiateSessionState(SparkSession.scala:1050)
  ... 61 more
Caused by: java.lang.RuntimeException: java.lang.RuntimeException: Unable to ins
tantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient
  at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:522)

  at org.apache.spark.sql.hive.client.HiveClientImpl.<init>(HiveClientImpl.scala
:191)
  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
  at sun.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
  at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
  at java.lang.reflect.Constructor.newInstance(Unknown Source)
  at org.apache.spark.sql.hive.client.IsolatedClientLoader.createClient(Isolated
ClientLoader.scala:264)
  at org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:3
62)
  at org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:2
66)
  at org.apache.spark.sql.hive.HiveExternalCatalog.client$lzycompute(HiveExterna
lCatalog.scala:66)
  at org.apache.spark.sql.hive.HiveExternalCatalog.client(HiveExternalCatalog.sc
ala:65)
  at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$databaseExists$1.app
ly$mcZ$sp(HiveExternalCatalog.scala:194)
  at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$databaseExists$1.app
ly(HiveExternalCatalog.scala:194)
  at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$databaseExists$1.app
ly(HiveExternalCatalog.scala:194)
  at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalo
g.scala:97)
  ... 70 more
Caused by: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.h
ive.ql.metadata.SessionHiveMetaStoreClient
  at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.
java:1523)
  at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.<init>(RetryingMet
aStoreClient.java:86)
  at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingM
etaStoreClient.java:132)
  at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingM
etaStoreClient.java:104)
  at org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:300
5)
  at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:3024)
  at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:503)

  ... 84 more
Caused by: java.lang.reflect.InvocationTargetException: java.lang.IllegalArgumen
tException: java.net.URISyntaxException: Relative path in absolute URI: file:$%7
Btest.warehouse.dir%7D
  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
  at sun.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
  at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
  at java.lang.reflect.Constructor.newInstance(Unknown Source)
  at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.
java:1521)
  ... 90 more
Caused by: java.lang.IllegalArgumentException: java.net.URISyntaxException: Rela
tive path in absolute URI: file:$%7Btest.warehouse.dir%7D
  at org.apache.hadoop.fs.Path.initialize(Path.java:205)
  at org.apache.hadoop.fs.Path.<init>(Path.java:196)
  at org.apache.hadoop.hive.metastore.Warehouse.getDnsPath(Warehouse.java:141)
  at org.apache.hadoop.hive.metastore.Warehouse.getDnsPath(Warehouse.java:146)
  at org.apache.hadoop.hive.metastore.Warehouse.getWhRoot(Warehouse.java:159)
  at org.apache.hadoop.hive.metastore.Warehouse.getDefaultDatabasePath(Warehouse
.java:177)
  at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.createDefaultDB_c
ore(HiveMetaStore.java:600)
  at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.createDefaultDB(H
iveMetaStore.java:620)
  at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.init(HiveMetaStor
e.java:461)
  at org.apache.hadoop.hive.metastore.RetryingHMSHandler.<init>(RetryingHMSHandl
er.java:66)
  at org.apache.hadoop.hive.metastore.RetryingHMSHandler.getProxy(RetryingHMSHan
dler.java:72)
  at org.apache.hadoop.hive.metastore.HiveMetaStore.newRetryingHMSHandler(HiveMe
taStore.java:5762)
  at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.<init>(HiveMetaStoreCl
ient.java:199)
  at org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient.<init>(Sessio
nHiveMetaStoreClient.java:74)
  ... 95 more
Caused by: java.net.URISyntaxException: Relative path in absolute URI: file:$%7B
test.warehouse.dir%7D
  at java.net.URI.checkPath(Unknown Source)
  at java.net.URI.<init>(Unknown Source)
  at org.apache.hadoop.fs.Path.initialize(Path.java:202)
  ... 108 more
<console>:14: error: not found: value spark
       import spark.implicits._
              ^
<console>:14: error: not found: value spark
       import spark.sql
              ^
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.0
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_141)
Type in expressions to have them evaluated.
Type :help for more information.

scala>

Я использую версии scala 2.12.2, Java 1.8.0, cassandra 3.1.1. Есть ли другой способ написать SQL-запрос в Scala?

Спасибо.


person dhinar    schedule 26.07.2017    source источник


Ответы (1)


Из импорта я понимаю, что вы используете spark-cassandra-connector. В разделе совместимость версий они упомянули, что коннектор поддерживает Scala. 2.10, 2.11 и Cassandra 2.1.5 *, 2.2, 3.0 с Spark 2.0, 2.1 с последней версией коннектора.

Поэтому я предлагаю вам перейти на более раннюю версию scala и cassandra и проверить, работает ли это.

Затем я предлагаю вам изменить способ доступа к таблицам. Datastax предоставил вам другой API для подключения к Cassandra. Соответствующую документацию можно найти здесь.

Вы можете сделать что-то подобное с Spark 2.x,

val spark = SparkSession.builder()
.appName("CassTest")
.master("local[2]")
.config("spark.cassandra.connection.host","**.*.**.***")
.config("spark.cassandra.auth.username","****")
.config("spark.cassandra.auth.password","****") 
.getOrCreate()

import spark.implicits._

val df = spark
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "words", "keyspace" -> "test" ))
  .load()

Наконец, вы можете сделать df.show
ПРИМЕЧАНИЕ. hive-site.xml исправление, которое вы пробовали, - это подключить Hive к некоторому глобально доступному хранилищу метаданных, которое само по себе является другим хранилищем данных. Так что для Кассандры это не сработает.

Сообщите мне, помогло ли это. Ваше здоровье.

person Chitral Verma    schedule 26.07.2017
comment
привет большое спасибо за комментарий. но я не должен понижать версию cassandra. Я попробовал приведенный выше код, и он дает ошибку, когда я даю spark.implicites._ <console>:36: error: value implicits is not a member of org.apache.spark.sql.SparkSession.Builder и val df = spark.sql (SELECT * FROM energydata.demodata) <console>:36: error: value sql is not a member of org.apache.spark.sql.SparkSess ion.Builder - person dhinar; 26.07.2017
comment
spark вот экземпляр org.apache.spark.sql.SparkSession, созданный вами выше. Можете ли вы убедиться, что вы импортируете implicits на этом экземпляре. Это будет работать для Spark 2.x. Также вы можете опустить эту подразумевающую строку и использовать другой способ обращения к таблице, которую я упомянул в своем редактировании выше, с использованием API фрейма данных. - person Chitral Verma; 26.07.2017
comment
он показывает ту же ошибку: <console>:39: error: value read is not a member of org.apache.spark.sql.SparkSession.Builder . Я не знаю, почему он это показывает. - person dhinar; 26.07.2017
comment
вы можете опубликовать свой pom.xml или build.sbt (в случае sbt) - person Chitral Verma; 26.07.2017
comment
Извините, я не знаю, где взять этот buld.sbt. Я совершенно новичок в Spark. но версия sbt, которую я здесь использую, это sbt 0.13.15 - person dhinar; 26.07.2017
comment
Вы где-то упомянули искровые зависимости? Пожалуйста, опубликуйте их. - person Chitral Verma; 26.07.2017
comment
Давайте продолжим это обсуждение в чате. - person Chitral Verma; 26.07.2017