Проблема сериализации Kryo с коллекцией в поле ProtoBuf

Я получаю объект protobuf от Kafka в своем приложении Spark (v1.6.1), которое использует сериализатор Kryo. Объект protobuf выглядит примерно так:

  private A() {
          abc_ = "";
          xyz_ = "";
          ... some more fields
          aList_ = java.util.Collections.emptyList();
          ... some more fields
    }

Когда я запускаю приложение spark, оно выдает исключение для коллекции «aList_», и я получаю следующую ошибку:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 18.0 failed 4 times, most recent failure: Lost task 1.3 in stage 18.0 com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
    Serialization trace:
    aList_ (...packageName/...protoBufObject$A)
     at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
     at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:228)
     at org.apache.spark.serializer.DeserializationStream.readValue(Serializer.scala:171)
     at org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:201)
     at org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:198)
     at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
     at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
     at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
     at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
     at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:152)
     at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:58)
     at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:83)
     at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:98)
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
     at org.apache.spark.scheduler.Task.run(Task.scala:89)
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
     at java.lang.Thread.run(Thread.java:745)

Caused by: java.lang.UnsupportedOperationException
         at java.util.Collections$UnmodifiableCollection.add(Collections.java:1055)
         at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:102)
         at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:18)
         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
         at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
         ... 27 more

Я вижу похожую проблему в приведенной ниже ссылке, но еще не нашел решения.

Spark, проблема сериализации Kryo с полем ProtoBuf

Кто-нибудь еще сталкивался с этой проблемой?


person Pooja Mazumdar    schedule 17.10.2017    source источник
comment
Попробуйте установить это значение aList как trazient lazy val   -  person Thiago Baldim    schedule 18.10.2017
comment
Я использую искру в java. Transient lazy val, я полагаю, специфичен для scala?   -  person Pooja Mazumdar    schedule 18.10.2017


Ответы (1)


Если кто-то столкнется с этой проблемой, я заработал, используя метод, описанный в моем другом посте - -kryo-in-spark-code/46838306">Как установить немодифицируемый сериализатор коллекции Kryo в коде Spark

person Pooja Mazumdar    schedule 19.10.2017