Эквивалент Linux 'diff' в Apache Pig

Я хочу иметь возможность выполнять стандартное сравнение двух больших файлов. У меня есть кое-что, что будет работать, но это не так быстро, как diff в командной строке.

A = load 'A' as (line);
B = load 'B' as (line);
JOINED = join A by line full outer, B by line;
DIFF = FILTER JOINED by A::line is null or B::line is null;
DIFF2 = FOREACH DIFF GENERATE (A::line is null?B::line : A::line), (A::line is null?'REMOVED':'ADDED');
STORE DIFF2 into 'diff';

У кого-нибудь есть лучшие способы сделать это?


person Richard    schedule 06.05.2011    source источник
comment
Вы придумали что-нибудь получше для этого? Вы смотрели на функцию Pig DIFF()?   -  person Dolan Antenucci    schedule 07.12.2011


Ответы (1)


Я использую следующие подходы. (Мой подход JOIN очень похож, но этот метод не воспроизводит поведение diff с реплицированными строками). Поскольку это было задано некоторое время назад, возможно, вы использовали только один редуктор в качестве Pig >есть алгоритм для корректировки количества редукторов в 0.8?

  • Оба подхода, которые я использую, отличаются по производительности в пределах нескольких процентов друг от друга, но не обрабатывают дубликаты одинаково.
  • Подход JOIN сворачивает дубликаты (поэтому, если один файл имеет больше дубликатов, чем другой, этот подход не выведет дубликат)
  • Подход UNION работает подобно инструменту Unix diff(1) и возвращает правильное количество дополнительных дубликатов для правильного файла.
  • В отличие от инструмента Unix diff(1), порядок не важен (фактически подход JOIN выполняет sort -u <foo.txt> | diff, а UNION выполняет sort <foo> | diff)
  • Если у вас есть невероятное (~тысячи) количество повторяющихся строк, то все будет замедляться из-за соединений (если ваше использование позволяет, сначала выполните DISTINCT для необработанных данных)
  • Если ваши строки очень длинные (например, > 1 КБ), рекомендуется использовать DataFu MD5 UDF и разница только в хэшах, затем ПРИСОЕДИНЯЙТЕСЬ к исходным файлам, чтобы вернуть исходную строку перед выводом

Использование JOIN:

SET job.name 'Diff(1) Via Join'

-- Erase Outputs
rmf first_only
rmf second_only

-- Process Inputs
a = LOAD 'a.csv.lzo' USING com.twitter.elephantbird.pig.load.LzoPigStorage('\n') AS First: chararray;
b = LOAD 'b.csv.lzo' USING com.twitter.elephantbird.pig.load.LzoPigStorage('\n') AS Second: chararray;

-- Combine Data
combined = JOIN a BY First FULL OUTER, b BY Second;

-- Output Data
SPLIT combined INTO first_raw IF Second IS NULL,
                    second_raw IF First IS NULL;
first_only = FOREACH first_raw GENERATE First;
second_only = FOREACH second_raw GENERATE Second;
STORE first_only INTO 'first_only' USING PigStorage();
STORE second_only INTO 'second_only' USING PigStorage();

Использование UNION:

SET job.name 'Diff(1)'

-- Erase Outputs
rmf first_only
rmf second_only

-- Process Inputs
a_raw = LOAD 'a.csv.lzo' USING com.twitter.elephantbird.pig.load.LzoPigStorage('\n') AS Row: chararray;
b_raw = LOAD 'b.csv.lzo' USING com.twitter.elephantbird.pig.load.LzoPigStorage('\n') AS Row: chararray;

a_tagged = FOREACH a_raw GENERATE Row, (int)1 AS File;
b_tagged = FOREACH b_raw GENERATE Row, (int)2 AS File;

-- Combine Data
combined = UNION a_tagged, b_tagged;
c_group = GROUP combined BY Row;

-- Find Unique Lines
%declare NULL_BAG 'TOBAG(((chararray)\'place_holder\',(int)0))'

counts = FOREACH c_group {
             firsts = FILTER combined BY File == 1;
             seconds = FILTER combined BY File == 2;
             GENERATE
                FLATTEN(
                        (COUNT(firsts) - COUNT(seconds) == (long)0 ? $NULL_BAG :
                            (COUNT(firsts) - COUNT(seconds) > 0 ?
                                TOP((int)(COUNT(firsts) - COUNT(seconds)), 0, firsts) :
                                TOP((int)(COUNT(seconds) - COUNT(firsts)), 0, seconds))
                        )
                ) AS (Row, File); };

-- Output Data
SPLIT counts INTO first_only_raw IF File == 1,
                  second_only_raw IF File == 2;
first_only = FOREACH first_only_raw GENERATE Row;
second_only = FOREACH second_only_raw GENERATE Row;
STORE first_only INTO 'first_only' USING PigStorage();
STORE second_only INTO 'second_only' USING PigStorage();

Производительность

  • Требуется примерно 10 минут для определения разницы более 200 ГБ (1 055 687 930 строк) с использованием сжатого ввода LZO с 18 узлами.
  • Каждый подход занимает только один цикл Map/Reduce.
  • Это приводит к примерно 1,8 ГБ дифференциации на узел в минуту (не очень высокая пропускная способность, но в моей системе кажется, что diff(1) работает только в памяти, в то время как Hadoop использует потоковые диски.
person Clay B.    schedule 10.01.2013