Delta Lake: исключение File Not Found

Я использую Delta Lake для выполнения операции слияния, для которой я пытаюсь преобразовать свои файлы Parquet в формат дельты, которые со временем разбиваются на разделы:

val source = spark.read.parquet("s3a://data-lake/source/")
       source
      .write
      .option("maxRecordsPerFile",20000)
      .mode("overwrite")
      .partitionBy("time")
      //.option("fs.s3a.committer.name", "partitioned") (I even tried using s3a committers)
      .format("delta")
      .save("s3a://data-lake/target/")

Данные составляют около 250 ГБ, и мои конфигурации искры:

spark.cores.max 420
spark.default.parallelism   10000
spark.delta.logStore.class  org.apache.spark.sql.delta.storage.S3SingleDriverLogStore
spark.driver.extraJavaOptions   -Xms20g
spark.driver.memory 28g
spark.executor.cores    2
spark.executor.memory 28G

В журналах он показывает ошибку File Not Found и в конечном итоге убивает исполнителей после некоторого времени работы:

20/05/16 11:51:02 WARN TaskSetManager: Lost task 208.0 in stage 2.0 (TID 3294, 172.16.145.25, executor 14): org.apache.spark.SparkException: Task failed while writing rows.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: No such file or directory: s3a://data-lake/target/time=20190101/part-00208-2c9b5ddd-f2c1-4b8c-9d77-eacb0055ff82.c000.snappy.parquet

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:167)
    ... 53 more
Caused by: org.apache.spark.SparkException: Task failed while writing rows.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Я пробовал сохранить его как формат паркета, он работает как положено. Но на обработку уходит значительно больше времени.


person Vishal    schedule 16.05.2020    source источник
comment
Попробуйте не устанавливать spark.delta.logStor и максимальное количество записей для каждого файла   -  person Salim    schedule 16.05.2020
comment
У меня такая же проблема. Не думаю, что дело в настройках. Причина может быть та же, что и здесь - github.com/delta-io/delta/issues / 341.   -  person Cheng    schedule 29.07.2020


Ответы (1)


Если он уже находится в паркете, вы можете скопировать папку в s3 в целевое место без искры и инициализировать ее как дельта-таблицу со следующей строкой:

import io.delta.DeltaTable
DeltaTable.forPath("s3a://data-lake/target/"))

Дополнительная информация .

person Alfilercio    schedule 17.05.2020