Какой самый простой способ получить Spark DataFrame из произвольных данных массива в Scala?

Уже пару дней ломаю голову над этим. Такое ощущение, что это должно быть интуитивно просто... Очень надеюсь, что кто-то может помочь!

Я построил org.nd4j.linalg.api.ndarray.INDArray вхождения слова из некоторых полуструктурированных данных, например:

import org.nd4j.linalg.factory.Nd4j
import org.nd4s.Implicits._

val docMap = collection.mutable.Map[Int,Map[Int,Int]] //of the form Map(phrase -> Map(phrasePosition -> word)
val words = ArrayBuffer("word_1","word_2","word_3",..."word_n")
val windows = ArrayBuffer("$phrase,$phrasePosition_1","$phrase,$phrasePosition_2",..."$phrase,$phrasePosition_n") 

var matrix = Nd4j.create(windows.length*words.length).reshape(windows.length,words.length)
for (row <- matrix.shape(0)){
    for(column <- matrix.shape(1){
        //+1 to (row,column) if word occurs at phrase, phrasePosition indicated by window_n.
    }
}
val finalmatrix = matrix.T.dot(matrix) // to get co-occurrence matrix

Все идет нормально...

После этого мне нужно интегрировать данные в существующий конвейер в Spark и использовать эту реализацию pca и т. д., поэтому мне нужно создать DataFrame или, по крайней мере, RDD. Если бы я заранее знал количество слов и/или окон, я мог бы сделать что-то вроде:

case class Row(window : String, word_1 : Double, word_2 : Double, ...etc)

val dfSeq = ArrayBuffer[Row]()
for (row <- matrix.shape(0)){
    dfSeq += Row(windows(row),matrix.get(NDArrayIndex.point(row), NDArrayIndex.all()))
}
sc.parallelize(dfSeq).toDF("window","word_1","word_2",...etc)

но количество окон и слов определяется во время выполнения. Я ищу WindowsxWords org.apache.spark.sql.DataFrame в качестве вывода, ввод - это WindowsxWords org.nd4j.linalg.api.ndarray.INDArray

Заранее благодарим за любую помощь, которую вы можете предложить.


person Nahko    schedule 17.04.2019    source источник


Ответы (1)


Итак, после нескольких дней работы кажется, что простой ответ: его нет. На самом деле, похоже, что пытаться использовать Nd4j в этом контексте вообще — плохая идея по нескольким причинам:

  1. (Действительно) сложно получить данные из родного формата INDArray после того, как вы их вставили.
  2. Даже используя что-то вроде guava, метод .data() собирает все в кучу, что быстро станет дорогим.
  3. У вас есть дополнительные проблемы, связанные с необходимостью компилировать сборку jar или использовать hdfs и т. д. для обработки самой библиотеки.

Я также рассматривал возможность использования Breeze, который на самом деле может обеспечить жизнеспособное решение, но имеет некоторые из тех же проблем и не может использоваться в распределенных структурах данных .

К сожалению, использование собственных типов данных Spark / Scala, хотя и проще, если вы знаете, как это сделать, для кого-то вроде меня, пришедшего с небес Python + numpy + pandas, по крайней мере, — болезненно запутанно и уродливо.

Тем не менее, я успешно реализовал это решение:

import org.apache.spark.mllib.linalg.{Vectors,Vector,Matrix,DenseMatrix,DenseVector}
import org.apache.spark.mllib.linalg.distributed.RowMatrix

//first make a pseudo-matrix from Scala Array[Double]:
var rowSeq = Seq.fill(windows.length)(Array.fill(words.length)(0d))

//iterate through 'rows' and 'columns' to fill it:
for (row 0 until windows.length){
    for (column 0 until words.length){
        // rowSeq(row)(column) += 1 if word occurs at phrase, phrasePosition indicated by window_n.
    }
}

//create Spark DenseMatrix
val rows : Array[Double] = rowSeq.transpose.flatten.toArray
val matrix = new DenseMatrix(windows.length,words.length,rows)

Одной из основных операций, для которой мне понадобился Nd4J, была matrix.T.dot(matrix), но оказалось, что вы не можете перемножить 2 матрицы типа org.apache.spark.mllib.linalg.DenseMatrix вместе, одна из них (A) должна быть org.apache.spark.mllib.linalg.distributed.RowMatrix и, как вы уже догадались, вы не можете звоните matrix.transpose() на RowMatrix, только на DenseMatrix! Поскольку это на самом деле не имеет отношения к вопросу, я пропущу эту часть, за исключением того, что поясню, что результатом этого шага является RowMatrix. Кредит также следует здесь и здесь для заключительной части решения :

val rowMatrix : [RowMatrix] = transposeAndDotDenseMatrix(matrix)

// get DataFrame from RowMatrix via DenseMatrix
val newdense = new DenseMatrix(rowMatrix.numRows().toInt,rowMatrix.numCols().toInt,rowMatrix.rows.collect.flatMap(x => x.toArray)) // the call to collect() here is undesirable...
val matrixRows = newdense.rowIter.toSeq.map(_.toArray)
val df = spark.sparkContext.parallelize(matrixRows).toDF("Rows")

// then separate columns:
val df2 = (0 until words.length).foldLeft(df)((df, num) => 
df.withColumn(words(num), $"Rows".getItem(num)))
.drop("Rows")

Хотелось бы услышать улучшения и предложения по этому поводу, спасибо.

person Nahko    schedule 20.04.2019