Прямого способа сделать это в PySpark нет. Единственное, что я могу придумать, — использовать операцию на Scala (Breeze) для выполнения этого преобразования.
# Creating a small test dataframe to check the pipeline.
from pyspark.sql.types import StructType, StructField, DoubleType

# Two rows of two doubles — an invertible 2x2 matrix [[1,2],[3,4]].
data2 = [(1.0, 2.0),
         (3.0, 4.0),
         ]
schema = StructType([
    StructField("A", DoubleType(), True),
    StructField("B", DoubleType(), True),
])
df = spark.createDataFrame(data=data2, schema=schema)
df.printSchema()
df.show(truncate=False)
# Register under the name the Scala cell reads via table("temp_table").
# registerTempTable is deprecated since Spark 2.0; createOrReplaceTempView
# is the same operation under its current name.
df.createOrReplaceTempView("temp_table")
>>
df:pyspark.sql.dataframe.DataFrame = [A: double, B: double]
root
|-- A: double (nullable = true)
|-- B: double (nullable = true)
+---+---+
|A |B |
+---+---+
|1.0|2.0|
|3.0|4.0|
+---+---+
>>
# Using a Scala Breeze operation to compute the matrix inverse.
%scala
import scala.util.Random
import breeze.linalg.DenseMatrix
import breeze.linalg.inv

val featuresDF = table("temp_table")
// Collect each column as Array[Double]; note this runs one Spark job per column,
// which is fine for small matrices like this example.
val featureArray: Array[Array[Double]] =
  featuresDF.columns.map(c => featuresDF.select(c).collect.map(_(0).toString.toDouble))
// DenseMatrix(rows: _*) treats each inner array as a ROW, but featureArray holds
// COLUMNS, so transpose to recover the original orientation.
val denseMat = DenseMatrix(featureArray: _*).t
val inverse = inv(denseMat)
println(inverse)
// NOTE: Breeze's toArray flattens in column-major order — the Python cell that
// rebuilds the matrix must account for this.
val c = inverse.toArray.toSeq
val matrix = c.toDF("mat")
matrix.createOrReplaceTempView("matrix_df")
>>
featuresDF:org.apache.spark.sql.DataFrame = [A: double, B: double]
matrix:org.apache.spark.sql.DataFrame = [mat: double]
-1.9999999999999996 0.9999999999999998
1.4999999999999998 -0.4999999999999999
import scala.util.Random
import breeze.linalg.DenseMatrix
import breeze.linalg.inv
featuresDF: org.apache.spark.sql.DataFrame = [A: double, B: double]
FeatureArray: Array[Array[Double]] = Array(Array(1.0, 3.0), Array(2.0, 4.0))
features: Array[String] = Array(A, B)
desnseMat: breeze.linalg.DenseMatrix[Double] =
1.0 2.0
3.0 4.0
inverse: breeze.linalg.DenseMatrix[Double] =
-1.9999999999999996 0.9999999999999998
1.4999999999999998 -0.4999999999999999
c: Seq[Double] = WrappedArray(-1.9999999999999996, 1.4999999999999998, 0.9999999999999998, -0.4999999999999999)
matrix: org.apache.spark.sql.DataFrame = [mat: double]
>>
# Collecting the flattened inverse matrix from the Scala cell back in PySpark.
import numpy as np  # NOTE(review): unused in this cell; kept in case later cells need it
matrix_df = spark.sql('''SELECT * FROM matrix_df ''')
df = matrix_df
df.show()

# Converting long form (one value per row, column-major order) back to wide form.
from pyspark.sql import functions as F, Window
from math import sqrt

# Matrix dimension; for the 2x2 example this gives 2.
c = int(sqrt(df.count()))
# Sequential 0-based position of each value.
# NOTE(review): Window.orderBy(lit(1)) has no real ordering key — this relies on
# Spark preserving the insertion order of the long-form rows. A dedicated index
# column written by the Scala cell would make this fully deterministic.
rnum = F.row_number().over(Window.orderBy(F.lit(1)))
# Breeze flattened the matrix column-major, so value at position i belongs to
# row i % c and column i // c. Deriving both indices arithmetically from the
# same position is deterministic (the original row_number-within-partition
# trick ordered by the partition key itself, which is arbitrary).
out = (df.withColumn("pos", rnum - 1)
         .withColumn("Rnum", (F.col("pos") % c).cast("Integer"))
         .withColumn("idx", (F.floor(F.col("pos") / c) + 1).cast("Integer"))
         .groupby("Rnum").pivot("idx").agg(F.first("mat")))
out.show()
>>
out:pyspark.sql.dataframe.DataFrame = [Rnum: integer, 1: double ... 1 more fields]
+----+-------------------+------------------+
|Rnum| 1| 2|
+----+-------------------+------------------+
| 0|-1.9999999999999996|1.4999999999999998|
| 1|-0.4999999999999999|0.9999999999999998|
+----+-------------------+------------------+
>>
person
Vishal Anand
schedule
29.03.2021