Чтение отдельных каталогов и создание отдельных RDD параллельно с помощью Scala Spark

Мне нужно читать файлы JSON из отдельных исходных каталогов и создавать отдельные таблицы для каждого каталога. Я бы хотел, чтобы это делалось параллельно, но Spark не поддерживает вложенные RDD, поэтому в настоящее время он делает это последовательно. Есть ли хорошее решение для параллельного чтения/обработки этих каталогов?

Вот пример того, что я пытаюсь сделать, но это не работает из-за вложенных RDD:

def readJsonCreateTable(tableInfo: (String, String)) {
  val df = spark
           .read
           .json(tableInfo._1)
  df.createOrReplaceTempView(tableInfo._2)
}

val dirList = List(("/mnt/jsondir1", "temptable1"),
                   ("/mnt/jsondir2", "temptable2"),
                   ("/mnt/jsondir3", "temptable3"))
val dirRDD = sc.parallelize(dirList)
dirRDD.foreach(readJsonCreateTable) // Nested RDD error

Изменение последней строки на dirRDD.collect.foreach работает, но тогда работа не распределяется и выполняется последовательно, поэтому очень медленно.

Также попробовал dirRDD.collect.par.foreach, но он запускает только параллельные потоки в драйвере и не использует все остальные узлы.

Я просмотрел foreachAsync, но не уверен, что асинхронность обязательно параллельна в этой ситуации из-за вложенности.

Это использует Spark 2.0 и Scala 2.11 через Databricks.

===========
Дополнение:

Я попробовал foreachAsync, который возвращает FutureAction в Spark, но это также дало ошибку.

import scala.concurrent._
import scala.concurrent.duration._
.
.
.
val dirFuture = dirRDD.foreachAsync(readJsonCreateTable)
Await.result(dirFuture, 1 second)

И, по-видимому, SimpleFutureAction не сериализуем.

org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.apache.spark.SimpleFutureAction

person TBhimdi    schedule 12.01.2017    source источник


Ответы (1)


Вы можете использовать Scala параллельные коллекции или futures, чтобы распараллелить код, работающий на драйвере Spark. Драйвер Spark является потокобезопасным, поэтому он будет работать должным образом.

Вот пример использования параллельных коллекций с явно указанными пулами потоков:

val dirList = List(
  ("dbfs:/databricks-datasets/flights/departuredelays.csv", "departuredelays"),
  ("dbfs:/databricks-datasets/amazon/users/", "users")
).par

val pool = new scala.concurrent.forkjoin.ForkJoinPool(2)

try {
  dirList.tasksupport = new scala.collection.parallel.ForkJoinTaskSupport(pool)
  dirList.foreach { case (filename, tableName) =>
    println(s"Starting to create table for $tableName")
    val df = spark.read.json(filename)
    println(s"Done creating table for $tableName")
    df.createOrReplaceTempView(tableName)
  }
} finally {
  pool.shutdown() // to prevent thread leaks.
  // You could also re-use thread pools across collections.
}

Когда я запускал это в Databricks, вывод журнала потоковой передачи указывал на то, что две таблицы загружались параллельно:

Starting to create table for departuredelays
Starting to create table for users
Done creating table for departuredelays
Done creating table for users

Этот параллелизм также отразился на представлении временной шкалы заданий пользовательского интерфейса Spark.

Конечно, вы также можете использовать для этого потоки Java. Короче говоря, безопасно вызывать API-интерфейсы драйвера Spark из нескольких потоков, поэтому выберите предпочитаемую среду параллелизма JVM и выполняйте параллельные вызовы драйвера Spark для создания таблиц.

person Josh Rosen    schedule 12.01.2017
comment
Согласно stackoverflow.com/questions /41426576/ не рекомендуется: › Любое параллельное выполнение внутри задачи полностью непрозрачно для менеджера ресурсов и, как следствие, не может автоматически выделять требуемые ресурсы - person TBhimdi; 12.01.2017
comment
Этот связанный вопрос SO обсуждает параллелизм внутри задачи, который отличается от параллельного выполнения нескольких заданий Spark с параллельным кодом в драйвере. - person Josh Rosen; 12.01.2017