У меня есть образ Vagrant с запущенными Spark Notebook, Spark, Accumulo 1.6 и Hadoop. Из записной книжки я могу вручную создать сканер и извлечь тестовые данные из таблицы, которую я создал, используя один из примеров Accumulo:
val instanceNameS = "accumulo"
val zooServersS = "localhost:2181"
val instance: Instance = new ZooKeeperInstance(instanceNameS, zooServersS)
val connector: Connector = instance.getConnector( "root", new PasswordToken("password"))
val auths = new Authorizations("exampleVis")
val scanner = connector.createScanner("batchtest1", auths)
scanner.setRange(new Range("row_0000000000", "row_0000000010"))
for(entry: Entry[Key, Value] <- scanner) {
println(entry.getKey + " is " + entry.getValue)
}
даст первые десять строк данных таблицы.
Когда я пытаюсь создать RDD таким образом:
val rdd2 =
sparkContext.newAPIHadoopRDD (
new Configuration(),
classOf[org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat],
classOf[org.apache.accumulo.core.data.Key],
classOf[org.apache.accumulo.core.data.Value]
)
Я получаю возвращенный мне RDD, с которым я ничего не могу сделать из-за следующей ошибки:
java.io.IOException: входная информация не была установлена. в org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator.validateOptions(InputConfigurator.java:630) в org.apache.accumulo.core.client.mapreduce.AbstractInputFormat.validateOptions(AbstractInputFormat.java:343) на org.apache.accumulo.core.client.mapreduce.AbstractInputFormat.getSplits(AbstractInputFormat.java:538) на org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:98) на org.apache.spark.rdd .RDD$$anonfun$partitions$2.apply(RDD.scala:222) в org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220) в scala.Option.getOrElse(Option .scala:120) в org.apache.spark.rdd.RDD.partitions(RDD.scala:220) в org.apache.spark.SparkContext.runJob(SparkContext.scala:1367) в org.apache.spark.rdd. RDD.count(RDD.scala:927)
Это вполне логично в свете того факта, что я не указал никаких параметров относительно того, к какой таблице подключаться, каковы авторизации и т. д.
Итак, мой вопрос: Что мне нужно сделать, чтобы получить эти первые десять строк данных таблицы в мой RDD?
обновить Все еще не работает, но я обнаружил несколько вещей. Оказывается, есть два почти одинаковых пакета,
org.apache.accumulo.core.client.mapreduce
&
org.apache.accumulo.core.client.mapred
оба имеют почти идентичные члены, за исключением того факта, что некоторые сигнатуры методов различны. не уверен, почему оба существуют, поскольку я не видел уведомления об устаревании. Я без радости попытался реализовать ответ Ситсе. Ниже то, что я сделал, и ответы:
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.conf.Configuration
val jobConf = new JobConf(new Configuration)
import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.conf.Configuration jobConf: org.apache.hadoop.mapred.JobConf = Конфигурация: core-default.xml, core-site.xml, mapred-default. xml, mapred-site.xml, yarn-default.xml, yarn-site.xml
Конфигурация: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml
AbstractInputFormat.setConnectorInfo(jobConf,
"root",
new PasswordToken("password")
AbstractInputFormat.setScanAuthorizations(jobConf, auths)
AbstractInputFormat.setZooKeeperInstance(jobConf, new ClientConfiguration)
val rdd2 =
sparkContext.hadoopRDD (
jobConf,
classOf[org.apache.accumulo.core.client.mapred.AccumuloInputFormat],
classOf[org.apache.accumulo.core.data.Key],
classOf[org.apache.accumulo.core.data.Value],
1
)
rdd2: org.apache.spark.rdd.RDD[(org.apache.accumulo.core.data.Key, org.apache.accumulo.core.data.Value)] = HadoopRDD[1] в hadoopRDD в: 62
rdd2.first
java.io.IOException: входная информация не была установлена. в org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator.validateOptions(InputConfigurator.java:630) в org.apache.accumulo.core.client.mapred.AbstractInputFormat.validateOptions(AbstractInputFormat.java:308) на org.apache.accumulo.core.client.mapred.AbstractInputFormat.getSplits(AbstractInputFormat.java:505) на org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:201) на org.apache.spark.rdd .RDD$$anonfun$partitions$2.apply(RDD.scala:222) в org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220) в scala.Option.getOrElse(Option .scala:120) в org.apache.spark.rdd.RDD.partitions(RDD.scala:220) в org.apache.spark.rdd.RDD.take(RDD.scala:1077) в org.apache.spark. rdd.RDD.first(RDD.scala:1110) в $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$ iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:64) в $iwC$$iwC$$ iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$i wC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:69) в...
* изменить 2 *
re: Ответ Холдена - до сих пор без радости:
AbstractInputFormat.setConnectorInfo(jobConf,
"root",
new PasswordToken("password")
AbstractInputFormat.setScanAuthorizations(jobConf, auths)
AbstractInputFormat.setZooKeeperInstance(jobConf, new ClientConfiguration)
InputFormatBase.setInputTableName(jobConf, "batchtest1")
val rddX = sparkContext.newAPIHadoopRDD(
jobConf,
classOf[org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat],
classOf[org.apache.accumulo.core.data.Key],
classOf[org.apache.accumulo.core.data.Value]
)
rddX: org.apache.spark.rdd.RDD[(org.apache.accumulo.core.data.Key, org.apache.accumulo.core.data.Value)] = NewHadoopRDD[0] в newAPIHadoopRDD в: 58
Out[15]: NewHadoopRDD[0] в newAPIHadoopRDD в :58
rddX.first
java.io.IOException: входная информация не была установлена. в org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator.validateOptions(InputConfigurator.java:630) в org.apache.accumulo.core.client.mapreduce.AbstractInputFormat.validateOptions(AbstractInputFormat.java:343) на org.apache.accumulo.core.client.mapreduce.AbstractInputFormat.getSplits(AbstractInputFormat.java:538) на org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:98) на org.apache.spark.rdd .RDD$$anonfun$partitions$2.apply(RDD.scala:222) в org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220) в scala.Option.getOrElse(Option .scala:120) в org.apache.spark.rdd.RDD.partitions(RDD.scala:220) в org.apache.spark.rdd.RDD.take(RDD.scala:1077) в org.apache.spark. rdd.RDD.first(RDD.scala:1110) в $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$ iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:61) в
изменить 3 – прогресс!
я смог выяснить, почему возникает ошибка «input INFO not set». зоркие среди вас, несомненно, увидят, что в следующем коде отсутствует закрывающий '('
AbstractInputFormat.setConnectorInfo(jobConf, "root", new PasswordToken("password")
когда я делаю это в блокноте spark, я нажимал кнопку «Выполнить» и двигался дальше, потому что не видел ошибки. что я забыл, так это то, что блокнот будет делать то же, что и spark-shell, когда вы опускаете закрывающее ')' - он будет вечно ждать, пока вы его добавите. поэтому ошибка была результатом того, что метод setConnectorInfo никогда не выполнялся.
к сожалению, я все еще не могу засунуть данные таблицы накопления в RDD, который мне пригоден. когда я выполняю
rddX.count
Я вернусь
res15: Длинный = 10000
что является правильным ответом - в таблице, на которую я указал, 10 000 строк данных. однако, когда я пытаюсь захватить первый элемент данных таким образом:
rddX.first
Я получаю следующую ошибку:
org.apache.spark.SparkException: задание прервано из-за сбоя этапа: задача 0.0 на этапе 0.0 (TID 0) имела несериализуемый результат: org.apache.accumulo.core.data.Key
любые мысли о том, куда идти отсюда?
изменить 4 – успешно!
принятый ответ + комментарии составляют 90% пути, за исключением того факта, что ключ/значение аккумуляции необходимо преобразовать во что-то сериализуемое. я получил это, вызвав метод .toString() для обоих. Я постараюсь вскоре опубликовать что-нибудь с полным рабочим кодом, если кто-то еще столкнется с той же проблемой.