Создание стратегии выпуска Spring интеграции с использованием Spring DSL

Я новичок в интеграции Spring. Я пытаюсь разделить сообщение из файла с помощью разделителя файлов, а затем использовать .aggregate () для создания одного сообщения и отправки в выходной канал. У меня есть маркеры как истинные, и, следовательно, теперь по умолчанию для параметра apply-sequence установлено значение false. Я установил correlationId на константу "1" с помощью enrichHeaders. У меня проблемы с настройкой стратегии повторного выпуска, так как я не удерживаю конец последовательности. Вот как выглядит мой код.

    IntegrationFlows
                .from(s -> s.file(new File(fileDir))
                                .filter(getFileFilter(fileName)),
                        e -> e.poller(poller))

                .split(Files.splitter(true, true)
                                .charset(StandardCharsets.US_ASCII),
                        e -> e.id(beanName)).enrichHeaders(h -> h.header("correlationId", "1"));

 IntegrationFlow integrationFlow = integrationFlowBuilder
            .<Object, Class<?>>route(Object::getClass, m -> m
                    .channelMapping(FileSplitter.FileMarker.class, "markers.input")
                    .channelMapping(String.class, "lines.input"))
            .get();

@Bean
    public IntegrationFlow itemExcludes() {
        return flow -> flow.transform(new ItemExcludeRowMapper(itemExcludeRowUnmarshaller)) //This maps each line to ItemExclude object
                .aggregate(aggregator -> aggregator
                        .outputProcessor(group -> group.getMessages()
                        .stream()
                        .map(message -> ((ItemExclude) message.getPayload()).getPartNumber())
                        .collect(Collectors.joining(","))))
                .transform(Transformers.toJson())
                .channel(customSource.itemExclude());
    }

    @Bean
    public IntegrationFlow itemExcludeMarkers() {
        return flow -> flow
                .log(LoggingHandler.Level.INFO)
                .<FileSplitter.FileMarker>filter(m -> m.getMark().equals(FileSplitter.FileMarker.Mark.END))
                .<FileHandler>handle(new FileHandler(configProps))
                .channel(NULL_CHANNEL);
    }

Любая помощь приветствуется.


person Goni_code_love    schedule 15.09.2017    source источник


Ответы (2)


Используйте настраиваемую стратегию выпуска, которая ищет маркер END в последнем сообщении, и, возможно, настраиваемый процессор вывода, который удаляет маркеры из коллекции.

person Gary Russell    schedule 15.09.2017

Я бы переместил ваш заголовок более богатым на correlationId перед splitter и сделал бы это так:

 .enrichHeaders(h -> h
        .headerFunction(IntegrationMessageHeaderAccessor.CORRELATION_ID, 
                   m -> m.getHeaders().getId())) 

Константа correlationId совершенно не годится в многопоточной среде: разные потоки разбивают разные файлы и отправляют разные строки в один и тот же агрегатор. Итак, с "1" в качестве ключа корреляции у вас всегда будет одна группа, которую нужно агрегировать и выпускать. Поведение по умолчанию последовательность - заполнить исходное сообщение id в correlationId. Поскольку вы не собираетесь полагаться на applySequence из FileSplitter, я предлагаю это простое решение для имитации этого поведения.

Как указал Гэри в своем ответе, вам нужно подумать о пользовательском ReleaseStrategy и также отправить FileSplitter.FileMarker агрегатору. FileSplitter.FileMarker.END имеет свойство lineCount, которое можно сравнить с MessageGroup.size, чтобы решить, что мы готовы выпустить группу. MessageGroupProcessor действительно должен фильтровать FileSplitter.FileMarker сообщения во время построения результата для вывода.

person Artem Bilan    schedule 15.09.2017
comment
Вам действительно не нужно сравнивать размер; пока он выполняется в одном потоке, маркер END будет последним. - person Gary Russell; 15.09.2017