Поток данных Google: как анализировать большой файл с допустимым массивом JSON из FileIO.ReadableFile


В моем конвейере преобразование FileIO.readMatches() считывает большой файл JSON (около 300-400 МБ) с допустимым массивом JSON и возвращает объект FileIO.ReadableFile для следующего преобразования. Моя задача — прочитать каждый объект JSON из этого массива JSON, добавить новые свойства и вывести в следующее преобразование.

На данный момент мой код для разбора файла JSON выглядит так:

        // file is a FileIO.ReadableFile object 
        InputStream bis = new ByteArrayInputStream(file.readFullyAsBytes());
        // Im using gson library to parse JSON
        JsonReader reader = new JsonReader(new InputStreamReader(bis, "UTF-8"));
        JsonParser jsonParser = new JsonParser();
        reader.beginArray();
        while (reader.hasNext()) {
            JsonObject jsonObject = jsonParser.parse(reader).getAsJsonObject();
            jsonObject.addProperty("Somename", "Somedata");
            // processContext is a ProcessContext object
            processContext.output(jsonObject.toString());
        }
        reader.close();

В этом случае все содержимое файла будет в моей памяти, что дает возможность получить java.lang.OutOfMemoryError. Я ищу решение для чтения по одному всех объектов JSON, не сохраняя весь файл в памяти. Возможным решением является использование метода open() из объекта FileIO.ReadableFile, который возвращает канал ReadableByteChannel, но я не уверен, как использовать этот канал для чтения конкретного объекта JSON из этого канала.

Обновленное решение Это мое обновленное решение, которое считывает файл построчно.

    ReadableByteChannel readableByteChannel = null;
    InputStream inputStream = null;
    BufferedReader bufferedReader = null;
    try {
        // file is a FileIO.ReadableFile 
        readableByteChannel = file.open();
        inputStream = Channels.newInputStream(readableByteChannel);
        bufferedReader = new BufferedReader(new InputStreamReader(inputStream, "UTF-8"));
        String line;
        while ((line = bufferedReader.readLine()) != null) {
            if (line.length() > 1) {
                // my final output should contain both filename and line
                processContext.output(fileName + file);
            }
        }
    } catch (IOException ex) {
        logger.error("Exception during reading the file: {}", ex);
    } finally {
        IOUtils.closeQuietly(bufferedReader);
        IOUtils.closeQuietly(inputStream);
    }

Я вижу, что это решение не работает с потоком данных, работающим на машине n1-standard-1, выдает исключение java.lang.OutOfMemoryError: GC overhead limit exceeded и правильно работает на машине n1-standard-2.


person turlife    schedule 05.02.2018    source источник
comment
Предполагается ли, что весь файл JSON будет проанализирован за один шаг преобразования? Вы можете использовать класс TextIO для чтения файла JSON и анализа каждого массива JSON по отдельности вместо того, чтобы хранить весь файл в памяти. Можете ли вы предоставить больше информации о том, как ваш конвейер определен в вашем примере кода?   -  person Andrew Nguonly    schedule 05.02.2018
comment
К сожалению, я не могу использовать TextIO в своем решении, потому что мне нужно имя файла и содержимое файла на одном этапе преобразования. Я задал вопрос об этом здесь. В моем текущем посте я не упомянул эту часть, чтобы упростить свой вопрос. Моя идея состоит в том, чтобы получить FileIO.ReadableFile в качестве входных данных для моего шага преобразования, извлечь имя файла и содержимое файла из этого входного объекта, проанализировать каждую строку из файла и вывести этот файл с дополнительной информацией из имени файла для следующего шага преобразования.   -  person turlife    schedule 06.02.2018


Ответы (1)


ReadableByteChannel — это API-интерфейс java NIO, представленный в Java 7. Java предоставляет способ преобразовать его в InputStream: InputStream bis = Channels.newInputStream(file.open()); — я считаю, что это единственное изменение, которое вам нужно сделать.

person jkff    schedule 05.02.2018
comment
Привет jkff, спасибо за ваш ответ. На самом деле, я уже сделал это, как вы сказали: я читаю свой входной поток построчно и вывожу в следующее преобразование. Я запускаю свое приложение на машине n1-standard-1 с памятью кучи около 500 МБ. Но похоже этого недостаточно, потому что я вижу java.lang.OutOfMemoryError: GC overhead limit exceeded исключение из строки processContext.output(message). Я ожидал, что младшая машина n1-standard-1 должна быть достаточной для такого преобразования. Каково твое мнение? Это мой код или машина слишком низкая? - person turlife; 05.02.2018
comment
В опубликованном коде используется новый ByteArrayInputStream(file.readFullyAsBytes()), который считывает весь файл в памяти. Не могли бы вы опубликовать обновленный код? - person jkff; 05.02.2018