Как использовать mllib.recommendation, если идентификаторы пользователей представляют собой строку, а не непрерывные целые числа?

Я хочу использовать библиотеку Spark mllib.recommendation для создания прототипа рекомендательной системы. Однако формат пользовательских данных, который у меня есть, имеет следующий формат:

AB123XY45678
CD234WZ12345
EF345OOO1234
GH456XY98765
....

Если я хочу использовать библиотеку mllib.recommendation, в соответствии с API класса Rating идентификаторы пользователей должны быть целыми числами (также должны быть непрерывными?)

Похоже, что необходимо выполнить какое-то преобразование между реальными идентификаторами пользователей и числовыми, используемыми Spark. Но как мне это сделать?


person shihpeng    schedule 05.01.2015    source источник


Ответы (4)


Spark на самом деле не требует числового идентификатора, ему просто нужно иметь какое-то уникальное значение, но для реализации они выбрали Int.

Вы можете сделать простое преобразование туда и обратно для userId:

  case class MyRating(userId: String, product: Int, rating: Double)

  val data: RDD[MyRating] = ???

  // Assign unique Long id for each userId
  val userIdToInt: RDD[(String, Long)] = 
    data.map(_.userId).distinct().zipWithUniqueId()

  // Reverse mapping from generated id to original
  val reverseMapping: RDD[(Long, String)]
    userIdToInt map { case (l, r) => (r, l) }

  // Depends on data size, maybe too big to keep
  // on single machine
  val map: Map[String, Int] = 
    userIdToInt.collect().toMap.mapValues(_.toInt)

  // Transform to MLLib rating
  val rating: RDD[Rating] = data.map { r =>
    Rating(userIdToInt.lookup(r.userId).head.toInt, r.product, r.rating)
    // -- or
    Rating(map(r.userId), r.product, r.rating)
  }

  // ... train model

  // ... get back to MyRating userId from Int

  val someUserId: String = reverseMapping.lookup(123).head

Вы также можете попробовать «data.zipWithUniqueId()», но я не уверен, что в этом случае .toInt будет безопасным преобразованием, даже если размер набора данных невелик.

person Eugene Zhulenev    schedule 05.01.2015
comment
Разве это не присваивает уникальный индекс каждому из рейтингов, а не каждому из пользователей? Я не думаю, что это сработает, если у пользователя несколько оценок. - person PBJ; 22.01.2015
comment
lookup подход не является допустимым кодом Spark. Он будет скомпилирован, но сработает во время выполнения. Не могли бы вы исправить (удалить) это? - person zero323; 07.04.2016

Вам нужно запустить StringIndexer для ваших идентификаторов пользователей, чтобы преобразовать строку в уникальный целочисленный индекс. Они не должны быть непрерывными.

Мы используем это для нашего механизма рекомендаций товаров в https://www.aihello.com.

df is (пользователь: строка, продукт, рейтинг)

  val stringindexer = new StringIndexer()
      .setInputCol("user")
      .setOutputCol("userNumber")
  val modelc = stringindexer.fit(df)
  val  df = modelc.transform(df)
person Ganesh Krishnan    schedule 28.01.2017

@ Ганеш Кришнан прав, StringIndexer решает эту проблему.

from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.sql import SQLContext
>>> spark = SQLContext(sc)                                                                             
>>> df = spark.createDataFrame(
...     [(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],
...     ["id", "category"])

| id|category|
+---+--------+
|  0|       a|
|  1|       b|
|  2|       c|
|  3|       a|
|  4|       a|
|  5|       c|
+---+--------+
>>> stringIndexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
>>> model = stringIndexer.fit(df)
>>> indexed = model.transform(df)
>>> indexed.show()
+---+--------+-------------+
| id|category|categoryIndex|
+---+--------+-------------+
|  0|       a|          0.0|
|  1|       b|          2.0|
|  2|       c|          1.0|
|  3|       a|          0.0|
|  4|       a|          0.0|
|  5|       c|          1.0|
+---+--------+-------------+

>>> converter = IndexToString(inputCol="categoryIndex", outputCol="originalCategory")
>>> converted = converter.transform(indexed)
>>> converted.show()
+---+--------+-------------+----------------+
| id|category|categoryIndex|originalCategory|
+---+--------+-------------+----------------+
|  0|       a|          0.0|               a|
|  1|       b|          2.0|               b|
|  2|       c|          1.0|               c|
|  3|       a|          0.0|               a|
|  4|       a|          0.0|               a|
|  5|       c|          1.0|               c|
+---+--------+-------------+----------------+

>>> converted.select("id", "originalCategory").show()
+---+----------------+
| id|originalCategory|
+---+----------------+
|  0|               a|
|  1|               b|
|  2|               c|
|  3|               a|
|  4|               a|
|  5|               c|
+---+----------------+
person maroon912    schedule 11.05.2017

Приведенное выше решение может не всегда работать, как я обнаружил. Spark не может выполнять преобразования RDD из других RDD. Вывод ошибки:

org.apache.spark.SparkException: преобразования и действия RDD могут быть введены только в код, вызываемый здесь драйвером, а не внутри других преобразований; например, rdd1.map(x => rdd2.values.count() * x) недействителен, поскольку преобразование значений и действие подсчета не могут быть выполнены внутри преобразования rdd1.map. Дополнительные сведения см. в SPARK-5063.

В качестве решения вы можете соединить RDD userIdToInt с RDD исходных данных, чтобы сохранить отношение между userId и uniqueId. Затем позже вы можете снова соединить результаты RDD с этим RDD.

// Create RDD with the unique id included
val dataWithUniqueUserId: RDD[(String, Int, Int, Double)] = 
    data.keyBy(_.userId).join(userIdToInt).map(r => 
        (r._2._1.userId, r._2._2.toInt, r._2._1.productId, 1))
person tvgriek    schedule 13.07.2015