Интеграция с Spring: потребление памяти при опросе файлов

Я использую InboundChannelAdapter с Poller для обработки файлов каждые 30 секунд. Файлы невелики, но я понимаю, что потребление памяти продолжает расти, даже когда файлов нет.

@Bean
@InboundChannelAdapter(value = "flowFileInChannel" ,poller = @Poller(fixedDelay ="30000", maxMessagesPerPoll = "1"))
public MessageSource<File> flowInboundFileAdapter(@Value("${integration.path}") File directory) {
    FileReadingMessageSource source = new FileReadingMessageSource();
    source.setDirectory(directory);
    source.setFilter(flowPathFileFilter);
    source.setUseWatchService(true);
    source.setScanEachPoll(true);
    source.setAutoCreateDirectory(false);
    return source;
}

введите здесь описание изображения

Есть ли внутренняя очередь, которая не очищается после каждого опроса? Как настроить, чтобы память не съедала.

После более глубокого изучения кажется, что приведенные ниже Spring IntegrationFlows, которые обрабатывают данные из InboundChannelDapter, задерживают память после каждого опроса файла. После того, как я закомментировал среднюю часть, потребление памяти кажется стабильным (вместо увеличения потребления). Теперь мне интересно, как заставить Spring IntegrationFlows очищать эти сообщения и заголовки после того, как они передаются по разным каналам (т.е. после последнего канала ниже)

public IntegrationFlow incomingLocateFlow(){
        return IntegrationFlows.from(locateIncomingChannel())

//                .split("locateItemSplitter","split")
//                .transform(locateItemEnrichmentTransformer)
//                .transform(locateRequestTransformer)
//                .aggregate(new Consumer<AggregatorSpec>() {                        // 32
//
//                    @Override
//                    public void accept(AggregatorSpec aggregatorSpec) {
//                        aggregatorSpec.processor(locateRequestProcessor, null);                // 33
//                    }
//
//                }, null)
//                .transform(locateIncomingResultTransformer)
//                .transform(locateExceptionReportWritingHandler)
                .channel(locateIncomingCompleteChannel())
                .get();
    }

person enfany    schedule 11.07.2017    source источник


Ответы (1)


Действительно есть AcceptOnceFileListFilter с кодом вроде:

private final Queue<F> seen;

private final Set<F> seenSet = new HashSet<F>();

При каждом опросе эти внутренние коллекции пополняются новыми файлами.

Для этой цели вы можете использовать FileSystemPersistentAcceptOnceFileListFilter с постоянной реализацией MetadataStore, чтобы избежать памяти потребление.

Также рассмотрите возможность использования какого-либо инструмента для анализа содержимого памяти. У вас может быть что-то еще ниже по течению на flowFileInChannel.

ОБНОВЛЕНИЕ

Поскольку вы используете .aggregate(), это определенно точка, в которой память потребляется по умолчанию. Это потому, что есть SimpleMessageStore для хранения сообщений для группировки. Плюс есть опция expireGroupsUponCompletion(boolean), которая по умолчанию false. Поэтому даже после успешного выпуска некоторая информация все еще находится в файле MessageStore. Вот как ваша память потребляется немного время от времени.

Эта опция по умолчанию false, чтобы позволить иметь логику, когда мы отбрасываем запоздалое сообщение для завершенной группы. Когда это true, вы можете сформировать новую группу для того же correlationKey.

Дополнительную информацию об агрегаторе см. в Справочном руководстве< /а>.

person Artem Bilan    schedule 11.07.2017
comment
Спасибо Артем, после некоторых расследований первопричиной увеличения потребления памяти является не часть опроса файлов, а что-то после этой части. Я обновил свои исходные вопросы с дополнительной информацией. - person enfany; 12.07.2017
comment
Смотрите ОБНОВЛЕНИЕ в моем ответе. - person Artem Bilan; 12.07.2017
comment
Спасибо, Артерм, я внес изменения, но это не очень помогает. - person enfany; 17.07.2017
comment
Хм... Можете ли вы точно сказать, какой компонент вызывает утечку памяти? Как насчет того, чтобы поиграть с профилировщиком, таким как Visualvm или YourKit, чтобы определить корень проблемы? - person Artem Bilan; 17.07.2017