Сбой Spark при чтении файла json при связывании с aws-java-sdk

Пусть config.json будет небольшим файлом json:

{
    "toto": 1
}

Я сделал простой код, который читает файл json с помощью sc.textFile (потому что файл может быть на S3, локальном или HDFS, поэтому textFile удобен)

import org.apache.spark.{SparkContext, SparkConf}

object testAwsSdk {
  def main( args:Array[String] ):Unit = {
    val sparkConf = new SparkConf().setAppName("test-aws-sdk").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    val json = sc.textFile("config.json") 
    println(json.collect().mkString("\n"))
  }
}

Файл SBT тянет только spark-core библиотеку

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.5.1" % "compile"
)

программа работает как положено, записывая содержимое config.json на стандартный вывод.

Теперь я хочу связать также с aws-java-sdk, amazon sdk для доступа к S3.

libraryDependencies ++= Seq(
  "com.amazonaws" % "aws-java-sdk" % "1.10.30" % "compile",
  "org.apache.spark" %% "spark-core" % "1.5.1" % "compile"
)

Выполняя тот же код, spark выдает следующее исключение.

Exception in thread "main" com.fasterxml.jackson.databind.JsonMappingException: Could not find creator property with name 'id' (in class org.apache.spark.rdd.RDDOperationScope)
 at [Source: {"id":"0","name":"textFile"}; line: 1, column: 1]
    at com.fasterxml.jackson.databind.JsonMappingException.from(JsonMappingException.java:148)
    at com.fasterxml.jackson.databind.DeserializationContext.mappingException(DeserializationContext.java:843)
    at com.fasterxml.jackson.databind.deser.BeanDeserializerFactory.addBeanProps(BeanDeserializerFactory.java:533)
    at com.fasterxml.jackson.databind.deser.BeanDeserializerFactory.buildBeanDeserializer(BeanDeserializerFactory.java:220)
    at com.fasterxml.jackson.databind.deser.BeanDeserializerFactory.createBeanDeserializer(BeanDeserializerFactory.java:143)
    at com.fasterxml.jackson.databind.deser.DeserializerCache._createDeserializer2(DeserializerCache.java:409)
    at com.fasterxml.jackson.databind.deser.DeserializerCache._createDeserializer(DeserializerCache.java:358)
    at com.fasterxml.jackson.databind.deser.DeserializerCache._createAndCache2(DeserializerCache.java:265)
    at com.fasterxml.jackson.databind.deser.DeserializerCache._createAndCacheValueDeserializer(DeserializerCache.java:245)
    at com.fasterxml.jackson.databind.deser.DeserializerCache.findValueDeserializer(DeserializerCache.java:143)
    at com.fasterxml.jackson.databind.DeserializationContext.findRootValueDeserializer(DeserializationContext.java:439)
    at com.fasterxml.jackson.databind.ObjectMapper._findRootDeserializer(ObjectMapper.java:3666)
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3558)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2578)
    at org.apache.spark.rdd.RDDOperationScope$.fromJson(RDDOperationScope.scala:82)
    at org.apache.spark.rdd.RDDOperationScope$$anonfun$5.apply(RDDOperationScope.scala:133)
    at org.apache.spark.rdd.RDDOperationScope$$anonfun$5.apply(RDDOperationScope.scala:133)
    at scala.Option.map(Option.scala:145)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:133)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
    at org.apache.spark.SparkContext.withScope(SparkContext.scala:709)
    at org.apache.spark.SparkContext.hadoopFile(SparkContext.scala:1012)
    at org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:827)
    at org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:825)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
    at org.apache.spark.SparkContext.withScope(SparkContext.scala:709)
    at org.apache.spark.SparkContext.textFile(SparkContext.scala:825)
    at testAwsSdk$.main(testAwsSdk.scala:11)
    at testAwsSdk.main(testAwsSdk.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)

Читая стек, кажется, что когда aws-java-sdk связан, sc.textFile обнаруживает, что файл является файлом json, и пытается проанализировать его с помощью jackson, предполагающего определенный формат, который он, конечно, не может найти. Мне нужно связать с aws-java-sdk, поэтому мои вопросы:

1- Почему добавление aws-java-sdk изменяет поведение spark-core?

2- Есть ли обходной путь (файл может быть на HDFS, S3 или локально)?


person Boris    schedule 01.11.2015    source источник
comment
это связано с тем, что aws-java-sdk использует последнюю версию 2.5.3 библиотеки jackson, а spark использует более старую версию 2.4.4. Я столкнулся с той же проблемой, но не смог ее решить. если вы нашли решение, пожалуйста, поделитесь им. спасибо   -  person Hafiz Mujadid    schedule 12.11.2015
comment
Привет, Хафиз... Довольно надоедливо, не так ли? Я отправляю дело в AWS. Они подтвердили, что это проблема совместимости. Однако они не сказали мне четкого решения. Постараюсь разобраться в ближайшее время.   -  person Boris    schedule 18.11.2015
comment
Привет Борис! да, это раздражает, чтобы столкнуться с этой проблемой, но я решил ее, исключив библиотеки jackson core и jackson module из spark-core и добавив зависимость последней библиотеки jackson core.   -  person Hafiz Mujadid    schedule 18.11.2015
comment
@HafizMujadid, как ты это сделал? Могли бы вы объяснить? Спасибо.   -  person Barbaros Alp    schedule 12.07.2016


Ответы (2)


Общался со службой поддержки Амазон. Это проблема зависимости с библиотекой Джексона. В SBT переопределить Джексона:

libraryDependencies ++= Seq( 
"com.amazonaws" % "aws-java-sdk" % "1.10.30" % "compile",
"org.apache.spark" %% "spark-core" % "1.5.1" % "compile"
) 

dependencyOverrides ++= Set( 
"com.fasterxml.jackson.core" % "jackson-databind" % "2.4.4" 
) 

их ответ: Мы сделали это на Mac, экземпляре Ec2 (redhat AMI) и на EMR (Amazon Linux). 3 Различные среды. Основная причина проблемы заключается в том, что sbt строит граф зависимостей, а затем решает проблему конфликтов версий, удаляя старую версию и выбирая последнюю версию зависимой библиотеки. В этом случае искра зависит от версии библиотеки jackson 2.4, а для AWS SDK требуется версия 2.5. Таким образом, возникает конфликт версий, и sbt вытесняет версию зависимости spark (которая старше) и выбирает версию AWS SDK (которая является самой последней).

person Boris    schedule 03.12.2015

Добавление к ответу Бориса, если вы не хотите использовать исправленную версию Джексона (возможно, в будущем вы обновить Spark), но все еще хотите отказаться от AWS, вы можете сделать следующее:

libraryDependencies ++= Seq( 
  "com.amazonaws" % "aws-java-sdk" % "1.10.30" % "compile" excludeAll (
    ExclusionRule("com.fasterxml.jackson.core", "jackson-databind")
  ),
  "org.apache.spark" %% "spark-core" % "1.5.1" % "compile"
) 
person nedim    schedule 22.06.2016