Как создать Spark RDD из Accumulo 1.6 в spark-notebook?

У меня есть образ Vagrant с запущенными Spark Notebook, Spark, Accumulo 1.6 и Hadoop. Из записной книжки я могу вручную создать сканер и извлечь тестовые данные из таблицы, которую я создал, используя один из примеров Accumulo:

val instanceNameS = "accumulo"
val zooServersS = "localhost:2181"
val instance: Instance = new ZooKeeperInstance(instanceNameS, zooServersS)
val connector: Connector = instance.getConnector( "root", new PasswordToken("password"))
val auths = new Authorizations("exampleVis")
val scanner = connector.createScanner("batchtest1", auths)

scanner.setRange(new Range("row_0000000000", "row_0000000010"))

for(entry: Entry[Key, Value] <- scanner) {
  println(entry.getKey + " is " + entry.getValue)
}

даст первые десять строк данных таблицы.

Когда я пытаюсь создать RDD таким образом:

val rdd2 = 
  sparkContext.newAPIHadoopRDD (
    new Configuration(), 
    classOf[org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat], 
    classOf[org.apache.accumulo.core.data.Key], 
    classOf[org.apache.accumulo.core.data.Value]
  )

Я получаю возвращенный мне RDD, с которым я ничего не могу сделать из-за следующей ошибки:

java.io.IOException: входная информация не была установлена. в org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator.validateOptions(InputConfigurator.java:630) в org.apache.accumulo.core.client.mapreduce.AbstractInputFormat.validateOptions(AbstractInputFormat.java:343) на org.apache.accumulo.core.client.mapreduce.AbstractInputFormat.getSplits(AbstractInputFormat.java:538) на org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:98) на org.apache.spark.rdd .RDD$$anonfun$partitions$2.apply(RDD.scala:222) в org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220) в scala.Option.getOrElse(Option .scala:120) в org.apache.spark.rdd.RDD.partitions(RDD.scala:220) в org.apache.spark.SparkContext.runJob(SparkContext.scala:1367) в org.apache.spark.rdd. RDD.count(RDD.scala:927)

Это вполне логично в свете того факта, что я не указал никаких параметров относительно того, к какой таблице подключаться, каковы авторизации и т. д.

Итак, мой вопрос: Что мне нужно сделать, чтобы получить эти первые десять строк данных таблицы в мой RDD?

обновить Все еще не работает, но я обнаружил несколько вещей. Оказывается, есть два почти одинаковых пакета,

org.apache.accumulo.core.client.mapreduce

&

org.apache.accumulo.core.client.mapred

оба имеют почти идентичные члены, за исключением того факта, что некоторые сигнатуры методов различны. не уверен, почему оба существуют, поскольку я не видел уведомления об устаревании. Я без радости попытался реализовать ответ Ситсе. Ниже то, что я сделал, и ответы:

import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.conf.Configuration
val jobConf = new JobConf(new Configuration)

import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.conf.Configuration jobConf: org.apache.hadoop.mapred.JobConf = Конфигурация: core-default.xml, core-site.xml, mapred-default. xml, mapred-site.xml, yarn-default.xml, yarn-site.xml

Конфигурация: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml

AbstractInputFormat.setConnectorInfo(jobConf, 
                                     "root", 
                                     new PasswordToken("password")

AbstractInputFormat.setScanAuthorizations(jobConf, auths)

AbstractInputFormat.setZooKeeperInstance(jobConf, new ClientConfiguration)

val rdd2 = 
  sparkContext.hadoopRDD (
    jobConf, 
    classOf[org.apache.accumulo.core.client.mapred.AccumuloInputFormat], 
    classOf[org.apache.accumulo.core.data.Key], 
    classOf[org.apache.accumulo.core.data.Value], 
    1
  )

rdd2: org.apache.spark.rdd.RDD[(org.apache.accumulo.core.data.Key, org.apache.accumulo.core.data.Value)] = HadoopRDD[1] в hadoopRDD в: 62

rdd2.first

java.io.IOException: входная информация не была установлена. в org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator.validateOptions(InputConfigurator.java:630) в org.apache.accumulo.core.client.mapred.AbstractInputFormat.validateOptions(AbstractInputFormat.java:308) на org.apache.accumulo.core.client.mapred.AbstractInputFormat.getSplits(AbstractInputFormat.java:505) на org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:201) на org.apache.spark.rdd .RDD$$anonfun$partitions$2.apply(RDD.scala:222) в org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220) в scala.Option.getOrElse(Option .scala:120) в org.apache.spark.rdd.RDD.partitions(RDD.scala:220) в org.apache.spark.rdd.RDD.take(RDD.scala:1077) в org.apache.spark. rdd.RDD.first(RDD.scala:1110) в $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$ iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:64) в $iwC$$iwC$$ iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$i wC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:69) в...

* изменить 2 *

re: Ответ Холдена - до сих пор без радости:

    AbstractInputFormat.setConnectorInfo(jobConf, 
                                         "root", 
                                         new PasswordToken("password")
    AbstractInputFormat.setScanAuthorizations(jobConf, auths)
    AbstractInputFormat.setZooKeeperInstance(jobConf, new ClientConfiguration)
    InputFormatBase.setInputTableName(jobConf, "batchtest1")
    val rddX = sparkContext.newAPIHadoopRDD(
      jobConf, 
      classOf[org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat], 
      classOf[org.apache.accumulo.core.data.Key], 
      classOf[org.apache.accumulo.core.data.Value]
      )

rddX: org.apache.spark.rdd.RDD[(org.apache.accumulo.core.data.Key, org.apache.accumulo.core.data.Value)] = NewHadoopRDD[0] в newAPIHadoopRDD в: 58

Out[15]: NewHadoopRDD[0] в newAPIHadoopRDD в :58

rddX.first

java.io.IOException: входная информация не была установлена. в org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator.validateOptions(InputConfigurator.java:630) в org.apache.accumulo.core.client.mapreduce.AbstractInputFormat.validateOptions(AbstractInputFormat.java:343) на org.apache.accumulo.core.client.mapreduce.AbstractInputFormat.getSplits(AbstractInputFormat.java:538) на org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:98) на org.apache.spark.rdd .RDD$$anonfun$partitions$2.apply(RDD.scala:222) в org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220) в scala.Option.getOrElse(Option .scala:120) в org.apache.spark.rdd.RDD.partitions(RDD.scala:220) в org.apache.spark.rdd.RDD.take(RDD.scala:1077) в org.apache.spark. rdd.RDD.first(RDD.scala:1110) в $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$ iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:61) в

изменить 3 – прогресс!

я смог выяснить, почему возникает ошибка «input INFO not set». зоркие среди вас, несомненно, увидят, что в следующем коде отсутствует закрывающий '('

AbstractInputFormat.setConnectorInfo(jobConf, "root", new PasswordToken("password") 

когда я делаю это в блокноте spark, я нажимал кнопку «Выполнить» и двигался дальше, потому что не видел ошибки. что я забыл, так это то, что блокнот будет делать то же, что и spark-shell, когда вы опускаете закрывающее ')' - он будет вечно ждать, пока вы его добавите. поэтому ошибка была результатом того, что метод setConnectorInfo никогда не выполнялся.

к сожалению, я все еще не могу засунуть данные таблицы накопления в RDD, который мне пригоден. когда я выполняю

rddX.count

Я вернусь

res15: Длинный = 10000

что является правильным ответом - в таблице, на которую я указал, 10 000 строк данных. однако, когда я пытаюсь захватить первый элемент данных таким образом:

rddX.first

Я получаю следующую ошибку:

org.apache.spark.SparkException: задание прервано из-за сбоя этапа: задача 0.0 на этапе 0.0 (TID 0) имела несериализуемый результат: org.apache.accumulo.core.data.Key

любые мысли о том, куда идти отсюда?

изменить 4 – успешно!

принятый ответ + комментарии составляют 90% пути, за исключением того факта, что ключ/значение аккумуляции необходимо преобразовать во что-то сериализуемое. я получил это, вызвав метод .toString() для обоих. Я постараюсь вскоре опубликовать что-нибудь с полным рабочим кодом, если кто-то еще столкнется с той же проблемой.


person snerd    schedule 24.03.2015    source источник
comment
привет Дэвид, просто интересно, что быстро (потому что я еще не знаю о accumulo ^^). Вы уже пробовали подобное в искровой оболочке? Так что я буду знать, проблема ли это в искровом ноутбуке или нет :-D. Если это аккумулирующая штука, я могу посмотреть на @lossyrob, который использовал Accumulo с Spark в Geotrellis.   -  person Andy Petrella    schedule 25.03.2015
comment
@andypetrella я не пробовал это в spark-shell, потому что, я думаю, spark-notebook просто передает мои команды в spark и возвращает мне то, что он возвращает из spark (вы бы лучше меня знали об этом). я скажу, что когда я пытаюсь следовать инструкциям в документации accumulo, раздел 9.1.2, я получаю java.lang.IllegalStateException: Job в состоянии DEFINE вместо ошибки RUNNING для Job job = new Job(getConf()) или a 'я не знаю, что такое сообщение getConf(), зависит от того, как я все настроил.   -  person snerd    schedule 25.03.2015
comment
я вижу здесь pastebin.com/ti7Qz19m, что этот человек следует методу, указанному в документах accumulo, но я могу не получить от этого никакой тяги.   -  person snerd    schedule 25.03.2015
comment
действительно, основываясь на ответе @Sietse и вашей ссылке pastebin, мы могли бы сказать, что строки № 173 и далее сходятся с использованием этого статического (и странного) метода   -  person Andy Petrella    schedule 25.03.2015
comment
По крайней мере, я мог бы дать вам эти указатели. Так что можете попытаться двигаться вперед, основываясь на том, что вы там увидите. В георешетке интенсивно используют аккумулирование с Spark. Вот где: github.com/geotrellis/geotrellis/blob/master/spark/src/main/. Но сам по себе полный пакет стоит прочитать, чтобы увидеть, как взаимодействовать или определять новые вещи. Но я пока слишком нуб в накоплении, чтобы помогать дальше. Извините, я буду учиться на основе ваших открытий :-D   -  person Andy Petrella    schedule 26.03.2015
comment
Я знаю, что это немного поздно, но спасибо за это руководство. С новым Spark (2.x) вам нужно создать SparkSession, а затем установить его контекст с параметрами, указанными выше. Что касается сериализации — я создал объект SparkConf при создании SparkSession с KryoSerializer, что-то вроде этого: conf.set(spark.serializer, org.apache.spark.serializer.KryoSerializer); и это работало безупречно.   -  person Sekula1991    schedule 18.07.2017


Ответы (2)


Как правило, при использовании настраиваемых форматов ввода Hadoop информация указывается с помощью файла JobConf. Как указал @Sietse, в AccumuloInputFormat есть несколько статических методов, которые вы можете использовать для настройки JobConf. В этом случае я думаю, что вы хотели бы сделать это:

val jobConf = new JobConf() // Create a job conf
// Configure the job conf with our accumulo properties
AccumuloInputFormat.setConnectorInfo(jobConf, principal, token)
AccumuloInputFormat.setScanAuthorizations(jobConf, authorizations)
val clientConfig =  new ClientConfiguration().withInstance(instanceName).withZkHosts(zooKeepers)
AccumuloInputFormat.setZooKeeperInstance(jobConf, clientConfig)
AccumuloInputFormat.setInputTableName(jobConf, tableName)
// Create an RDD using the jobConf
val rdd2 = sc.newAPIHadoopRDD(jobConf, 
classOf[org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat], 
classOf[org.apache.accumulo.core.data.Key], 
classOf[org.apache.accumulo.core.data.Value]
)

Примечание. После изучения кода кажется, что свойство настроено частично на основе вызываемого класса (имеет смысл потенциально избежать конфликтов с другими пакетами), поэтому, когда мы идем и возвращаем его обратно в конкретный класс позже он не может найти настроенный флаг. Решение этой проблемы состоит в том, чтобы не использовать абстрактные классы. см. https://github.com/apache/accumulo/blob/bf102d0711103e903afa0589500f5796ad51c366/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/ConfiguratorBase.java#L127 для деталей реализации). Если вы не можете вызвать этот метод для конкретной реализации с помощью spark-notebook, возможно, самым простым решением будет использование spark-shell или регулярно собираемого приложения.

person Holden    schedule 25.03.2015
comment
Кроме того, согласно вашему обновлению, которое кажется довольно близким к этому, вам все равно нужно указать имя таблицы (например, я думаю, что в вашем коде отсутствует вызов setInputTableName). - person Holden; 26.03.2015
comment
мне нужно сделать что-нибудь, чтобы настроить объект clientConfig перед передачей его в setZookeeperInstance? - person snerd; 26.03.2015
comment
также - к вашему сведению - вы не можете вызвать setScanAuthorizations для AccumuloInputFormat из spark-notebook, так как это даст вам ошибку «не член» - person snerd; 26.03.2015
comment
@DavidDaedalus Пакеты, которые вы упоминаете в своем обновлении org.apache.accumulo.core.client.mapreduce, кажутся именами пакетов для версии (версий) ‹= 1.4 и org.apache.accumulo.core.client.mapred для версий › = 1,5. В Accumulo 1.4 нет метода setScanAuthorizations, а в версии 1.5 и выше он есть. Попробуйте импортировать из правильного пакета и посмотрите, работает ли он. - person Sietse; 26.03.2015
comment
я уже импортирую org.apache.accumulo.core.client.mapred._ . кроме того, оба перечислены в документации API 1.6: accumulo.apache.org/1.6/apidocs/org/apache/accumulo/core/client/ accumulo.apache.org/1.6/apidocs/org/apache/accumulo/core/client/ - person snerd; 26.03.2015
comment
я думаю, проблема может заключаться в том, что setScanAuthorizations на самом деле унаследован AccumuloInputFormat от AbstractInputFormat - person snerd; 26.03.2015
comment
@DavidDaedalus Я обновил пример, включив в него создание clientConfig . Что касается setScanAuthorizations , глядя на javadoc, кажется, что он должен быть определен в AccumuloInputFormat (он унаследован от AbstractInputFormat). - person Holden; 26.03.2015
comment
@ Holden w00t - я попробую. Если вы имеете в виду «определено» как --> AccumuloInputFormat.setScanAuthorizations, то да, это «должно» быть. Однако помните, что я делаю это через spark-notebook, и когда я пытаюсь использовать вышеуказанный вызов метода, я получаю сообщение об ошибке «я не знаю, о чем вы говорите с этим бизнесом setScanAuthorizations». То же самое относится и к Beanshell (Java REPL / пакет сценариев) - он не знает об унаследованных методах. единственный способ добраться до них — это явно вызвать их из класса, в котором они определены. - person snerd; 26.03.2015
comment
blerg - не радость с обновленным кодом. т / г, кстати, за то, что придерживаетесь меня в этом! - person snerd; 26.03.2015
comment
С каким последним исключением вы столкнулись? - person Holden; 26.03.2015
comment
то же самое: java.io.IOException: входная информация не была установлена, когда я пытаюсь выполнить операцию: rddX.first - person snerd; 26.03.2015
comment
Если копаться в этом коде, похоже, что вы застрянете, поскольку он устанавливает свойство конфигурации на основе класса реализации (см. github.com/apache/accumulo/blob/master/core/src/main/java/org/ ), поэтому вам действительно нужно использовать конкретный класс, а не вызывать методы абстрактных классов. Пожалуйста, попробуйте с искровой оболочкой и дайте мне знать, если это сработает. - person Holden; 26.03.2015
comment
bleg — проблемы из оболочки с токеном авторизации. это начинает выглядеть как проблема многих сортов пива. Лемми, иди позаботься об этом, а я вернусь с результатами запуска этого либо непосредственно из spark-shell, либо из автономной программы scala, отправленной через spark-submit. - person snerd; 26.03.2015
comment
Опубликовано обновление 3 - оказывается, ваш метод был правильным, если я напрямую ссылаюсь на рассматриваемый метод И не забудьте закрыть мои скобки: -p, к сожалению, теперь он взрывается по другой причине - person snerd; 26.03.2015

Похоже, что эти параметры должны быть установлены с помощью статических методов: http://accumulo.apache.org/1.6/apidocs/org/apache/accumulo/core/client/mapred/AccumuloInputFormat.html. Поэтому попробуйте установить необязательные параметры и запустить снова. Он должен работать.

person Sietse    schedule 25.03.2015
comment
радости по-прежнему нет - см. обновление вопроса для получения дополнительной информации - person snerd; 26.03.2015