Я создаю систему рекомендаций для фильмов, используя наборы данных MovieLens, доступные здесь: http://grouplens.org/datasets/movielens/
Чтобы вычислить эту систему рекомендаций, я использую библиотеку ML Flink в scala и, в частности, алгоритм ALS (org.apache.flink.ml.recommendation.ALS
).
Сначала я сопоставляю рейтинги фильма с DataSet[(Int, Int, Double)]
, а затем создаю trainingSet
и testSet
(см. код ниже).
Моя проблема в том, что нет ошибки, когда я использую функцию ALS.fit
со всем набором данных (все рейтинги), но если я просто удалю только один рейтинг, функция подгонки больше не работает, и я не понимаю Зачем.
У Вас есть какие-либо идеи? :)
Используемый код:
Рейтинг.scala
case class Rating(userId: Int, movieId: Int, rating: Double)
PreProcessing.scala
object PreProcessing {
def getRatings(env : ExecutionEnvironment, ratingsPath : String): DataSet[Rating] = {
env.readCsvFile[(Int, Int, Double)](
ratingsPath, ignoreFirstLine = true,
includedFields = Array(0,1,2)).map{r => new Rating(r._1, r._2, r._3)}
}
Processing.scala
object Processing {
private val ratingsPath: String = "Path_to_ratings.csv"
def main(args: Array[String]) {
val env = ExecutionEnvironment.getExecutionEnvironment
val ratings: DataSet[Rating] = PreProcessing.getRatings(env, ratingsPath)
val trainingSet : DataSet[(Int, Int, Double)] =
ratings
.map(r => (r.userId, r.movieId, r.rating))
.sortPartition(0, Order.ASCENDING)
.first(ratings.count().toInt)
val als = ALS()
.setIterations(10)
.setNumFactors(10)
.setBlocks(150)
.setTemporaryPath("/tmp/tmpALS")
val parameters = ParameterMap()
.add(ALS.Lambda, 0.01) // After some tests, this value seems to fit the problem
.add(ALS.Seed, 42L)
als.fit(trainingSet, parameters)
}
}
"Но если я удалю только одну оценку"
val trainingSet : DataSet[(Int, Int, Double)] =
ratings
.map(r => (r.userId, r.movieId, r.rating))
.sortPartition(0, Order.ASCENDING)
.first((ratings.count()-1).toInt)
Ошибка:
19.06.2015 15:00:24 CoGroup (CoGroup at org.apache.flink.ml.recommendation.ALS$.updateFactors(ALS.scala:570))(4/4) переведена в состояние FAILED
java.lang.ArrayIndexOutOfBoundsException: 5
на org.apache.flink.ml.recommendation.ALS$BlockRating.apply(ALS.scala:358)
на org.apache.flink.ml.recommendation.ALS$$anon$111.coGroup(ALS.scala:635)
в org.apache.flink.runtime.operators.CoGroupDriver.run(CoGroupDriver.java:152)
...