Spark — Генерация случайных чисел

Я написал метод, который должен учитывать случайное число для имитации распределения Бернулли. Я использую random.nextDouble для генерации числа от 0 до 1, а затем принимаю решение на основе этого значения с учетом моего параметра вероятности.

Моя проблема в том, что Spark генерирует одни и те же случайные числа в каждой итерации моей функции отображения цикла for. Я использую DataFrame API. Мой код следует этому формату:

val myClass = new MyClass()
val M = 3
val myAppSeed = 91234
val rand = new scala.util.Random(myAppSeed)

for (m <- 1 to M) {
  val newDF = sqlContext.createDataFrame(myDF
    .map{row => RowFactory
      .create(row.getString(0),
        myClass.myMethod(row.getString(2), rand.nextDouble())
    }, myDF.schema)
}

Вот класс:

class myClass extends Serializable {
  val q = qProb

  def myMethod(s: String, rand: Double) = {
    if (rand <= q) // do something
    else // do something else
  }
}

Мне нужно новое случайное число каждый раз, когда вызывается myMethod. Я также попытался сгенерировать число внутри моего метода с помощью java.util.Random (scala.util.Random v10 не расширяет Serializable), как показано ниже, но я все еще получаю одни и те же числа в каждом цикле for.

val r = new java.util.Random(s.hashCode.toLong)
val rand = r.nextDouble()

Я провел некоторое исследование, и кажется, что это связано с детерминированной природой Sparks.


person Brian    schedule 06.04.2016    source источник


Ответы (4)


Причина, по которой повторяется одна и та же последовательность, заключается в том, что генератор случайных чисел создается и инициализируется начальным числом до разделения данных. Затем каждый раздел начинается с одного и того же случайного начального числа. Возможно, это не самый эффективный способ сделать это, но следующее должно работать:

val myClass = new MyClass()
val M = 3

for (m <- 1 to M) {
  val newDF = sqlContext.createDataFrame(myDF
    .map{ 
       val rand = scala.util.Random
       row => RowFactory
      .create(row.getString(0),
        myClass.myMethod(row.getString(2), rand.nextDouble())
    }, myDF.schema)
}
person Pascal Soucy    schedule 06.04.2016
comment
Я немного изменил это, чтобы решить мою проблему. Я передал Random val в свой метод и сгенерировал оттуда случайные числа. Это решило мою проблему, но мне пришлось использовать java.util.Random из соображений сериализуемости. - person Brian; 06.04.2016

Просто используйте функцию SQL rand:

import org.apache.spark.sql.functions._

//df: org.apache.spark.sql.DataFrame = [key: int]

df.select($"key", rand() as "rand").show
+---+-------------------+
|key|               rand|
+---+-------------------+
|  1| 0.8635073400704648|
|  2| 0.6870153659986652|
|  3|0.18998048357873532|
+---+-------------------+


df.select($"key", rand() as "rand").show
+---+------------------+
|key|              rand|
+---+------------------+
|  1|0.3422484248879837|
|  2|0.2301384925817671|
|  3|0.6959421970071372|
+---+------------------+
person David Griffin    schedule 06.04.2016
comment
Это не совсем решило мою проблему, но это элегантное решение, которое я, вероятно, буду использовать в будущем, поэтому +1 - person Brian; 06.04.2016

Согласно этот пост , лучшее решение — ставить new scala.util.Random не внутри карты и не совсем снаружи (т.е. в коде драйвера), а в промежуточном mapPartitionsWithIndex:

import scala.util.Random
val myAppSeed = 91234
val newRDD = myRDD.mapPartitionsWithIndex { (indx, iter) =>
   val rand = new scala.util.Random(indx+myAppSeed)
   iter.map(x => (x, Array.fill(10)(rand.nextDouble)))
}
person leo9r    schedule 06.10.2016

Использование Spark Dataset API, возможно, для использования в аккумуляторе:

df.withColumn("_n", substring(rand(),3,4).cast("bigint"))
person Joshua David Lickteig    schedule 06.06.2019