Отправка точных двоичных последовательностей с использованием потоковой передачи Hadoop

Есть наборы бинарных файлов, которые мне нужно разделить (согласно некоторой логике) и раздать мапперам. Для этого я использую потоковую передачу Hadoop. Основная проблема заключается в том, чтобы отправить по сети точные двоичные фрагменты, не изменяя их. Оказалось, что отправлять необработанные байты не так уж и просто.

Чтобы лучше проиллюстрировать проблему, я написал очень простой расширенный класс RecordReader, который должен считывать несколько байтов из разделения и отправлять их. Двоичные данные могут содержать что угодно (включая символ новой строки). Вот что может прочитать next():

public class MyRecordReader implements
        RecordReader<BytesWritable, BytesWritable> {
    ...
    public boolean next(BytesWritable key, BytesWritable ignore)
            throws IOException {
        ...

        byte[] result = new byte[8];
        for (int i = 0; i < result.length; ++i)
            result[i] = (byte)(i+1);
        result[3] = (byte)'\n';
        result[4] = (byte)'\n';

        key.set(result, 0, result.length);
        return true;
    }
}

В этом сценарии каждый вызов функции next() должен записывать в стандартный ввод следующую последовательность байтов: 01 02 03 0a 0a 06 07 08. Если я использую типизированные байты (Hadoop-1722), то последовательность должна быть всего пять байтов с префиксом, Первый байт соответствует типу последовательности (0 для байтов), остальные четыре байта — размеру. Таким образом, последовательность должна выглядеть примерно так: 00 00 00 00 08 01 02 03 0a 0a 06 07 08.

Я проверил его на /bin/cat, чтобы проверить результаты, команда следующая:

hadoop jar <streaming jar location>
  -libjars <my input format jar>
  -D stream.map.input=typedbytes
  -mapper /bin/cat
  -inputformat my.input.Format

Используя hexdump для просмотра входящих ключей, я получил следующее: 00 00 00 00 08 01 02 03 09 0a 09 0a 06 07 08. Как вы можете видеть, каждый 0a (новая строка) имеет префикс 09 (табуляция), тем не менее типизированные байты дают (ранее) правильную информацию о типе и размере последовательности байтов.

Это создает серьезную проблему при написании картографов, использующих другие языки, потому что байты изменяются по пути.

Кажется, нет никакой гарантии, что байты будут отправлены именно такими, какие они есть, если только я что-то не упустил?


person Y.H.    schedule 08.10.2013    source источник


Ответы (1)


Я нашел решение этой проблемы благодаря очень полезному подсказка в списке рассылки пользователей Hadoop.

Короче говоря, нам нужно переопределить, как Hadoop IO записывает/читает данные в/из стандартного потока. Сделать это:

  1. Расширьте InputWriter, OutputReader, а также предоставьте свои собственные InputFormat и OutputFormat, чтобы вы полностью контролировали, как байты записываются в поток и считываются из него.
  2. Расширьте класс IdentifierResolver, чтобы указать Hadoop использовать ваши собственные InputWriter и OutputReader.

Используйте свои IdentifierResolver, InputFormat и OuputFormat следующим образом:

hadoop jar <streaming jar location>
-D stream.io.identifier.resolver.class=my.own.CustomIdentifierResolver
-libjars <my input format jar>
-mapper /bin/cat
-inputformat my.own.CustomInputFormat
-outputformat my.own.CustomOutputFormat
<other options ...>

Патч, представленный в функции (не объединенной) MAPREDUCE-5018, является отличным источником. о том, как это сделать, и может быть настроен в соответствии со своими потребностями.

person Y.H.    schedule 10.10.2013