Используя Spring Integration Java DSL, я создал поток, в котором я обрабатываю файлы синхронно с FileSplitter
. Я смог использовать флаг setDeleteFiles
в AbstractFilePayloadTransformer
для удаления файла после преобразования каждой строки в File
в Message
для последующей обработки, например так:
@Bean
protected IntegrationFlow s3ChannelFlow() {
// do not exhaust filesystem w/ files downloaded from S3
FileToInputStreamTransformer transformer = new FileToInputStreamTransformer();
transformer.setDeleteFiles(true);
// @see http://docs.spring.io/spring-integration/reference/html/files.html#file-reading
// @formatter:off
return IntegrationFlows
.from(s3Channel())
.channel(StatsUtil.createRunStatsChannel(runStatsRepository))
.transform(transformer)
.split(new FileSplitter())
.transform(new JsonToObjectViaTypeHeaderTransformer(new Jackson2JsonObjectMapper(objectMapper), typeSupport))
.publishSubscribeChannel(p -> p.subscribe(persistenceSubFlow()))
.get();
// @formatter:on
}
Это работает нормально, но медленно. Поэтому я пытаюсь добавить ExecutorChannel
после .split
выше, вот так:
.channel(c -> c.executor(Executors.newFixedThreadPool(10)))
Но тогда вышеупомянутый флаг удаления не позволяет потоку успешно завершить удаление файлов до того, как они будут полностью прочитаны.
Если я удалю этот флаг, у меня есть возможность исчерпать локальную файловую систему, в которой файлы были синхронизированы с S3.
Что я мог бы предложить выше, чтобы а) полностью обработать каждый файл и б) удалить файл из локальной файловой системы после завершения? Другими словами, есть ли способ точно узнать, когда файл полностью обработан ( когда его строки обрабатывались асинхронно через потоки в пуле)?
Если вам интересно, вот мой пример FileToInputStreamTransformer
:
public class FileToInputStreamTransformer extends AbstractFilePayloadTransformer<InputStream> {
private static final int BUFFER_SIZE = 64 * 1024; // 64 kB
@Override
// @see http://java-performance.info/java-io-bufferedinputstream-and-java-util-zip-gzipinputstream/
protected InputStream transformFile(File payload) throws Exception {
return new GZIPInputStream(new FileInputStream(payload), BUFFER_SIZE);
}
}
ОБНОВЛЕНИЕ
Так как же что-то в нисходящем потоке знает, о чем просить?
Между прочим, если я правильно следую вашему совету, когда я обновляю .split
с new FileSplitter(true, true)
выше, я получаю
2015-10-20 14:26:45,288 [pool-6-thread-1] org.springframework.integration.handler.LoggingHandler ERROR org.springframework.integration.transformer.MessageTransformationException: failed to transform message; nested exception is java.lang.IllegalArgumentException: 'json' argument must be an instance of: [class java.lang.String, class [B, class java.io.File, class java.net.URL, class java.io.InputStream, class java.io.Reader] , but gotten: class org.springframework.integration.file.splitter.FileSplitter$FileMarker at org.springframework.integration.transformer.AbstractTransformer.transform(AbstractTransformer.java:44)