Попытка подсчитать уникальных пользователей между двумя категориями в Spark

У меня есть структура набора данных в Spark с двумя столбцами, один называется user, а другой - category. Так что таблица выглядит примерно так:

+---------------+---------------+
|           user|       category|
+---------------+---------------+
|        garrett|        syncopy|
|       garrison|    musictheory|
|          marta|     sheetmusic|
|        garrett|  orchestration|
|         harold|         chopin|
|          marta|   russianmusic|
|           niko|          piano|
|          james|     sheetmusic|
|          manny|         violin|
|        charles|       gershwin|
|         dawson|          cello|
|            bob|          cello|
|         george|          cello|
|         george|  americanmusic|
|            bob| personalcompos|
|         george|     sheetmusic|
|           fred|     sheetmusic|
|            bob|     sheetmusic|
|       garrison|     sheetmusic|
|         george|    musictheory|
+---------------+---------------+
only showing top 20 rows

Каждая строка в таблице уникальна, но пользователь и категория могут появляться несколько раз. Цель состоит в том, чтобы подсчитать количество пользователей двух категорий. Например, cello и americanmusic имеют общего пользователя с именами george и musictheory и sheetmusic имеют общих пользователей george и garrison. Цель состоит в том, чтобы получить количество различных пользователей между n категориями, что означает, что между категориями имеется не более n квадратов ребер. Я частично понимаю, как выполнить эту операцию, но я немного пытаюсь преобразовать свои мысли в Spark Java.

Я считаю, что мне нужно выполнить самосоединение на user, чтобы получить таблицу, которая была бы структурирована следующим образом:

+---------------+---------------+---------------+
|           user|       category|       category|
+---------------+---------------+---------------+
|       garrison|    musictheory|     sheetmusic|
|         george|    musictheory|     sheetmusic|
|       garrison|    musictheory|    musictheory|
|         george|    musictheory|    musicthoery|
|       garrison|     sheetmusic|    musictheory|
|         george|     sheetmusic|    musictheory|
+---------------+---------------+---------------+

Операция самосоединения в Spark (код Java) несложна:

Dataset<Row> newDataset = allUsersToCategories.join(allUsersToCategories, "users");

Это что-то получается, однако я получаю сопоставления с той же категорией, что и в строках 3 и 4 в приведенном выше примере, и я получаю обратные сопоставления, где категории меняются местами, так что по сути это двойной учет каждого взаимодействия с пользователем, как в строках 5 и 6 таблицы выше пример.

Что я думаю, что мне нужно сделать, так это иметь какое-то условное выражение в моем соединении, которое говорит что-то вроде X < Y, чтобы одинаковые категории и дубликаты были выброшены. Наконец, мне нужно подсчитать количество отдельных строк для n возведенных в квадрат комбинаций, где n - количество категорий.

Может ли кто-нибудь объяснить, как это сделать в Spark и, в частности, в Spark Java, поскольку я немного не знаком с синтаксисом Scala?

Спасибо за помощь.


person Troll_Hunter    schedule 15.06.2017    source источник
comment
Напишите запрос SQL и выполните его. И любой фильтр можно использовать   -  person UninformedUser    schedule 15.06.2017
comment
что бы SQL-запрос был явно   -  person Troll_Hunter    schedule 15.06.2017


Ответы (2)


Не уверен, что правильно понимаю ваши требования, но постараюсь помочь.

По моему мнению, ожидаемый результат для приведенных выше данных должен выглядеть, как показано ниже. Если это неправда, дайте мне знать, я постараюсь внести необходимые изменения.

+--------------+--------------+-+
|_1            |_2            |
+--------------+--------------+-+
|personalcompos|sheetmusic    |1|
|cello         |musictheory   |1|
|americanmusic |cello         |1|
|cello         |sheetmusic    |2|
|cello         |personalcompos|1|
|russianmusic  |sheetmusic    |1|
|americanmusic |sheetmusic    |1|
|americanmusic |musictheory   |1|
|musictheory   |sheetmusic    |2|
|orchestration |syncopy       |1|
+--------------+--------------+-+

В этом случае вы можете решить свою проблему с помощью приведенного ниже кода Scala:

allUsersToCategories
    .groupByKey(_.user)
    .flatMapGroups{case (user, userCategories) =>
      val categories = userCategories.map(uc => uc.category).toSeq
      for {
         c1 <- categories
         c2 <- categories
         if c1 < c2
      } yield (c1, c2)
    }
    .groupByKey(x => x)
    .count()
    .show()

Если вам нужен симметричный результат, вы можете просто изменить оператор if в преобразовании flatMapGroups на if c1 != c2.

Обратите внимание, что в приведенном выше примере я использовал API набора данных, который для тестовых целей был создан с помощью следующего кода:

case class UserCategory(user: String, category: String)

val allUsersToCategories = session.createDataset(Seq(
   UserCategory("garrett", "syncopy"),
   UserCategory("garrison", "musictheory"),
   UserCategory("marta", "sheetmusic"),
   UserCategory("garrett", "orchestration"),
   UserCategory("harold", "chopin"),
   UserCategory("marta", "russianmusic"),
   UserCategory("niko", "piano"),
   UserCategory("james", "sheetmusic"),
   UserCategory("manny", "violin"),
   UserCategory("charles", "gershwin"),
   UserCategory("dawson", "cello"),
   UserCategory("bob", "cello"),
   UserCategory("george", "cello"),
   UserCategory("george", "americanmusic"),
   UserCategory("bob", "personalcompos"),
   UserCategory("george", "sheetmusic"),
   UserCategory("fred", "sheetmusic"),
   UserCategory("bob", "sheetmusic"),
   UserCategory("garrison", "sheetmusic"),
   UserCategory("george", "musictheory")
))

Я пытался привести пример на Java, но у меня нет опыта работы с Java + Spark, и мне требуется слишком много времени, чтобы перенести приведенный выше пример со Scala на Java ...

person Piotr Kalański    schedule 15.06.2017

Я нашел ответ пару часов назад, используя spark sql:

    Dataset<Row> connection per shared user = spark.sql("SELECT a.user as user, "
                                                            + "a.category as categoryOne, "
                                                            + "b.category as categoryTwo "
                                                            + "FROM allTable as a INNER JOIN allTable as b "
                                                            + "ON a.user = b.user AND a.user < b.user");

Затем будет создан набор данных с тремя столбцами user, categoryOne и categoryTwo. Каждая строка будет уникальной и укажет, когда пользователь существует в обеих категориях.

person Troll_Hunter    schedule 15.06.2017