Как с помощью задания Hadoop проверить целостность большого файла gzip (.gz)?

Я получаю много файлов gzip (*.gz) от других каждый день, и прежде чем помещать их в HDFS и анализировать, мне нужно проверить целостность всех файлов (поврежденные файлы будут удалены), если я использую gzip -t file_name для проверки на локальном компьютере, это работает, но весь процесс слишком медленный, потому что объем файла очень велик, а большинство файлов достаточно велики, чтобы сделать локальную проверку трудоемкой работой.

Поэтому я решил использовать задание Hadoop для выполнения параллельной проверки, каждый файл будет проверяться в преобразователе, а поврежденный путь к файлу будет выводиться в файл, вот мои коды:

в настройке задания Hadoop:

Job job = new Job(getConf());
job.setJarByClass(HdfsFileValidateJob.class);
job.setMapperClass(HdfsFileValidateMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
job.setNumReduceTasks(0);
job.setInputFormatClass(JustBytesInputFormat.class);

в картографе:

public class HdfsFileValidateMapper extends Mapper<JustBytesWritable, NullWritable, Text, NullWritable> {
  private static final Logger LOG = LoggerFactory.getLogger(HdfsFileValidateJob.class);

  private ByteArrayOutputStream bos;

  @Override
  protected void setup(Context context) throws IOException, InterruptedException {
    /* specify a split size(=HDFS block size here) for the ByteArrayOutputStream, which prevents frequently allocating
     * memory for it when writing data in [map] method */
    InputSplit inputSplit = context.getInputSplit();
    bos = new ByteArrayOutputStream((int) ((FileSplit) inputSplit).getLength());
  }

  @Override
  protected void cleanup(Context context) throws IOException, InterruptedException {
    InputSplit inputSplit = context.getInputSplit();
    String filePath = ((FileSplit) inputSplit).getPath().toUri().getPath();   // e.g. "/user/hadoop/abc.txt"

    bos.flush();
    byte[] mergedArray = bos.toByteArray();   // the byte array which stores the data of the whole file
    if (!testUnGZip(mergedArray)) {   // broken file
      context.write(new Text(filePath), NullWritable.get());
    }
    bos.close();
  }

  @Override
  public void map(JustBytesWritable key, NullWritable value, Context context) throws IOException, InterruptedException {
    bos.write(key.getBytes());
  }

  /**
   * Test whether we can un-gzip a piece of data.
   *
   * @param data The data to be un-gzipped.
   * @return true for successfully un-gzipped the data, false otherwise.
   */
  private static boolean testUnGZip(byte[] data) {
    int numBytes2Read = 0;
    ByteArrayInputStream bis = null;
    GZIPInputStream gzip = null;
    try {
      bis = new ByteArrayInputStream(data);
      gzip = new GZIPInputStream(bis);
      byte[] buf = new byte[1024];
      int num;
      while ((num = gzip.read(buf, 0, buf.length)) != -1) {
        numBytes2Read += num;
        if (numBytes2Read % (1024 * 1024) == 0) {
          LOG.info(String.format("Number of bytes read: %d", numBytes2Read));
        }
      }
    } catch (Exception e) {
      return false;
    } finally {
      if (gzip != null) {
        try {
          gzip.close();
        } catch (IOException e) {
          LOG.error("Error while closing GZIPInputStream");
        }
      }
      if (bis != null) {
        try {
          bis.close();
        } catch (IOException e) {
          LOG.error("Error while closing ByteArrayInputStream");
        }
      }
    }
    return true;
  }
}

В котором я использую два класса с именами JustBytesInputFormat и JustBytesWritable, можно найти здесь: https://issues.apache.org/jira/secure/attachment/12570327/justbytes.jar

Обычно это решение работает нормально, но когда один файл gzip достаточно велик (например, 1,5 ГБ), задание Hadoop завершится ошибкой из-за проблемы с пространством кучи Java, и причина очевидна: для каждого файла я сначала собираю все данные. в буфер памяти и, наконец, выполнить однократную проверку, поэтому размер файла не может быть слишком большим.

Поэтому я изменил часть своего кода на:

  private boolean testUnGzipFail = false;

  @Override
  protected void cleanup(Context context) throws IOException, InterruptedException {
    InputSplit inputSplit = context.getInputSplit();
    String filePath = ((FileSplit) inputSplit).getPath().toUri().getPath();   // e.g. "/user/hadoop/abc.txt"

    if (testUnGzipFail) {   // broken file
      context.write(new Text(filePath), NullWritable.get());
    }
  }

  @Override
  public void map(JustBytesWritable key, NullWritable value, Context context) throws IOException, InterruptedException {
    if (!testUnGZip(key.getBytes())) {
      testUnGzipFail = true;
    }
  }

Эта версия исправляет проблему сбоя задания Hadoop, но она вообще не работает! В моем тесте E2E совершенно нормальный файл gzip (размер: 1,5 ГБ) будет рассматриваться как поврежденный файл!

Итак, вот моя проблема: как я могу правильно выполнить проверку и избежать проблемы чтения всего содержимого одного файла в память?

Любая идея будет оценена, спасибо заранее.


person celt    schedule 04.12.2014    source источник


Ответы (1)


Моим первым решением было бы просто вызвать gzip -t параллельно; gzip, вероятно, быстрее, чем Java, и когда файлы большие, дополнительные накладные расходы на создание процесса должны стать незначительными.

Ваше решение очень медленное. Прежде всего, вы загружаете много-много гигабайт данных в оперативную память, когда вам нужно всего несколько КБ на файл. Вместо JustBytesInputFormat вы должны передавать данные. Попробуйте найти способ передать InputStream в testUnGZip() вместо всего содержимого файла.

Если файл существует как реальный файл на жестком диске, попробуйте использовать NIO API для чтения из него; это позволило бы отображать файл в память, что делает чтение еще быстрее.

person Aaron Digulla    schedule 04.12.2014
comment
Вызов gzip -t параллельно кажется хорошим способом, но для этого требуется локальный путь к файлу, а в задании Hadoop файл в маппере, который вы читаете, может не находиться в локальной файловой системе, но может быть прочитан с удаленной машины, поэтому это невозможно. - person celt; 04.12.2014
comment
В Hadoop должен быть API, который дает вам InputStream (или другой способ чтения файла по частям). Используйте это. Кроме того, если файл находится на удаленной машине, вам обязательно нужно запускать задание на том же ЦП — копирование файла по сети чрезвычайно затратно. - person Aaron Digulla; 04.12.2014
comment
Я также предлагаю вам перенести шаг проверки перед загрузкой поврежденных данных в Hadoop. Hadoop упрощает распределение заданий в кластере, но не удешевляет его — на самом деле Hadoop намного медленнее, чем локальная обработка данных. Но поскольку он может выполнять множество заданий одновременно, вы можете сбалансировать две цели. Если можете, проверяйте файлы во время их загрузки, чтобы не тратить место на диске даже на поврежденные файлы. - person Aaron Digulla; 04.12.2014
comment
Проверка файла во много раз быстрее загрузки, поэтому проверка больше не является узким местом. - person Aaron Digulla; 04.12.2014
comment
Я загружаю файлы с помощью GNU Parallel, так что их можно быстро поместить в HDFS. Вместо этого локальная проверка gzip -t выполняется очень медленно. В моей среде способ использования GUN Parallel для размещения файлов + использование задания Hadoop для проверки экономит не менее 5 часов в день. Мы просто потому, что не могли мириться с неэффективностью локальных рабочих мест, поэтому обратились к пути Hadoop. - person celt; 04.12.2014
comment
В самом начале я просто сделал то же, что и вы, а именно: локальную проверку, удаление поврежденных файлов перед их помещением в HDFS и отсутствие задания Hadoop. Но этот способ действительно занимает слишком много времени, поэтому у меня не было другого выбора, кроме как изменить его на текущий статус. - person celt; 04.12.2014
comment
Хорошо. Как насчет потокового API Hadoop. Вы проверили это? - person Aaron Digulla; 04.12.2014