Вот снова ваша таблица:
val df = Seq(
(0, 5),
(0, 15),
(1, 7),
(1, 3),
(2, 3),
(2, 4),
(2, 5),
(2, 9),
(3, 25)
).toDF("userId", "entId")
df.show()
Выходы:
+------+-----+
|userId|entId|
+------+-----+
| 0| 5|
| 0| 15|
| 1| 7|
| 1| 3|
| 2| 3|
| 2| 4|
| 2| 5|
| 2| 9|
| 3| 25|
+------+-----+
Теперь вы можете сгруппировать по userId
, а затем собрать endId
в списки, присвоив получившемуся столбцу списков псевдоним entIds
:
import org.apache.spark.sql.functions._
val entIdsForUserId = df.
groupBy($"userId").
agg(collect_list($"entId").alias("entIds"))
entIdsForUserId.show()
Выход:
+------+------------+
|userId| entIds|
+------+------------+
| 1| [7, 3]|
| 3| [25]|
| 2|[3, 4, 5, 9]|
| 0| [5, 15]|
+------+------------+
Порядок после groupBy
не указан. В зависимости от того, что вы хотите с ним делать, вы можете дополнительно отсортировать его.
Вы можете собрать его в единую карту на мастер-ноде:
val m = entIdsForUserId.
map(r => (r.getAs[Int](0), r.getAs[Seq[Int]](1))).
collect.toMap
это даст вам:
Map(1 -> List(7, 3), 3 -> List(25), 2 -> List(3, 4, 5, 9), 0 -> List(5, 15))
person
Andrey Tyukin
schedule
25.02.2018