Как обнаружить дубликаты в большом файле json с помощью PySpark HashPartitioner

У меня есть большой файл json с более чем 20 ГБ метаданных в формате json. Он содержит простые пользовательские метаданные в каком-то приложении, и я хотел бы просмотреть их, чтобы обнаружить дубликаты. Вот пример того, как выглядят данные:

{"created": "2015-08-04", "created_at": "2010-03-15", "username": "koleslawrulez333"}
{"created": "2016-01-19", "created_at": "2012-05-25", "name": "arthurking231"}
{"created": "2016-07-23", "type": "Username", "created_at": "2011-08-27", "name": "starklord1943"}
{"created": "2015-11-08", "created_at": "2010-01-19", "name": "Assasinator5827"}

Файл json содержит построчно объекты json, которые очень похожи на этот. Дубликат возникает, когда поле "name" двух объектов json совпадает. Итак, это дубликат:

{"created": "2016-07-23", "type": "Username", "created_at": "2011-08-27", "name": "Assasinator5827"}
{"created": "2015-11-08", "created_at": "2010-01-19", "name": "Assasinator5827"}

Ровно столько же, сколько два одинаковых объекта json.

Теперь я хочу просмотреть весь файл json, который слишком велик для того, чтобы поместиться в память, и, используя лучший стандарт, выяснить все дубликаты и то, что они являются дубликатами, а затем сделать некоторые логика - логическая часть тривиальна, но я несколько не уверен, как найти дубликаты.

О чем я думал:

  1. Первое, что я решил использовать, это фильтр Блума. Они не такие запутанные и работают довольно хорошо и быстро, и я думаю, что они по существу сводятся к O (n). Однако фильтры Блума не дадут мне знать, что является дубликатом строки-дубликата, что для меня недопустимо.

  2. Я думал об использовании внешней сортировки слиянием. Я бы в основном разделил файл на несколько файлов меньшего размера, которые поместились бы в памяти, отсортировал бы каждый фрагмент и искал дубликаты (которые теперь сгруппированы вместе). Но я не очень уверен, что эта реализация - то, что я хочу.

  3. Следующим, с чем я столкнулся, было хеширование по разделам, и я подозреваю, что это то, что я хочу. Хеширование — это, по сути, лучший способ найти дубликаты при работе с данными, которые помещаются в память, так почему бы не использовать его для чего-то, что не помещается? Я немного смущен тем, как хэшировать по разделам. Я не уверен, что это то, что я ищу.

Итак, я думаю, что должен использовать вариант 3, хэширование по разделам, и я знаю, что в Spark он есть. Я надеялся, что кто-нибудь может дать мне знать, на правильном ли я пути, и, возможно, дать мне некоторые инструкции о том, прав ли я или нет. У меня есть пара конкретных вопросов, концептуально:

  1. Допустим, я создаю 100 разделов, которые идеально вписываются в память (так что в моем случае каждый раздел будет иметь размер 100 МБ). Допустим, я хеширую первые x элемента в моем json-файле в один раздел и не нахожу дубликатов нет. Допустим, у меня есть другой раздел со вторыми 100 МБ данных, которые также не содержат дубликатов. Если я могу загружать только 100 МБ данных за раз, как мне проверить, что раздел 1 и раздел 2 не имеют дубликатов друг от друга? Чтобы уточнить, если в разделе 1 есть элемент, а в разделе 2 есть один и тот же элемент, как мне это понять? Я предполагаю, что мне нужно будет загрузить оба в память, верно? А если я не могу... то что мне делать? Может я неправильно понимаю...

  2. Что приводит к моему второму вопросу - кажется, что разбиение на разделы работает не так, и когда вы хешируете по разделам, элементы с похожим хэшем или диапазоном хэшей попадают в определенный файл. Поэтому, если два элемента являются дубликатами, я бы знал, потому что алгоритм попытается поместить их в файл, где хэш уже существует. Так ли это?

Я знаю, что у меня есть еще вопросы, я просто не могу их придумать. У кого-нибудь есть советы? Особенно в отношении pyspark и как лучше всего это использовать? Или pyspark не то, что я ищу?


person John Lexus    schedule 25.04.2019    source источник
comment
Вы можете группировать по имени и агрегировать с помощью collect_list. Затем напишите UDF с вашей логикой, чтобы обработать список и сделать все, что вам нужно.   -  person Hitobat    schedule 28.04.2019
comment
@Hitobat, можешь объяснить подробнее? Я новичок в писпарке. Я посмотрю эти термины, хотя.   -  person John Lexus    schedule 28.04.2019
comment
@JohnLexus будьте осторожны при использовании groupBy. Это широкое преобразование, которое может иметь проблемы с производительностью. Определение из Databricks: databricks.com/glossary/what-are-transformations.   -  person VanBantam    schedule 01.05.2019


Ответы (1)


Проблема проще, чем вы думаете. Вам действительно нужно только агрегировать данные по name, как предлагает @Hitobat. Я бы решил проблему с pyspark.sql.Window, чтобы упростить вывод агрегации.

Учитывая следующие данные, это файл с именем data.json (это также может быть каталог файлов, а не один файл)

Содержание data.json

{"created": "2015-08-04", "created_at": "2010-03-15", "username": "koleslawrulez333"}
{"created": "2016-01-19", "created_at": "2012-05-25", "name": "arthurking231"}
{"created": "2016-07-23", "type": "Username", "created_at": "2011-08-27", "name": "starklord1943"}
{"created": "2015-11-08", "created_at": "2010-01-19", "name": "Assasinator5827"}
{"created": "2016-07-23", "type": "Username", "created_at": "2011-08-27", "name": "Assasinator5827"}

Тогда код pyspark будет выглядеть так:

from pyspark.sql import Window
from pyspark.sql import functions as F

df = spark.read.json("data.json") # can be a directory of files as well 
df.show()

Вывод

+----------+----------+---------------+--------+----------------+
|   created|created_at|           name|    type|        username|
+----------+----------+---------------+--------+----------------+
|2015-08-04|2010-03-15|           null|    null|koleslawrulez333|
|2016-01-19|2012-05-25|  arthurking231|    null|            null|
|2016-07-23|2011-08-27|  starklord1943|Username|            null|
|2015-11-08|2010-01-19|Assasinator5827|    null|            null|
|2016-07-23|2011-08-27|Assasinator5827|Username|            null|
+----------+----------+---------------+--------+----------------+ 

Затем разделите и посчитайте с помощью pyspark.sql.Window

name_partition_window = Window.partitionBy("name")
df_with_repeat_counts = df.select("*", F.count("*").over(name_partition_window).alias("name_counts"))
df_with_repeat_counts.show()

Вывод

+----------+----------+---------------+--------+----------------+-----------+
|   created|created_at|           name|    type|        username|name_counts|
+----------+----------+---------------+--------+----------------+-----------+
|2016-01-19|2012-05-25|  arthurking231|    null|            null|          1|
|2015-08-04|2010-03-15|           null|    null|koleslawrulez333|          1|
|2015-11-08|2010-01-19|Assasinator5827|    null|            null|          2|
|2016-07-23|2011-08-27|Assasinator5827|Username|            null|          2|
|2016-07-23|2011-08-27|  starklord1943|Username|            null|          1|
+----------+----------+---------------+--------+----------------+-----------+

Затем отфильтруйте кадр данных в столбце name_count и упорядочите по имени для проверки.

duplicates = df_with_repeat_counts.where(F.col("name_counts") > 1).orderBy("name")
duplicates.show()

Вывод

+----------+----------+---------------+--------+--------+-----------+
|   created|created_at|           name|    type|username|name_counts|
+----------+----------+---------------+--------+--------+-----------+
|2015-11-08|2010-01-19|Assasinator5827|    null|    null|          2|
|2016-07-23|2011-08-27|Assasinator5827|Username|    null|          2|
+----------+----------+---------------+--------+--------+-----------+

На этом этапе вы можете проанализировать кадр данных duplicates по мере необходимости для вашего варианта использования.

person johnmdonich    schedule 30.04.2019