Практические руководства, РУКОВОДСТВО ПО ИСПОЛНЕНИЮ SPARK

Четыре способа фильтрации набора данных Spark по сравнению с набором значений данных

Фильтрация набора данных Spark по набору значений данных часто встречается во многих потоках аналитики данных. Эта конкретная история объясняет четыре разных способа достижения того же.

Предположим, существует очень большой набор данных «A» со следующей схемой:

root:
| — empId: Integer
| — sal: Integer
| — name: String
| — address: String
| — dept: Integer

Набор данных «A» необходимо отфильтровать по набору идентификаторов сотрудников (empIds), «B» (может транслироваться исполнителям), чтобы получить отфильтрованный набор данных «A». Работа фильтра может быть представлена ​​как:

A` = A.filter(A.empId contains in 'B')

Для реализации этого наиболее распространенного сценария фильтрации в Spark можно использовать четыре типа преобразования, у каждого из которых есть свои плюсы и минусы. Вот описание использования всех этих четырех преобразований для выполнения этого конкретного сценария фильтрации вместе с подробными примечаниями по аспектам надежности и эффективности каждого из них.

Фильтр. Преобразование фильтра (фильтрация записей набора данных по логическому выражению условия или логической возвращающей функции фильтра) в наборе данных может использоваться следующими способами:

1. Dataset<T> A` = A.filter(Column condition)
2. Dataset<T> A` = A.filter(FilterFunction<T> func)
3. Dataset<T> A` = A.filter(String conditionExpr)

Для сценария фильтрации, как описано ранее, можно использовать преобразование «Фильтр» на «А», которое принимает на входе «Функцию фильтра». «Функция фильтра» вызывается для каждой из записей, содержащихся в разделах соответствующего набора данных, и возвращает либо «истина», либо «ложь». В нашем сценарии фильтрации функция FilterFunction будет вызываться для каждой записи набора данных «A» и проверять, существует ли «empId» записи в транслируемом наборе empIds, «B» («B» поддерживается соответствующий HashTable).

Использование преобразования «Фильтр», как описано выше, довольно просто, надежно и эффективно независимо от размера набора данных «A». Это потому, что преобразование вызывается запись за записью. Кроме того, поскольку широковещательный набор empId поддерживается хэш-таблицей на исполнителе, поиск фильтрации в функции фильтрации для каждой записи остается эффективным.

Карта: преобразование карты (применяет функцию к каждой записи набора данных для возврата нулевого, того же или другого типа записи) в наборе данных используется следующим образом:

Dataset<U> A` = A.map(MapFunction<T,U> func, Encoder<U> encoder)

Для сценария фильтрации, как описано ранее, можно использовать преобразование «Карта» на «А», которое принимает на входе «Функцию карты». В нашем сценарии фильтрации «MapFunction» будет вызываться для каждой записи набора данных «A» и проверять, существует ли «empId» записи в транслируемом наборе empIds «B» (при поддержке соответствующей HashTable ). В случае, если запись существует, то же самое будет возвращено из MapFunction. В случае, если запись не существует, будет возвращено NULL. Кроме того, ввод кодировщика для MapFunction будет таким же, как и для набора данных «A».

Хотя семантика «MapFunction» аналогична «FilterFunction», использование преобразования «Map», как описано выше, для сценария фильтрации не так просто и элегантно по сравнению с подходом прямого преобразования «Filter». При преобразовании необходимо явно предусмотреть дополнительный ввод кодировщика. Кроме того, после вызова преобразования «Карта» выходные данные необходимо отфильтровать для значений NULL, поэтому подход «Карта» становится менее эффективным, чем подход «Фильтр». Однако надежность этого подхода аналогична подходу «Фильтр», поскольку он будет работать без проблем независимо от размера «А». Это потому, что преобразование «Карта» также вызывается запись за записью.

MapPartitions: преобразование Mappartitions (применяет функцию к каждому из разделов набора данных, возвращая либо нуль, либо итератор в новую коллекцию того же или другого типа записей) в наборе данных, используется следующим образом:

Dataset<U> A` = A.map(MapPartitionsFunction<T,U> func, Encoder<U> encoder)

Для сценария фильтрации, как описано ранее, можно также использовать преобразование «MapPartitions» для «A», которое принимает в качестве входных данных «MapPartitionsFunction». В нашем сценарии фильтрации «MapPartitionsFunction» будет вызываться для каждого раздела набора данных «A», выполняя итерацию по всем записям раздела и проверяя для каждой из записей, существует ли «empId» записи в транслируемый набор empId, 'B' (поддерживаемый соответствующей HashTable). В случае, если запись существует, она будет добавлена ​​в возвращаемую коллекцию, инициализированную в «MapPartitionsFunction». Наконец, итератор возвращаемой коллекции возвращается функцией MapPartitionsFunction.

По сравнению с подходом «Карта» и «Фильтр» подход «MapPartitions» обычно более эффективен, потому что он работает с разбиением на разделы, а не с записями. Однако, как и в случае с «Картой», при преобразовании необходимо явно указать ввод кодировщика. Кроме того, подход «MapPartitions» может стать крайне ненадежным в случае, если размер определенных разделов набора данных «A» превышает объем памяти, выделенной для выполнения каждой вычислительной задачи раздела. Это связано с тем, что более крупный раздел может привести к потенциально более крупной возвращаемой коллекции, что приведет к переполнению памяти.

Внутреннее соединение: преобразование внутреннего объединения применяется к двум входным наборам данных, A и B, следующим образом:

Dataset<Row> A` = A.join(Dataset<?> B, Column joinExprs)

Для сценария фильтрации, как описано ранее, можно также использовать преобразование «Внутреннее соединение» для «A», которое объединяет представление набора данных «B» в условии соединения (A.empId равно B.empId) и выбирает только поля «А» от каждой присоединенной записи.

Подход Внутреннее соединение возвращает набор данных общих объектов Строка, поэтому необходимо использовать кодировщик, чтобы преобразовать его обратно в набор данных типа записи A, чтобы соответствовать точной семантике фильтра. Однако, как и подход Фильтр, подход Внутреннее объединение эффективен и надежен. Эффективность проистекает из того факта, что, поскольку B является широковещательным, Spark выберет наиболее эффективный подход« Boradcast Hash Join для выполнения соединения. Кроме того, надежность исходит из того факта, что подход Внутреннее соединение будет применим к большим наборам данных А, как это было в случае с подходом Фильтр.

Учитывая все подходы, я бы выбрал подход «Фильтр» как самый безопасный вариант с точки зрения надежности и эффективности. Кроме того, следует отметить, что подход «Фильтр» также позволит мне выполнять антипоиск с такой же эффективностью и надежностью, которые не позволяет «Внутреннее объединение».

В случае обратной связи или вопросов по этой истории, пишите в разделе комментариев. Надеюсь, вам это пригодится. Вот ссылка на другие мои подробные истории об Apache Spark. Кроме того, получите копию моей недавно опубликованной книги о разделении на разделы Spark: « Руководство по разделению на разделы Spark: подробное объяснение разделов Spark »