Мне нужно читать файлы JSON из отдельных исходных каталогов и создавать отдельные таблицы для каждого каталога. Я бы хотел, чтобы это делалось параллельно, но Spark не поддерживает вложенные RDD, поэтому в настоящее время он делает это последовательно. Есть ли хорошее решение для параллельного чтения/обработки этих каталогов?
Вот пример того, что я пытаюсь сделать, но это не работает из-за вложенных RDD:
def readJsonCreateTable(tableInfo: (String, String)) {
val df = spark
.read
.json(tableInfo._1)
df.createOrReplaceTempView(tableInfo._2)
}
val dirList = List(("/mnt/jsondir1", "temptable1"),
("/mnt/jsondir2", "temptable2"),
("/mnt/jsondir3", "temptable3"))
val dirRDD = sc.parallelize(dirList)
dirRDD.foreach(readJsonCreateTable) // Nested RDD error
Изменение последней строки на dirRDD.collect.foreach работает, но тогда работа не распределяется и выполняется последовательно, поэтому очень медленно.
Также попробовал dirRDD.collect.par.foreach, но он запускает только параллельные потоки в драйвере и не использует все остальные узлы.
Я просмотрел foreachAsync, но не уверен, что асинхронность обязательно параллельна в этой ситуации из-за вложенности.
Это использует Spark 2.0 и Scala 2.11 через Databricks.
===========
Дополнение:
Я попробовал foreachAsync, который возвращает FutureAction в Spark, но это также дало ошибку.
import scala.concurrent._
import scala.concurrent.duration._
.
.
.
val dirFuture = dirRDD.foreachAsync(readJsonCreateTable)
Await.result(dirFuture, 1 second)
И, по-видимому, SimpleFutureAction не сериализуем.
org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.apache.spark.SimpleFutureAction