OutOfBoundsException с ALS — Flink MLlib

Я создаю систему рекомендаций для фильмов, используя наборы данных MovieLens, доступные здесь: http://grouplens.org/datasets/movielens/

Чтобы вычислить эту систему рекомендаций, я использую библиотеку ML Flink в scala и, в частности, алгоритм ALS (org.apache.flink.ml.recommendation.ALS).

Сначала я сопоставляю рейтинги фильма с DataSet[(Int, Int, Double)], а затем создаю trainingSet и testSet (см. код ниже).

Моя проблема в том, что нет ошибки, когда я использую функцию ALS.fit со всем набором данных (все рейтинги), но если я просто удалю только один рейтинг, функция подгонки больше не работает, и я не понимаю Зачем.

У Вас есть какие-либо идеи? :)

Используемый код:

Рейтинг.scala

case class Rating(userId: Int, movieId: Int, rating: Double)

PreProcessing.scala

object PreProcessing {

def getRatings(env : ExecutionEnvironment, ratingsPath : String): DataSet[Rating] = {
      env.readCsvFile[(Int, Int, Double)](
      ratingsPath, ignoreFirstLine = true,
      includedFields = Array(0,1,2)).map{r => new Rating(r._1, r._2, r._3)}
}

Processing.scala

object Processing {
  private val ratingsPath: String = "Path_to_ratings.csv"

  def main(args: Array[String]) {

    val env = ExecutionEnvironment.getExecutionEnvironment

    val ratings: DataSet[Rating] = PreProcessing.getRatings(env, ratingsPath)

    val trainingSet : DataSet[(Int, Int, Double)] =
    ratings
     .map(r => (r.userId, r.movieId, r.rating))
     .sortPartition(0, Order.ASCENDING)
     .first(ratings.count().toInt)

    val als = ALS()
     .setIterations(10)
     .setNumFactors(10)
     .setBlocks(150)
     .setTemporaryPath("/tmp/tmpALS")

    val parameters = ParameterMap()
     .add(ALS.Lambda, 0.01) // After some tests, this value seems to fit the problem
     .add(ALS.Seed, 42L)

    als.fit(trainingSet, parameters)
  }
}

"Но если я удалю только одну оценку"

val trainingSet : DataSet[(Int, Int, Double)] =
  ratings
    .map(r => (r.userId, r.movieId, r.rating))
    .sortPartition(0, Order.ASCENDING)
    .first((ratings.count()-1).toInt)

Ошибка:

19.06.2015 15:00:24 CoGroup (CoGroup at org.apache.flink.ml.recommendation.ALS$.updateFactors(ALS.scala:570))(4/4) переведена в состояние FAILED

java.lang.ArrayIndexOutOfBoundsException: 5

на org.apache.flink.ml.recommendation.ALS$BlockRating.apply(ALS.scala:358)

на org.apache.flink.ml.recommendation.ALS$$anon$111.coGroup(ALS.scala:635)

в org.apache.flink.runtime.operators.CoGroupDriver.run(CoGroupDriver.java:152)

...


person Kerial    schedule 19.06.2015    source источник


Ответы (1)


Проблема заключается в операторе first в сочетании с параметром setTemporaryPath реализации Flink ALS. Чтобы понять проблему, позвольте мне быстро объяснить, как работает блокирующий алгоритм ALS.

Блокирующая реализация чередующегося метода наименьших квадратов сначала разбивает заданную матрицу рейтингов по пользователям и по элементам на блоки. Для этих блоков рассчитывается маршрутная информация. Эта информация о маршрутизации говорит о том, какой блок пользователя/элемента получает какие входные данные от какого элемента/блока пользователя, соответственно. После этого запускается итерация ALS.

Поскольку основной механизм выполнения Flink представляет собой механизм параллельного потокового потока данных, он пытается выполнить как можно больше частей потока данных конвейерным способом. Для этого необходимо, чтобы все операторы конвейера были подключены к сети одновременно. Это имеет то преимущество, что Flink избегает материализации промежуточных результатов, которые могут быть чрезмерно большими. Недостатком является то, что доступная память должна быть разделена между всеми работающими операторами. В случае ALS, где размер отдельных элементов DataSet (например, блоков пользователя/элемента) довольно велик, это нежелательно.

Чтобы решить эту проблему, не все операторы реализации выполняются одновременно, если вы установили temporaryPath. Путь определяет, где могут храниться промежуточные результаты. Таким образом, если вы определили временный путь, то ALS сначала вычисляет информацию о маршрутизации для пользовательских блоков и записывает их на диск, затем вычисляет информацию о маршрутизации для блоков элементов и записывает их на диск и, что не менее важно, запускается Итерация ALS, для которой она считывает информацию о маршрутизации из временного пути.

Расчет маршрутной информации для блоков пользователя и элементов зависит от заданного набора данных рейтингов. В вашем случае, когда вы вычисляете информацию о маршрутизации пользователя, она сначала прочитает набор данных рейтингов и применит к нему оператор first. Оператор first возвращает n произвольные элементы из базового набора данных. Проблема сейчас в том, что Flink не сохраняет результат этой операции first для расчета информации о маршрутизации товара. Вместо этого, когда вы начнете вычисление информации о маршрутизации элемента, Flink повторно выполнит поток данных, начиная с его источников. Это означает, что он считывает набор данных рейтингов с диска и снова применяет к ним оператор first. Это даст вам во многих случаях другой набор оценок по сравнению с результатом первой операции first. Таким образом, сгенерированная информация о маршрутизации несовместима, и ALS завершается ошибкой.

Вы можете обойти проблему, материализовав результат оператора first и используя этот результат в качестве входных данных для алгоритма ALS. Объект FlinkMLTools содержит метод persist, который принимает DataSet, записывает его по заданному пути, а затем возвращает новый DataSet, который считывает только что записанное DataSet. Это позволяет разбить полученный граф потока данных.

val firstTrainingSet : DataSet[(Int, Int, Double)] =
  ratings
    .map(r => (r.userId, r.movieId, r.rating))
    .first((ratings.count()-1).toInt)

val trainingSet = FlinkMLTools.persist(firstTrainingSet, "/tmp/tmpALS/training")

val als = ALS()
  .setIterations(10)
  .setNumFactors(10)
  .setBlocks(150)
  .setTemporaryPath("/tmp/tmpALS/")

val parameters = ParameterMap()
  .add(ALS.Lambda, 0.01) // After some tests, this value seems to fit the problem
  .add(ALS.Seed, 42L)

als.fit(trainingSet, parameters)

В качестве альтернативы вы можете попробовать оставить temporaryPath неустановленным. Затем все этапы (вычисление информации о маршрутизации и другие итерации) выполняются в конвейерном режиме. Это означает, что и для расчета маршрутной информации пользователя, и для элемента используется один и тот же набор входных данных, полученный в результате оператора first.

Сообщество Flink в настоящее время работает над сохранением в памяти промежуточных результатов операторов. Это позволит закрепить результат оператора first, чтобы он не вычислялся дважды и, таким образом, не давал различающихся результатов из-за своей недетерминированности.

person Till Rohrmann    schedule 19.06.2015
comment
Благодарю вас! Работает отлично! Просто вопрос: когда вы используете persist, вы указываете путь к файлу. Но откуда ALS знать, что этот постоянный файл предназначен для него, а не для другой части программы (например, если нам нужно persist для другого алгоритма)? - person Kerial; 22.06.2015
comment
Вызов функции persist запускает выполнение потока данных до этой точки. В результате любой файл, хранящийся по тому же пути, будет перезаписан. Результат будет прочитан только после запуска следующей части задания. Это означает, что теоретически вы можете тем временем удалить или перезаписать этот файл. Поэтому вы должны попытаться присвоить уникальные имена файлам в рамках вашего алгоритма. Кроме того, вы также можете создавать случайные имена файлов, которые должны иметь низкую вероятность столкновения. - person Till Rohrmann; 24.06.2015