Какова реализация pyspark функции np.linalg.inv ()

Я новичок в Pyspark. Я пытаюсь преобразовать функцию pandas в pyspark. В функции есть инверсия матрицы, но мне трудно получить инверсию с помощью pyspark. У меня есть матрица во фрейме данных pyspark. Как мне реализовать эту инверсию в pyspark? Вот ссылка на метод numpy

np.linalg.inv()

https://numpy.org/doc/stable/reference/generated/numpy.linalg.inv.html


person Sagar Bhattacharys    schedule 23.03.2021    source источник


Ответы (1)


Прямого способа сделать это в pyspark нет. Что я могу придумать, так это использовать операцию scala для выполнения этого преобразования.

# Crating a random dataframe to check the pipleline

from pyspark.sql.types import StructType,StructField, DoubleType
data2 = [(1.0,2.0),
    (3.0,4.0),
  ]

schema = StructType([ \
    StructField("A",DoubleType(),True), \
    StructField("B",DoubleType(),True)\
  ])
 
df = spark.createDataFrame(data=data2,schema=schema)
df.printSchema()
df.show(truncate=False)
df.registerTempTable("temp_table")

>>
df:pyspark.sql.dataframe.DataFrame = [A: double, B: double]
root
 |-- A: double (nullable = true)
 |-- B: double (nullable = true)

+---+---+
|A  |B  |
+---+---+
|1.0|2.0|
|3.0|4.0|
+---+---+
>>

# using scala breeze operation to get the inverse
%scala
import scala.util.Random
import breeze.linalg.DenseMatrix
import breeze.linalg.inv

val featuresDF = table("temp_table")

var FeatureArray: Array[Array[Double]] = Array.empty
val features = featuresDF.columns

for(i <- features.indices){
    FeatureArray = FeatureArray :+ featuresDF.select(features(i)).collect.map(_(0).toString).map(_.toDouble)
}

val desnseMat = DenseMatrix(FeatureArray: _*).t
val inverse = inv(desnseMat)
println(inverse)

val c = inverse.toArray.toSeq
val matrix = c.toDF("mat")
matrix.createOrReplaceTempView("matrix_df")

>>
featuresDF:org.apache.spark.sql.DataFrame = [A: double, B: double]
matrix:org.apache.spark.sql.DataFrame = [mat: double]
-1.9999999999999996  0.9999999999999998   
1.4999999999999998   -0.4999999999999999  
import scala.util.Random
import breeze.linalg.DenseMatrix
import breeze.linalg.inv
featuresDF: org.apache.spark.sql.DataFrame = [A: double, B: double]
FeatureArray: Array[Array[Double]] = Array(Array(1.0, 3.0), Array(2.0, 4.0))
features: Array[String] = Array(A, B)
desnseMat: breeze.linalg.DenseMatrix[Double] =
1.0  2.0
3.0  4.0
inverse: breeze.linalg.DenseMatrix[Double] =
-1.9999999999999996  0.9999999999999998
1.4999999999999998   -0.4999999999999999
c: Seq[Double] = WrappedArray(-1.9999999999999996, 1.4999999999999998, 0.9999999999999998, -0.4999999999999999)
matrix: org.apache.spark.sql.DataFrame = [mat: double]
>>


# collecting inverse flattened matrix from scala operation in pyspark
import numpy as np
matrix_df=spark.sql('''SELECT * FROM matrix_df  ''')
df = matrix_df
df.show()


# converting long form to wide form.

from pyspark.sql import functions as F, Window
from math import sqrt
c = int(sqrt(df.count())) #this gives 3
rnum = F.row_number().over(Window.orderBy(F.lit(1)))

out = (df.withColumn("Rnum",((rnum-1)%c).cast("Integer"))
 .withColumn("idx",F.row_number().over(Window.partitionBy("Rnum").orderBy("Rnum")))
.groupby("Rnum").pivot("idx").agg(F.first("mat")))
out.show()

>>
out:pyspark.sql.dataframe.DataFrame = [Rnum: integer, 1: double ... 1 more fields]
+----+-------------------+------------------+
|Rnum|                  1|                 2|
+----+-------------------+------------------+
|   0|-1.9999999999999996|1.4999999999999998|
|   1|-0.4999999999999999|0.9999999999999998|
+----+-------------------+------------------+
>>
person Vishal Anand    schedule 29.03.2021