Неожиданные результаты при создании диктов и списков RDD в pyspark

Ниже приведен простой сценарий pyspark, который пытается разбить RDD на словарь, содержащий несколько RDD.

Как показывает выполнение примера, сценарий работает только в том случае, если мы выполняем collect() на промежуточных RDD по мере их создания. Конечно, я бы не хотел делать это на практике, так как это не масштабируется.

Что действительно странно, так это то, что я не присваиваю промежуточные collect() результаты какой-либо переменной. Таким образом, разница в поведении обусловлена ​​исключительно скрытым побочным эффектом вычислений, вызванных вызовом collect().

Предполагается, что Spark — очень функциональная среда с минимальными побочными эффектами. Почему получить желаемое поведение можно только запустив какой-то загадочный побочный эффект с помощью collect()?

Ниже показан запуск со Spark 1.5.2, Python 2.7.10 и IPython 4.0.0.

spark_script.py

from pprint import PrettyPrinter
pp = PrettyPrinter(indent=4).pprint
logger = sc._jvm.org.apache.log4j
logger.LogManager.getLogger("org"). setLevel( logger.Level.ERROR )
logger.LogManager.getLogger("akka").setLevel( logger.Level.ERROR )

def split_RDD_by_key(rdd, key_field, key_values, collect_in_loop=False):
    d = dict()
    for key_value in key_values:
        d[key_value] = rdd.filter(lambda row: row[key_field] == key_value)
        if collect_in_loop:
            d[key_value].collect()
    return d
def print_results(d):
    for k in d:
        print k
        pp(d[k].collect())    

rdd = sc.parallelize([
    {'color':'red','size':3},
    {'color':'red', 'size':7},
    {'color':'red', 'size':8},    
    {'color':'red', 'size':10},
    {'color':'green', 'size':9},
    {'color':'green', 'size':5},
    {'color':'green', 'size':50},    
    {'color':'blue', 'size':4},
    {'color':'purple', 'size':6}])
key_field = 'color'
key_values = ['red', 'green', 'blue', 'purple']

print '### run WITH collect in loop: '
d = split_RDD_by_key(rdd, key_field, key_values, collect_in_loop=True)
print_results(d)
print '### run WITHOUT collect in loop: '
d = split_RDD_by_key(rdd, key_field, key_values, collect_in_loop=False)
print_results(d)

Пример запуска в оболочке IPython

In [1]: execfile('spark_script.py')
### run WITH collect in loop: 
blue
[{   'color': 'blue', 'size': 4}]
purple
[{   'color': 'purple', 'size': 6}]
green
[   {   'color': 'green', 'size': 9},
    {   'color': 'green', 'size': 5},
    {   'color': 'green', 'size': 50}]
red
[   {   'color': 'red', 'size': 3},
    {   'color': 'red', 'size': 7},
    {   'color': 'red', 'size': 8},
    {   'color': 'red', 'size': 10}]
### run WITHOUT collect in loop: 
blue
[{   'color': 'purple', 'size': 6}]
purple
[{   'color': 'purple', 'size': 6}]
green
[{   'color': 'purple', 'size': 6}]
red
[{   'color': 'purple', 'size': 6}]

person Paul    schedule 14.01.2016    source источник


Ответы (1)


Короткий ответ

Как оказалось, это не столько проблема Spark, сколько сложная функция Python, называемая замыкания с поздним связыванием. Быстрый способ принудительного раннего связывания (в данном случае желаемое поведение) заключается в добавлении аргумента по умолчанию:

lambda row, key_value=key_value: row[key_field] == key_value

Другой способ — с functools.partial.

Длинный ответ

Когда функция определена в Python, любые параметры, поступающие извне функции, извлекаются из определяющей среды (лексическая область видимости), и это делается при оценке функции, не тогда, когда он определен (позднее связывание). Таким образом, в лямбда-функции, используемой преобразованием фильтра, значение key_value не определяется до тех пор, пока функция не будет оценена.

Вы можете начать видеть опасность здесь: key_value принимает несколько значений в цикле split_RDDs_by_key(). Что, если при оценке lambda key_value уже не будет иметь нужного нам значения? Функции часто оцениваются спустя долгое время после их определения, особенно при работе с RDD. Из-за ленивой вычислительной семантики RDD лямбда-выражение не будет оцениваться до тех пор, пока не будет вызвано действие для извлечения данных, например collect() или take().

В split_RDD_by_key() мы перебираем key_values и создаем новый RDD для каждого значения. Когда collect_in_loop=False, нет collect() до тех пор, пока не завершится выполнение split_RDD_by_key(). К тому времени внутренний цикл завершается, и key_value теперь имеет значение «фиолетовый» из последней итерации цикла. Когда оцениваются все лямбда-выражения во всех СДР из split_RDD_by_key(), все они устанавливают для key_value значение «фиолетовый» и извлекают «фиолетовые» строки СДР.

Когда collect_in_loop=True, мы делаем collect() на каждой итерации, в результате чего лямбда оценивается в той же итерации, где она была определена, и мы получаем ожидаемое key_value.

Этот пример на самом деле раскрывает интересную, тонкую деталь о замыканиях Python. Когда collect() в цикле запускает оценку лямбда, лямбда связывает значение. Но что делает лямбда при оценке с более поздними операторами collect(), когда key_value изменилось (в определяющей среде) по сравнению с тем, что было при первой оценке лямбда? В этом примере показано, что все оценки закрытия функции основаны на привязке из первой оценки. "Вызов означает закрытие раз и навсегда".

person Paul    schedule 14.01.2016