Почему PySpark dropDuplicates and Join дает НЕЧЕТНЫЕ результаты

PySpark дает мне немного странные результаты после dropDuplicates и объединения наборов данных. Ситуация такова, что есть два очень больших набора данных: один с идентификатором людей и некоторыми переменными, а второй с их кодом региона.

первый набор данных:

ID|VAR1|VAR2|VAR3|VAR4|VAR5|
1|-----|----|---|---|----|
2|-----|----|---|---|----|
3|-----|----|---|---|----|
4|-----|----|---|---|----|

второй набор данных:

ID|region_code|
1|7|
2|5|
1|9|
4|7|

результат, который я получаю после следующего кода:

file_1 = file_1.dropDuplicates(["ID"])
file_2 = file_2.dropDuplicate(["ID"])
file_2.filter(filter("ID == '1'").show()

ID|region_code|
1|7|

После присоединения файлов я ожидаю:

merge_file = file_1.join(file_2, "ID", "left")

ID|VAR1|VAR2|VAR3|VAR4|VAR5|region_code|
1|-----|----|---|---|----|7|
2|-----|----|---|---|----|5|
3|-----|----|---|---|----|null|
4|-----|----|---|---|----|7|

но я получил:

merge_file.filter("ID == '1'").show()

ID|VAR1|VAR2|VAR3|VAR4|VAR5|region_code|
1|-----|----|---|---|----|9|

Мне очень любопытны эти странные результаты.


person default_settings    schedule 31.05.2018    source источник
comment
Я не уверен, что dropDuplicates это та функция, которую вы ищете здесь. В вашем примере ID == 1 имеет два связанных региона. Удаление дубликата ID приведет к произвольному выбору региона для каждого ID, а затем к удалению остальных, что не является надежным поведением.   -  person vielkind    schedule 31.05.2018
comment
Ваше предложение использовать file_2.dropDuplicates(["ID", "region_code"]) ? и почему первый примененный фильтр к файлу_2 дает после того, как dropDuplicates дает ID|region_code| 1|7|   -  person default_settings    schedule 31.05.2018


Ответы (1)


pyspark — ленивый интерпретатор. Ваш код выполняется только тогда, когда вы вызываете действие (например, show(), count() и т. д.). В вашем примере кода вы создаете file_2. Вместо того, чтобы думать о file_2 как об объекте, живущем в памяти, file_2 на самом деле представляет собой просто набор инструкций, которые сообщают механизму pyspark шаги обработки. Когда вы вызываете file_2.filter(filter("ID == '1'").show(), эти инструкции выполняются (включая dropDuplicates()) для создания вывода.

Когда вы создаете merge_file, вы ссылаетесь на этапы обработки для file_2, которые могут оцениваться иначе, чем в предыдущем примере. Нет никакой гарантии, что dropDuplicates() каждый раз будет отбрасывать одни и те же строки из-за ленивого выполнения pyspark. Вот почему вы получаете разные результаты между двумя кадрами данных.

Именно поэтому я предлагаю вам подумать о своих данных и о том, чего вы хотите добиться с помощью merge, потому что dropDuplicates() не является надежным методом, если важны отношения между ID и region.

person vielkind    schedule 31.05.2018
comment
Спасибо @vealkind за объяснение, какую функцию для удаления повторяющихся записей из файла_2 вы бы порекомендовали? Я хочу получить только одну запись для каждого идентификатора в файле_2. - person default_settings; 31.05.2018
comment
Вы можете сделать следующее: file_2.groupby(func.col("ID")).agg(func.collect_set(func.col("region_code")).alias("arr_region")), который создаст одну запись для каждого ID, которая будет содержать массив всех отдельных region_name, связанных с ID, а затем выполните merge. - person vielkind; 31.05.2018