Spring Integration Java DSL flow Splitter/Aggregator удалить файл после обработки всех строк

Используя Spring Integration Java DSL, я создал поток, в котором я обрабатываю файлы синхронно с FileSplitter. Я смог использовать флаг setDeleteFiles в AbstractFilePayloadTransformer для удаления файла после преобразования каждой строки в File в Message для последующей обработки, например так:

@Bean
protected IntegrationFlow s3ChannelFlow() {
    // do not exhaust filesystem w/ files downloaded from S3
    FileToInputStreamTransformer transformer = new FileToInputStreamTransformer();
    transformer.setDeleteFiles(true);

    // @see http://docs.spring.io/spring-integration/reference/html/files.html#file-reading
    // @formatter:off
    return IntegrationFlows
        .from(s3Channel())
        .channel(StatsUtil.createRunStatsChannel(runStatsRepository))
        .transform(transformer)
        .split(new FileSplitter())
        .transform(new JsonToObjectViaTypeHeaderTransformer(new Jackson2JsonObjectMapper(objectMapper), typeSupport))
        .publishSubscribeChannel(p -> p.subscribe(persistenceSubFlow()))
        .get();
    // @formatter:on
}

Это работает нормально, но медленно. Поэтому я пытаюсь добавить ExecutorChannel после .split выше, вот так:

.channel(c -> c.executor(Executors.newFixedThreadPool(10)))

Но тогда вышеупомянутый флаг удаления не позволяет потоку успешно завершить удаление файлов до того, как они будут полностью прочитаны.

Если я удалю этот флаг, у меня есть возможность исчерпать локальную файловую систему, в которой файлы были синхронизированы с S3.

Что я мог бы предложить выше, чтобы а) полностью обработать каждый файл и б) удалить файл из локальной файловой системы после завершения? Другими словами, есть ли способ точно узнать, когда файл полностью обработан ( когда его строки обрабатывались асинхронно через потоки в пуле)?

Если вам интересно, вот мой пример FileToInputStreamTransformer:

public class FileToInputStreamTransformer extends AbstractFilePayloadTransformer<InputStream> {

    private static final int BUFFER_SIZE = 64 * 1024; // 64 kB

    @Override
    // @see http://java-performance.info/java-io-bufferedinputstream-and-java-util-zip-gzipinputstream/
    protected InputStream transformFile(File payload) throws Exception {
        return new GZIPInputStream(new FileInputStream(payload), BUFFER_SIZE);
    }
}

ОБНОВЛЕНИЕ

Так как же что-то в нисходящем потоке знает, о чем просить?

Между прочим, если я правильно следую вашему совету, когда я обновляю .split с new FileSplitter(true, true) выше, я получаю

2015-10-20 14:26:45,288 [pool-6-thread-1] org.springframework.integration.handler.LoggingHandler ERROR org.springframework.integration.transformer.MessageTransformationException: failed to transform message; nested exception is java.lang.IllegalArgumentException: 'json' argument must be an instance of: [class java.lang.String, class [B, class java.io.File, class java.net.URL, class java.io.InputStream, class java.io.Reader] , but gotten: class org.springframework.integration.file.splitter.FileSplitter$FileMarker
    at org.springframework.integration.transformer.AbstractTransformer.transform(AbstractTransformer.java:44)

person Chris Phillipson    schedule 20.10.2015    source источник


Ответы (2)


FileSplitter имеет markers опцию именно для этого:

Установите значение true, чтобы выдавать сообщения маркера начала/конца файла до и после данных файла. Маркеры — это сообщения с полезными данными FileSplitter.FileMarker (со значениями START и END в свойстве mark). Маркеры могут использоваться при последовательной обработке файлов в нисходящем потоке, где некоторые строки фильтруются. Они позволяют последующей обработке узнать, когда файл был полностью обработан. Маркер END включает количество строк. По умолчанию: false. Когда true, apply-sequence по умолчанию равно false.

Вы можете использовать его в нисходящем потоке, чтобы определить, можно ли удалить файл уже или еще нет.

person Artem Bilan    schedule 20.10.2015
comment
Попробуйте использовать PayloadTypeRouter или publishSubscribeChannel с aggregator с одной стороны и filter перед transform() с другой. - person Artem Bilan; 21.10.2015

Спасибо Артем.

Мне удалось решить эту проблему, но, возможно, в более тяжелой форме.

Введение ExecutorChannel вызвало множество корректировок реализации, в том числе: отключение флага setDeleteFiles на AbtractFilePayloadTransformer, обновление JPA @Entity, RunStats и репозитория для этого, чтобы зафиксировать статус обработки файла, а также статус обработки для всего запуска. В совокупности обновления состояния обработки позволяют потоку узнать, когда удалять файлы из локальной файловой системы (т. е. когда они полностью обработаны) и возвращать состояние в конечной точке /stats/{run}, чтобы пользователь мог знать, когда выполнение завершено.

Вот фрагменты из моей реализации (если кому-то интересно)...

class FileToInputStreamTransformer extends AbstractFilePayloadTransformer<InputStream> {

private static final int BUFFER_SIZE = 64 * 1024; // 64 kB

@Override
// @see http://java-performance.info/java-io-bufferedinputstream-and-java-util-zip-gzipinputstream/
protected InputStream transformFile(File payload) throws Exception {
    return new GZIPInputStream(new FileInputStream(payload), BUFFER_SIZE);
}
}

public class RunStatsHandler extends AbstractMessageHandler {

private final SplunkSlf4jLogger log = new SplunkSlf4jLogger(LoggerFactory.getLogger(getClass()));
private static final int BUFFER_SIZE = 64 * 1024; // 64 kB

private final RunStatsRepository runStatsRepository;

public RunStatsHandler(RunStatsRepository runStatsRepository) {
    this.runStatsRepository = runStatsRepository;
}

// Memory efficient routine, @see http://www.baeldung.com/java-read-lines-large-file
@Override
protected void handleMessageInternal(Message<?> message) throws Exception {
    RunStats runStats = message.getHeaders().get(RunStats.RUN, RunStats.class);
    String token = message.getHeaders().get(RunStats.FILE_TOKEN, String.class);
    if (runStats != null) {
        File compressedFile = (File) message.getPayload();
        String compressedFileName = compressedFile.getCanonicalPath();
        LongAdder lineCount = new LongAdder();
        // Streams and Scanner implement java.lang.AutoCloseable
        InputStream fs = new FileInputStream(compressedFile);
        InputStream gzfs = new GZIPInputStream(fs, BUFFER_SIZE);
        try (Scanner sc = new Scanner(gzfs, "UTF-8")) {
            while (sc.hasNextLine()) {
                sc.nextLine();
                lineCount.increment();
            }
            // note that Scanner suppresses exceptions
            if (sc.ioException() != null) {
                log.warn("file.lineCount", ImmutableMap.of("run", runStats.getRun(), "file", compressedFileName, 
                        "exception", sc.ioException().getMessage()));
                throw sc.ioException();
            }
            runStats.addFile(compressedFileName, token, lineCount.longValue());
            runStatsRepository.updateRunStats(runStats);
            log.info("file.lineCount",
                    ImmutableMap.of("run", runStats.getRun(), "file", compressedFileName, "lineCount", lineCount.intValue()));
        }
    }
}

}

Обновленный поток

@Bean
protected IntegrationFlow s3ChannelFlow() {
    // @see http://docs.spring.io/spring-integration/reference/html/files.html#file-reading
    // @formatter:off
    return IntegrationFlows
        .from(s3Channel())
        .enrichHeaders(h -> h.headerFunction(RunStats.FILE_TOKEN, f -> UUID.randomUUID().toString()))
        .channel(runStatsChannel())
        .channel(c -> c.executor(Executors.newFixedThreadPool(persistencePoolSize)))
        .transform(new FileToInputStreamTransformer())
        .split(new FileSplitter())
        .transform(new JsonToObjectViaTypeHeaderTransformer(new Jackson2JsonObjectMapper(objectMapper), typeSupport))
        .publishSubscribeChannel(p -> p.subscribe(persistenceSubFlow()))
        .get();
    // @formatter:on
}

@Bean
public MessageChannel runStatsChannel() {
    DirectChannel wiretapChannel = new DirectChannel();
    wiretapChannel.subscribe(new RunStatsHandler(runStatsRepository));
    DirectChannel loggingChannel = new DirectChannel();
    loggingChannel.addInterceptor(new WireTap(wiretapChannel));
    return loggingChannel;
}

К сожалению, я не могу поделиться реализациями RunStats и репо.

person Chris Phillipson    schedule 21.10.2015