Я получаю много файлов gzip (*.gz) от других каждый день, и прежде чем помещать их в HDFS и анализировать, мне нужно проверить целостность всех файлов (поврежденные файлы будут удалены), если я использую gzip -t file_name для проверки на локальном компьютере, это работает, но весь процесс слишком медленный, потому что объем файла очень велик, а большинство файлов достаточно велики, чтобы сделать локальную проверку трудоемкой работой.
Поэтому я решил использовать задание Hadoop для выполнения параллельной проверки, каждый файл будет проверяться в преобразователе, а поврежденный путь к файлу будет выводиться в файл, вот мои коды:
в настройке задания Hadoop:
Job job = new Job(getConf());
job.setJarByClass(HdfsFileValidateJob.class);
job.setMapperClass(HdfsFileValidateMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
job.setNumReduceTasks(0);
job.setInputFormatClass(JustBytesInputFormat.class);
в картографе:
public class HdfsFileValidateMapper extends Mapper<JustBytesWritable, NullWritable, Text, NullWritable> {
private static final Logger LOG = LoggerFactory.getLogger(HdfsFileValidateJob.class);
private ByteArrayOutputStream bos;
@Override
protected void setup(Context context) throws IOException, InterruptedException {
/* specify a split size(=HDFS block size here) for the ByteArrayOutputStream, which prevents frequently allocating
* memory for it when writing data in [map] method */
InputSplit inputSplit = context.getInputSplit();
bos = new ByteArrayOutputStream((int) ((FileSplit) inputSplit).getLength());
}
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
InputSplit inputSplit = context.getInputSplit();
String filePath = ((FileSplit) inputSplit).getPath().toUri().getPath(); // e.g. "/user/hadoop/abc.txt"
bos.flush();
byte[] mergedArray = bos.toByteArray(); // the byte array which stores the data of the whole file
if (!testUnGZip(mergedArray)) { // broken file
context.write(new Text(filePath), NullWritable.get());
}
bos.close();
}
@Override
public void map(JustBytesWritable key, NullWritable value, Context context) throws IOException, InterruptedException {
bos.write(key.getBytes());
}
/**
* Test whether we can un-gzip a piece of data.
*
* @param data The data to be un-gzipped.
* @return true for successfully un-gzipped the data, false otherwise.
*/
private static boolean testUnGZip(byte[] data) {
int numBytes2Read = 0;
ByteArrayInputStream bis = null;
GZIPInputStream gzip = null;
try {
bis = new ByteArrayInputStream(data);
gzip = new GZIPInputStream(bis);
byte[] buf = new byte[1024];
int num;
while ((num = gzip.read(buf, 0, buf.length)) != -1) {
numBytes2Read += num;
if (numBytes2Read % (1024 * 1024) == 0) {
LOG.info(String.format("Number of bytes read: %d", numBytes2Read));
}
}
} catch (Exception e) {
return false;
} finally {
if (gzip != null) {
try {
gzip.close();
} catch (IOException e) {
LOG.error("Error while closing GZIPInputStream");
}
}
if (bis != null) {
try {
bis.close();
} catch (IOException e) {
LOG.error("Error while closing ByteArrayInputStream");
}
}
}
return true;
}
}
В котором я использую два класса с именами JustBytesInputFormat и JustBytesWritable, можно найти здесь: https://issues.apache.org/jira/secure/attachment/12570327/justbytes.jar
Обычно это решение работает нормально, но когда один файл gzip достаточно велик (например, 1,5 ГБ), задание Hadoop завершится ошибкой из-за проблемы с пространством кучи Java, и причина очевидна: для каждого файла я сначала собираю все данные. в буфер памяти и, наконец, выполнить однократную проверку, поэтому размер файла не может быть слишком большим.
Поэтому я изменил часть своего кода на:
private boolean testUnGzipFail = false;
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
InputSplit inputSplit = context.getInputSplit();
String filePath = ((FileSplit) inputSplit).getPath().toUri().getPath(); // e.g. "/user/hadoop/abc.txt"
if (testUnGzipFail) { // broken file
context.write(new Text(filePath), NullWritable.get());
}
}
@Override
public void map(JustBytesWritable key, NullWritable value, Context context) throws IOException, InterruptedException {
if (!testUnGZip(key.getBytes())) {
testUnGzipFail = true;
}
}
Эта версия исправляет проблему сбоя задания Hadoop, но она вообще не работает! В моем тесте E2E совершенно нормальный файл gzip (размер: 1,5 ГБ) будет рассматриваться как поврежденный файл!
Итак, вот моя проблема: как я могу правильно выполнить проверку и избежать проблемы чтения всего содержимого одного файла в память?
Любая идея будет оценена, спасибо заранее.