Как использовать PySpark UDF в проекте Scala Spark?

Несколько человек (1, 2, 3) обсуждали использование Scala UDF в приложении PySpark, обычно по соображениям производительности. Меня интересует обратное - использование Python UDF в проекте Scala Spark.

Меня особенно интересует создание модели с использованием sklearn (и MLFlow), а затем эффективно применяет это к записям в задании потоковой передачи Spark. Я знаю, что могу также разместить модель Python за REST API и выполнять вызовы этого API в приложение потоковой передачи Spark в _ 1_, но управление параллелизмом для этой задачи и настройка API для размещенной модели - это не то, что меня очень волнует.

Возможно ли это без особой индивидуальной разработки с чем-то вроде Py4J? Это просто плохая идея?

Спасибо!


person turtlemonvh    schedule 18.08.2018    source источник
comment
Это возможно, но определенно не поддерживается и не является простым. Так что вопрос действительно в том, зачем вам вообще пытаться. Найти разумное оправдание такому процессу действительно сложно.   -  person zero323    schedule 19.08.2018
comment
@ user6910411 Спасибо за ответ. Я объяснил вариант использования в вопросе - я хотел бы использовать модель, которую я обучил с помощью sklearn, для оценки отдельных строк в приложении структурированной потоковой передачи.   -  person turtlemonvh    schedule 20.08.2018
comment
Я предполагаю, что вопрос в том, что если вы уже хотите заплатить цену за межъязыковое общение, почему бы не полностью использовать PySpark?   -  person zero323    schedule 20.08.2018
comment
В этом случае, потому что 1) операция python будет небольшой частью более крупного задания Spark, и я бы предпочел не платить штраф PySpark за все это, и 2) у меня уже есть зрелый проект Scala, и я просто хочу добавить немного Python без необходимости перезаписи.   -  person turtlemonvh    schedule 20.08.2018
comment
Не отправлять в качестве ответа, но если вы используете Databricks, вы можете использовать Scala и Python в одной работе. Вы можете перейти в Pyspark для этого UDF с помощью sklearn, оценить записи, а затем немедленно перейти обратно к Scala ниже по потоку. Общий слой - это таблицы Spark в SparkSQL. Pyspark может их читать и писать, как и Spark на Scala (очевидно). Не уверен, как бы вы это сделали с чистым открытым исходным кодом, или поддерживает ли это что-то вроде Zeppelin. (Полное раскрытие, я работаю в Databricks)   -  person Raphael K    schedule 08.05.2019
comment
Спасибо @RaphaelK: мы являемся клиентами Databricks в компании, в которой я работаю, поэтому я знаю об этом варианте, по крайней мере, при работе с ноутбуками.   -  person turtlemonvh    schedule 09.05.2019
comment
Удалось ли вам найти решение этой проблемы? У меня похожая проблема. У меня есть хорошо продуманный проект spark / scala, в котором мне нужно загружать модели, созданные с помощью sklearn или даже pytorch, а затем делать прогнозы на большом наборе данных.   -  person Shirish Kumar    schedule 23.12.2020
comment
@ShirishKumar вроде как. Недавно мне пришлось что-то делать, когда у меня был udf, который выполнял некоторые сложные сетевые операции, и я хотел повторно использовать открытое соединение и добавить кеширование ответов. Я создал глобальный объект и настроил udf для выполнения вызовов функций через глобальный объект. Глобальный управлял соединением, кешированными ответами и т. Д. Если бы мне пришлось решить эту проблему сегодня, я бы применил тот же подход и открыл процесс, на котором запущен python, а затем передавал записи через процесс. Это похоже на то, как udfs python работают в pyspark (все данные проходят через stdin / stdout).   -  person turtlemonvh    schedule 24.12.2020
comment
Некоторые примеры этого странного шаблона: github.com/apache/spark/blob/master/core/src/main/scala/org/, github.com/apache/spark/blob/v3. 0.1 / sql / core / src / main / scala / org /, github.com/apache/spark/blob/v3.0.1/core/src/main/scala/org/, github.com/apache/spark/blob/v3.0.1/core/src/main/scala/org/ Последний пункт является основной точкой входа для запуска udfs Python в Spark.   -  person turtlemonvh    schedule 24.12.2020
comment
Подводя итог сказанному выше: посмотрите, как работают udf-файлы на Python, просмотрев PythonRunner. Код - private[spark], но вы все равно можете получить к нему доступ, поместив объект-оболочку внутри org.apache.spark. Внутри этой функции есть много причуд, которые могут вам не понадобиться. Если вам не нужны файлы udf, для вызова вашего кода может быть достаточно простого mapPartitions (развертывание процесса python для каждого раздела).   -  person turtlemonvh    schedule 24.12.2020


Ответы (1)


Может, я опаздываю на вечеринку, но, по крайней мере, я могу помочь с этим для потомков. На самом деле этого можно достичь, создав свой python udf и зарегистрировав его в spark.udf.register("my_python_udf", foo). Вы можете просмотреть этот документ здесь https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.UDFRegistration.register

Затем эту функцию можно вызвать из sqlContext в Python, Scala, Java, R или на любом другом языке, потому что вы обращаетесь к sqlContext напрямую (где udf зарегистрирован). Например, вы бы назвали что-то вроде

spark.sql("SELECT my_python_udf(...)").show()

ПЛЮСЫ - Вы можете вызвать свою sklearn модель из Scala.

МИНУСЫ - Вы должны использовать sqlContext и писать запросы в стиле SQL.

Я надеюсь, что это поможет, по крайней мере, для будущих посетителей.

person napoleon_borntoparty    schedule 25.11.2019
comment
Спасибо за это. Похоже, мы должны иметь возможность отправлять zip-файлы python вместе с основным jar-файлом для искрового задания и использовать эти zip-файлы python в качестве зависимостей. - person turtlemonvh; 26.11.2019
comment
Я думаю, вы говорите из ситуации, когда у вас есть контекст в процессе Python, вы регистрируете UDF, а затем повторно используете контекст в JVM, где вы можете получить к нему доступ. Это было бы возможно в записной книжке Databricks, но не тогда, когда у меня есть одно задание, которое я начинаю с spark-submit. - person Def_Os; 14.12.2020