Найдите похожие строки, присутствующие в столбце DataFrame, без использования цикла for в PySpark

У меня есть DataFrame, содержащий столбец со строками. Я хочу найти похожие строки и пометить их флажком. Я использую функцию ratio из модуля python-Levenshtein и хотите пометить строки с коэффициентом выше 0,90 как «похожие». Ниже приведен пример имеющегося у меня DataFrame:

sentenceDataFrame = spark.createDataFrame([
    (0, "Hi I heard about Spark"),
    (1, "I wish Java could use case classes"),
    (2, "Logistic,regression,models,are,neat"),
    (3, "Logistic,regression,model,are,neat")
], ["id", "sentence"])

Желаемый результат:

+---+-----------------------------------+------------+
|id |sentence                           |similar_flag|
+---+-----------------------------------+------------+
|0  |Hi I heard about Spark             |            |
|1  |I wish Java could use case classes |            |
|2  |Logistic regression models are neat|2_0         |
|3  |Logistic regression model is neat  |2_1         |
|4  |Logistics regression model are neat|2_2         |
+---+-----------------------------------+------------+

Где «2_1» означает, что «2» - это «идентификатор» ссылочной строки (первая уникальная строка, используемая для сопоставления), а «1» представляет первую строку, которая соответствует ей. Я хочу полностью избежать циклов for. Для небольших данных я использовал цикл for для достижения желаемого результата в простом питоне и хочу получить такие же результаты в PySpark, поэтому я не хочу использовать какой-либо другой модуль, кроме python-Levenshtein. Я столкнулся с этим, но он требует, чтобы я отказался от модуля python-Levenshtein. Кроме того, мой DataFrame, вероятно, будет огромным (и ожидается, что он будет расти каждый день), поэтому этот подход может вызвать ошибки памяти. Есть ли лучший способ добиться желаемого результата?


person Japun Japun    schedule 16.11.2019    source источник


Ответы (1)


Я бы ответил в три шага. Во-первых, вам нужно разрешить df просматривать все параметры, поэтому вам может потребоваться картезианское произведение ваших данных с использованием crossJoin, например:

from pyspark.sql import functions as f

df_new = (
    sentenceDataFrame.crossJoin(
                         sentenceDataFrame.select(
                             f.col('sentence').alias('second_sentence'),
                             f.col('id').alias('second_id')))
)

Во-вторых, посмотрите pyspark.sql.functions.levehstein. Как только ваши предложения будут расположены друг против друга, добавьте новый столбец с расстоянием Левехштейна, используя

df_new_with_dist = df_new.withColumn('levehstein_distance',
    f.levenshtein(f.col("sentence"), f.col("second_sentence"))
)

df_new_with_dist.show()

+---+--------------------+--------------------+---------+-------------------+
| id|            sentence|     second_sentence|second_id|levehstein_distance|
+---+--------------------+--------------------+---------+-------------------+
|  0|Hi I heard about ...|Hi I heard about ...|        0|                  0|
|  0|Hi I heard about ...|I wish Java could...|        1|                 27|
|  0|Hi I heard about ...|Logistic,regressi...|        2|                 29|
|  0|Hi I heard about ...|Logistic,regressi...|        3|                 28|
|  1|I wish Java could...|Hi I heard about ...|        0|                 27|
|  1|I wish Java could...|I wish Java could...|        1|                  0|
|  1|I wish Java could...|Logistic,regressi...|        2|                 32|
|  1|I wish Java could...|Logistic,regressi...|        3|                 31|
|  2|Logistic,regressi...|Hi I heard about ...|        0|                 29|
|  2|Logistic,regressi...|I wish Java could...|        1|                 32|
|  2|Logistic,regressi...|Logistic,regressi...|        2|                  0|
|  2|Logistic,regressi...|Logistic,regressi...|        3|                  1|
|  3|Logistic,regressi...|Hi I heard about ...|        0|                 28|
|  3|Logistic,regressi...|I wish Java could...|        1|                 31|
|  3|Logistic,regressi...|Logistic,regressi...|        2|                  1|
|  3|Logistic,regressi...|Logistic,regressi...|        3|                  0|
+---+--------------------+--------------------+---------+-------------------+

Наконец, отфильтруйте все строки, где id == second_id. Если вы хотите придерживаться своей нотации, например, 2_1, я рекомендую вам добавить groupBy(f.col("id")) и агрегировать levehstein_distance с f.min(). Затем вы можете объединить свои идентификаторы, например, с

min_dist_df = (
    df_new_with_dist.where(f.col('id') != f.col('second_id'))
                    .groupBy(f.col('id').alias('second_id'))
                    .agg(f.min(f.col('levehstein_distance')).alias('levehstein_distance'))
)


(
    df_new_with_dist.join(min_dist_df,
                          on=['second_id', 'levehstein_distance'],
                          how='inner')
                    .withColumn('similar_flag', f.concat(f.concat(f.col('id'), f.lit('_'), f.col('second_id'))))
                    .select('id', 'sentence', 'similar_flag')
).show()

+---+--------------------+------------+
| id|            sentence|similar_flag|
+---+--------------------+------------+
|  2|Logistic,regressi...|         2_3|
|  1|I wish Java could...|         1_0|
|  0|Hi I heard about ...|         0_1|
|  3|Logistic,regressi...|         3_2|
+---+--------------------+------------+

Хотя это не совсем то, о чем вы просили, вы можете фильтровать и настраивать значения levehstein_distance, чтобы получить желаемый ответ.

person napoleon_borntoparty    schedule 19.11.2019
comment
Спасибо! Это достаточно близкое решение, и crossJoin решает основную проблему избежания циклов for. Я просмотрел документацию по Spark Python API, но не смог найти функцию, эквивалентную соотношению Левенштейна. Расстояние Левенштейна также является аналогичной мерой, но может вызвать регресс (у меня есть предыдущие результаты, обработанные с использованием простого Python, который мне нужно сопоставить). Я обязательно попробую это решение. - person Japun Japun; 22.11.2019