Как создать накопитель пользовательского набора, например Set[String]?

Я пытаюсь использовать пользовательский аккумулятор в Apache Spark для накопления в наборе. Результат должен иметь тип Set[String]. Для этого я создал собственный аккумулятор:

object SetAccumulatorParam extends AccumulatorParam[Set[String]] {
    def addInPlace(r1: mutable.Set[String], r2: mutable.Set[String]): mutable.Set[String] = {
        r1 ++= r2
    }

    def zero(initialValue: mutable.Set[String]): mutable.Set[String] = {
        Set()
    }
}

Тем не менее, я не могу создать экземпляр переменной этого типа.

val tags = sc.accumulator(Set(""))(SetAccumulatorParam)

в результате ошибка. Пожалуйста, помогите.

required: org.apache.spark.AccumulatorParam[Set[String]]

person ozil    schedule 17.02.2017    source источник
comment
То, что вы сделали, сильно отличается от официальной документации (spark.apache.org /docs/последние/). Я также скептически отношусь к использованию объекта здесь, поскольку я предполагаю, что Spark захочет создать экземпляр этого аккумулятора в какой-то момент.   -  person LiMuBei    schedule 17.02.2017


Ответы (2)


В дополнение к ответу Траяна, вот общий случай SetAccumulator для spark 2.x.

import org.apache.spark.util.AccumulatorV2

class SetAccumulator[T](var value: Set[T]) extends AccumulatorV2[T, Set[T]] {
  def this() = this(Set.empty[T])
  override def isZero: Boolean = value.isEmpty
  override def copy(): AccumulatorV2[T, Set[T]] = new SetAccumulator[T](value)
  override def reset(): Unit = Set.empty[T]
  override def add(v: T): Unit = value + v
  override def merge(other: AccumulatorV2[T, Set[T]]): Unit = value ++ other.value
  override def value: Set[String] = value
}

И вы можете использовать его следующим образом:

val accum = new SetAccumulator[String]()
spark.sparkContext.register(accum, "My Accum") // Optional, name it for SparkUI

spark.sparkContext.parallelize(Seq("a", "b", "a", "b", "c")).foreach(s => accum.add(s))

accum.value

Что выводит:

Set[String] = Set(a, b, c)
person Ryan Widmaier    schedule 05.03.2018

Обновление для 1.6:

object StringSetAccumulatorParam extends AccumulatorParam[Set[String]] {
    def zero(initialValue: Set[String]): Set[String] = { Set() }
    def addInPlace(s1: Set[String], s2: Set[String]): Set[String] = { s1 ++ s2 }
}

val stringSetAccum = sc.accumulator(Set[String]())(StringSetAccumulatorParam)
sc.parallelize(Array("1", "2", "3", "1")).foreach(s => stringSetAccum += Set(s))
stringSetAccum.value.toString
res0: String = Set(2, 3, 1)

В Spark 2.0 вы, вероятно, можете использовать существующий collectionAccumulator (если вам нужны отдельные значения, вы можете проверить и добавить, только если они не существуют):

val collAcc = spark.sparkContext.collectionAccumulator[String]("myCollAcc")
collAcc: org.apache.spark.util.CollectionAccumulator[String] = CollectionAccumulator(id: 32154, name: Some(myCollAcc), value: [])

spark.sparkContext.parallelize(Array("1", "2", "3")).foreach(s => collAcc.add(s))

collAcc.value.toString
res0: String = [3, 2, 1]

Дополнительная информация: https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2

person Traian    schedule 17.02.2017