Spark, DataFrame: применить преобразователь/оценщик к группам

У меня есть DataFrame, который выглядит следующим образом:

+-----------+-----+------------+
|     userID|group|    features|
+-----------+-----+------------+
|12462563356|    1|  [5.0,43.0]|
|12462563701|    2|   [1.0,8.0]|
|12462563701|    1|  [2.0,12.0]|
|12462564356|    1|   [1.0,1.0]|
|12462565487|    3|   [2.0,3.0]|
|12462565698|    2|   [1.0,1.0]|
|12462565698|    1|   [1.0,1.0]|
|12462566081|    2|   [1.0,2.0]|
|12462566081|    1|  [1.0,15.0]|
|12462566225|    2|   [1.0,1.0]|
|12462566225|    1|  [9.0,85.0]|
|12462566526|    2|   [1.0,1.0]|
|12462566526|    1|  [3.0,79.0]|
|12462567006|    2| [11.0,15.0]|
|12462567006|    1| [10.0,15.0]|
|12462567006|    3| [10.0,15.0]|
|12462586595|    2|  [2.0,42.0]|
|12462586595|    3|  [2.0,16.0]|
|12462589343|    3|   [1.0,1.0]|
+-----------+-----+------------+

Где типы столбцов: userID: Long, group: Int и features:vector.

Это уже сгруппированный DataFrame, т. е. идентификатор пользователя появится в определенной группе не более одного раза.

Моя цель - масштабировать столбец features для каждой группы.

Есть ли способ применить трансформатор функций (в моем случае я бы например применить StandardScaler) на группу вместо того, чтобы применять его к полному фрейму данных.

P.S. использование ML не является обязательным, поэтому нет проблем, если решение основано на MLlib.


person Rami    schedule 15.02.2016    source источник


Ответы (1)


Вычислить статистику

Искра >= 3,0

Теперь Summarizer поддерживает стандартные отклонения, поэтому

val summary = data
  .groupBy($"group")
  .agg(Summarizer.metrics("mean", "std")
  .summary($"features").alias("stats"))
  .as[(Int, (Vector, Vector))]
  .collect.toMap

Искра >= 2,3

В Spark 2.3 или более поздней версии вы также можете использовать Summarizer:

import org.apache.spark.ml.stat.Summarizer

val summaryVar = data
  .groupBy($"group")
  .agg(Summarizer.metrics("mean", "variance")
  .summary($"features").alias("stats"))
  .as[(Int, (Vector, Vector))]
  .collect.toMap

и настроить нижестоящий код для обработки отклонений вместо стандартных отклонений.

Spark ‹ 2.0, Spark ‹ 2.3 с корректировками конверсий между ml и mllib Vectors.

Вы можете вычислить статистику по группам, используя почти тот же код, что и по умолчанию Scaler:

import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.sql.Row

// Compute Multivariate Statistics 
val summary = data.select($"group", $"features")
    .rdd
    .map {
         case Row(group: Int, features: Vector) => (group, features) 
    }
    .aggregateByKey(new MultivariateOnlineSummarizer)(/* Create an empty new MultivariateOnlineSummarizer */
         (agg, v) => agg.add(v), /* seqOp : Add a new sample Vector to this summarizer, and update the statistical summary. */
         (agg1, agg2) => agg1.merge(agg2)) /* combOp : As MultivariateOnlineSummarizer accepts a merge action with another MultivariateOnlineSummarizer, and update the statistical summary. */
    .mapValues {
      s => (
         s.variance.toArray.map(math.sqrt(_)), /* compute the square root variance for each key */
         s.mean.toArray /* fetch the mean for each key */
      )
    }.collectAsMap

Трансформация

Если ожидаемое количество групп относительно невелико, вы можете транслировать их:

val summaryBd = sc.broadcast(summary)

и преобразовать ваши данные:

val scaledRows = df.rdd.map{ case Row(userID, group: Int, features: Vector) =>
  val (stdev, mean)  =  summaryBd.value(group)
  val vs = features.toArray.clone()
  for (i <- 0 until vs.size) {
    vs(i) = if(stdev(i) == 0.0) 0.0 else (vs(i) - mean(i)) * (1 / stdev(i))
  }
  Row(userID, group, Vectors.dense(vs))
}
val scaledDf = sqlContext.createDataFrame(scaledRows, df.schema)

В противном случае вы можете просто присоединиться. Нетрудно обернуть это как преобразователь ML с групповым столбцом в качестве параметра.

person zero323    schedule 15.02.2016