Соедините два конвейера Spark mllib вместе

У меня есть два отдельных DataFrames, каждый из которых имеет несколько различных этапов обработки, для обработки которых я использую mllib трансформаторы в конвейере.

Теперь я хочу объединить эти два конвейера вместе, сохранив функции (столбцы) каждого DataFrame.

Scikit-learn имеет FeatureUnion класс для обработки этого, и я не могу найти эквивалента для mllib.

Я могу добавить настраиваемый этап преобразования в конце одного конвейера, который принимает DataFrame, созданный другим конвейером, в качестве атрибута и присоединяется к нему в методе преобразования, но это кажется беспорядочным.


person Anake    schedule 15.06.2017    source источник
comment
вы ищете присоединение или союз? и то, и другое можно обрабатывать с помощью фреймов данных.   -  person jamborta    schedule 15.06.2017
comment
@jamborta, это было соединение, однако я хотел сделать это как этап конвейера, чтобы у меня была проверка схемы по всему конвейеру   -  person Anake    schedule 27.06.2017


Ответы (1)


Pipeline или PipelineModel действительны PipelineStages и, как таковые, могут быть объединены в один Pipeline. Например с:

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler

df = spark.createDataFrame([
    (1.0, 0, 1, 1, 0),
    (0.0, 1, 0, 0, 1)
], ("label", "x1", "x2", "x3", "x4"))

pipeline1 = Pipeline(stages=[
    VectorAssembler(inputCols=["x1", "x2"], outputCol="features1")
])

pipeline2 = Pipeline(stages=[
    VectorAssembler(inputCols=["x3", "x4"], outputCol="features2")
])

вы можете комбинировать Pipelines:

Pipeline(stages=[
    pipeline1, pipeline2, 
    VectorAssembler(inputCols=["features1", "features2"], outputCol="features")
]).fit(df).transform(df)
+-----+---+---+---+---+---------+---------+-----------------+
|label|x1 |x2 |x3 |x4 |features1|features2|features         |
+-----+---+---+---+---+---------+---------+-----------------+
|1.0  |0  |1  |1  |0  |[0.0,1.0]|[1.0,0.0]|[0.0,1.0,1.0,0.0]|
|0.0  |1  |0  |0  |1  |[1.0,0.0]|[0.0,1.0]|[1.0,0.0,0.0,1.0]|
+-----+---+---+---+---+---------+---------+-----------------+

или предварительно установлен PipelineModels:

model1 = pipeline1.fit(df)
model2 = pipeline2.fit(df)

Pipeline(stages=[
    model1, model2, 
    VectorAssembler(inputCols=["features1", "features2"], outputCol="features")
]).fit(df).transform(df)
+-----+---+---+---+---+---------+---------+-----------------+
|label| x1| x2| x3| x4|features1|features2|         features|
+-----+---+---+---+---+---------+---------+-----------------+
|  1.0|  0|  1|  1|  0|[0.0,1.0]|[1.0,0.0]|[0.0,1.0,1.0,0.0]|
|  0.0|  1|  0|  0|  1|[1.0,0.0]|[0.0,1.0]|[1.0,0.0,0.0,1.0]|
+-----+---+---+---+---+---------+---------+-----------------+

Поэтому подход, который я бы порекомендовал, состоит в том, чтобы заранее объединить данные и fit и transform в целом DataFrame.

Смотрите также:

person zero323    schedule 15.06.2017
comment
Здесь у нас один фрейм данных, но, допустим, если у нас есть два разных фрейма данных с разным количеством строк, как конвейер Spark ml соединяется с ними внутри? Это перекрестное соединение или другое? - person Kaushal; 09.04.2020