Сохраните Spark org.apache.spark.mllib.linalg.Matrix в файл.

Результатом корреляции в Spark MLLib является тип org.apache.spark.mllib.linalg.Matrix. (см. http://spark.apache.org/docs/1.2.1/mllib-statistics.html#correlations)

val data: RDD[Vector] = ... 

val correlMatrix: Matrix = Statistics.corr(data, "pearson")

Я хотел бы сохранить результат в файл. Как я могу это сделать?


person florins    schedule 15.04.2015    source источник


Ответы (4)


Вот простой и эффективный способ сохранить матрицу в hdfs и указать разделитель.

(Транспонирование используется, поскольку .toArray имеет основной формат столбца.)

val localMatrix: List[Array[Double]] = correlMatrix
    .transpose  // Transpose since .toArray is column major
    .toArray
    .grouped(correlMatrix.numCols)
    .toList

val lines: List[String] = localMatrix
    .map(line => line.mkString(" "))

sc.parallelize(lines)
    .repartition(1)
    .saveAsTextFile("hdfs:///home/user/spark/correlMatrix.txt")
person Dylan Hogg    schedule 09.07.2015

Как Matrix является сериализуемым, вы можете написать его, используя обычный Scala.

Пример можно найти здесь.

person Carlos Vilchez    schedule 15.04.2015
comment
Спасибо за ответ, Карлос. Я хотел бы сохранить матрицу в HDFS. Также, по возможности, в удобочитаемом формате. Что-то типа. saveAsTextFile, предоставляемый RDD API - person florins; 16.04.2015
comment
Вы можете попробовать data.saveAsTextFile("hdfs://..."). Я видел это в веб-примерах Spark. - person Carlos Vilchez; 16.04.2015

Ответ Дилана Хогга был великолепен, чтобы немного улучшить его, добавьте индекс столбца. (В моем случае использования, когда я создал файл и загрузил его, он не был отсортирован из-за характера параллельного процесса и т. д.)

ссылка: https://www.safaribooksonline.com/library/view/scala-cookbook/9781449340292/ch10s12.html

замените этой строкой, и она поместит порядковый номер в строку (начиная с 0), что упростит сортировку, когда вы перейдете к ее просмотру.

val lines: List[String] = localMatrix 
  .map(line => line.mkString(" ")) 
  .zipWithIndex.map { case(line, count) => s"$count $line" } 
person jduff1075    schedule 04.02.2016

Спасибо за ваше предложение. Я вышел с этим решением. Спасибо Игнасио за его предложения

val vtsd = sd.map(x => Vectors.dense(x.toArray))
val corrMat = Statistics.corr(vtsd)
val arrayCor = corrMat.toArray.toList
val colLen = columnHeader.size
val toArr2 = sc.parallelize(arrayCor).zipWithIndex().map(
      x => {
    if ((x._2 + 1) % colLen == 0) {
      (x._2, arrayCor.slice(x._2.toInt + 1 - colLen, x._2.toInt + 1).mkString(";"))
    } else {
      (x._2, "")
    }
  }).filter(_._2.nonEmpty).sortBy(x => x._1, true, 1).map(x => x._2)


toArr2.coalesce(1, true).saveAsTextFile("/home/user/spark/cor_" + System.currentTimeMillis())
person florins    schedule 16.04.2015