Набор данных Spark: вернуть HashMap значений с одинаковым ключом

+------+-----+
|userID|entID|
+------+-----+
|     0|    5|
|     0|   15|
|     1|    7|
|     1|    3|
|     2|    3|
|     2|    4|
|     2|    5|
|     2|    9|
|     3|   25|
+------+-----+

Мне нужен результат как {0->(5,15), 1->(7,3),..}

Любая помощь будет оценена по достоинству.


person The_Coder    schedule 25.02.2018    source источник


Ответы (2)


Вот снова ваша таблица:

  val df = Seq(
    (0,  5),
    (0, 15),
    (1,  7),
    (1,  3),
    (2,  3),
    (2,  4),
    (2,  5),
    (2,  9),
    (3, 25)
  ).toDF("userId", "entId")
  df.show()

Выходы:

+------+-----+
|userId|entId|
+------+-----+
|     0|    5|
|     0|   15|
|     1|    7|
|     1|    3|
|     2|    3|
|     2|    4|
|     2|    5|
|     2|    9|
|     3|   25|
+------+-----+

Теперь вы можете сгруппировать по userId, а затем собрать endId в списки, присвоив получившемуся столбцу списков псевдоним entIds:

  import org.apache.spark.sql.functions._
  val entIdsForUserId = df.
    groupBy($"userId").                        
    agg(collect_list($"entId").alias("entIds"))

  entIdsForUserId.show()

Выход:

+------+------------+
|userId|      entIds|
+------+------------+
|     1|      [7, 3]|
|     3|        [25]|
|     2|[3, 4, 5, 9]|
|     0|     [5, 15]|
+------+------------+

Порядок после groupBy не указан. В зависимости от того, что вы хотите с ним делать, вы можете дополнительно отсортировать его.

Вы можете собрать его в единую карту на мастер-ноде:

  val m = entIdsForUserId.
    map(r => (r.getAs[Int](0), r.getAs[Seq[Int]](1))).
    collect.toMap

это даст вам:

Map(1 -> List(7, 3), 3 -> List(25), 2 -> List(3, 4, 5, 9), 0 -> List(5, 15))
person Andrey Tyukin    schedule 25.02.2018

Одним из подходов может быть преобразование набора данных в RDD и выполнение groupByKey. Чтобы получить результат в виде Map, вам потребуется collect сгруппировать предоставленный RDD, если набор данных не слишком велик:

val ds = Seq(
  (0, 5), (0, 15), (1, 7), (1, 3),
  (2, 3), (2, 4), (2, 5), (2, 9), (3, 25)
).toDF("userID", "entID").as[(Int, Int)]
// ds: org.apache.spark.sql.Dataset[(Int, Int)] =[userID: int, entID: int]

val map = ds.rdd.groupByKey.collectAsMap
// map: scala.collection.Map[Int,Iterable[Int]] = Map(
//   2 -> CompactBuffer(3, 4, 5, 9), 1 -> CompactBuffer(7, 3),
//   3 -> CompactBuffer(25), 0 -> CompactBuffer(5, 15)
// )
person Leo C    schedule 25.02.2018
comment
Учитывая, что полученная хэш-карта содержит все записи исходного DataFrame, я предположил, что OP хотел, чтобы она была еще одним DataFrame, а не как хэш-карта на главном узле, поэтому, возможно, версия с промежуточным agg-шагом в конце концов, оказалось полезным ... Но я честно пытался проголосовать за ваше решение как более короткое:] - person Andrey Tyukin; 26.02.2018
comment
Спасибо @Андрей Тюкин. Я думаю, что ваш промежуточный шаг как вариант сохранения данных в масштабе имеет смысл. - person Leo C; 26.02.2018