Интеграция Spring: запускать опрос JPA только после обработки всех результатов последнего опроса

У меня есть следующий поток, который я хотел бы реализовать с помощью Spring Integration Java DSL:

  1. Опрашивать таблицу в базе данных каждые 2 часа, которая возвращает идентификатор документов, которые необходимо обработать.
  2. Для каждого идентификатора обработать документ через HTTP-шлюз.
  3. Сохранить ответ в базе данных

У меня есть рабочий код Java, который выполняет именно эти шаги. Дополнительное требование, с которым я борюсь, заключается в том, что опрос для следующего раунда документов не должен происходить до тех пор, пока все документы из последнего опроса не будут обработаны и сохранены в базе данных.

Есть ли какой-нибудь шаблон в Spring Integration, который я мог бы использовать для этого дополнительного требования?

Вот упрощенный код - он станет более сложным, и я разделю обработку документов (исходящий и постоянный HTTP) на отдельные классы / потоки:

return IntegrationFlows.from(Jpa.inboundAdapter(this.targetEntityManagerFactory)
                .entityClass(ProcessingMetadata.class)
                .jpaQuery("select max(p.modifiedDate) from ProcessingMetadata p " +
                        "where p.status = com.test.ProcessingStatus.PROCESSED")
                .maxResults(1)
                .expectSingleResult(true),
        e -> e.poller(Pollers.fixedDelay(Duration.ofSeconds(10))))
        .handle(Jpa.retrievingGateway(this.sourceEntityManagerFactory)
                .entityClass(DocumentHeader.class)
                .jpaQuery("from DocumentHeader d where d.modified > :modified")
                .parameterExpression("modified", "payload"))
        .handle(Http.outboundGateway(uri)
                .httpMethod(HttpMethod.POST)
                .expectedResponseType(String.class))
        .handle(Jpa.outboundAdapter(this.targetEntityManagerFactory)
                        .entityClass(ProcessingMetadata.class)
                        .persistMode(PersistMode.PERSIST),
                e -> e.transactional(true))
        .get();

ОБНОВЛЕНИЕ

Следуя предложению Артема, я пытаюсь реализовать его с помощью SimpleActiveIdleMessageSourceAdvice

class WaitUntilCompleted extends SimpleActiveIdleMessageSourceAdvice {

    public WaitUntilCompleted(DynamicPeriodicTrigger trigger) {
        super(trigger);
    }

    @Override
    public boolean beforeReceive(MessageSource<?> source) {
        return false;
    }
}

Если я правильно понимаю, приведенный выше код перестанет опрашивать. Теперь я не знаю, как прикрепить этот совет к Jpa.inboundAdapter ... Кажется, у него нет подходящего метода (ни Advice, ни Spec Handler). Я упускаю здесь что-то очевидное? Я пробовал прикрепить совет к Jpa.retrievingGateway, но это никак не меняет поток.

ОБНОВЛЕНИЕ2

Проверьте этот вопрос, чтобы получить полное решение: Spring Integration: как выполнить модульное тестирование совет


person Blink    schedule 22.04.2020    source источник


Ответы (1)


Сегодня я ответил на аналогичный вопрос: Как опросить из очереди по одному сообщению за раз после завершения нисходящего потока в Spring Integration.

У вас также может быть уловка на уровне базы данных, не позволяющая видеть новые записи в таблице, в то время как другие заблокированы. Или у вас может быть несколько UPDATE в конце потока, в то время как ваш SELECT не увидит соответствующие записи, пока они не будут соответственно обновлены.

Но в любом случае любой из тех подходов, которые я предлагаю по этому вопросу, должен быть применен и здесь.

Также вы действительно можете полагаться на SimpleActiveIdleMessageSourceAdvice, поскольку ваше решение уже основано на реализации MessageSource.

ОБНОВЛЕНИЕ

Для вашего варианта использования, вероятно, было бы лучше расширить этот SimpleActiveIdleMessageSourceAdvice и переопределить его beforeReceive(), чтобы проверить какое-то состояние, можете ли вы читать больше данных или нет. idlePollPeriod и activePollPeriod могут быть одним и тем же значением: не похоже, что имеет смысл менять его между ними, поскольку вы переходите в состояние ожидания сразу после чтения следующего набора данных.

Для проверки состояния это действительно может быть простой AtomicBoolean bean-компонент, который вы должны изменить после обработки текущего набора документов. Это может быть что-то после агрегатора или что-то еще, что вы можете использовать в своем решении.

ОБНОВЛЕНИЕ 2

Чтобы использовать WaitUntilCompleted для своего Jpa.inboundAdapter, у вас должна быть такая конфигурация:

IntegrationFlows.from(Jpa.inboundAdapter(this.targetEntityManagerFactory)
            .entityClass(ProcessingMetadata.class)
            .jpaQuery("select max(p.modifiedDate) from ProcessingMetadata p " +
                    "where p.status = com.test.ProcessingStatus.PROCESSED")
            .maxResults(1)
            .expectSingleResult(true),
    e -> e.poller(Pollers.fixedDelay(Duration.ofSeconds(10)).advice(waitUntilCompleted())))

Обратите внимание на .advice(waitUntilCompleted()), который является частью конфигурации плеера и указывает на ваш компонент advice.

person Artem Bilan    schedule 22.04.2020
comment
Большое спасибо за быстрый ответ. Я пытаюсь понять, как использовать SimpleActiveIdleMessageSourceAdvice, но я потерялся ... Не могли бы вы рассказать немного, как именно я могу использовать этот совет? Кроме того, как я узнаю, что все сообщения были обработаны? На каком-то этапе вышеупомянутого потока я получу список элементов, которые мне нужно будет разделить. Нужно ли мне каким-то образом агрегировать его и установить для атомарного логического значения значение true или мне нужно сохранить счетчик списка? - person Blink; 23.04.2020
comment
Работает в обе стороны. Счетчик или логический сброс после того, как агрегатор в порядке. - person Artem Bilan; 23.04.2020
comment
К сожалению, мне до сих пор неясно, как использовать SimpleActiveIdleMessageSourceAdvice ... Есть ли шанс указать мне на дополнительную документацию и связанный с ней пример модульного теста в репозитории spring-integration-java-dsl? - person Blink; 23.04.2020
comment
Репо spring-integration-java-dsl устарело. Всем предлагается перейти на последнюю версию Spring Integration. И это SimpleActiveIdleMessageSourceAdvice не имеет ничего общего с DSL. Вам просто нужно настроить его как bean-компонент и использовать из PollerSpec.advice() при настройке этого Jpa.inboundAdapter() в IntegrationFlows.from(). - person Artem Bilan; 23.04.2020
comment
См. ОБНОВЛЕНИЕ в моем ответе, пожалуйста. - person Artem Bilan; 23.04.2020
comment
Большое спасибо. Теперь все яснее :) Позвольте мне вернуться к этому, как только я это реализовал. - person Blink; 24.04.2020
comment
Не могли бы вы взглянуть на мое ОБНОВЛЕНИЕ в сообщении? Я снова заблудился ... - person Blink; 24.04.2020
comment
См. ОБНОВЛЕНИЕ 2 в моем ответе. - person Artem Bilan; 27.04.2020
comment
Спасибо большое ... Почему-то пытался применить совет по адаптеру, а не по поллеру. Я приму ответ и опубликую реализацию. Из любопытства, почему у jpaOutboundAdapter есть метод совета напрямую, а у jpaInboundAdapter - нет? - person Blink; 27.04.2020
comment
Это не правда. См. JpaUpdatingOutboundEndpointSpec - метода advice() не существует. Это часть ConsumerEndpointSpec, которая также является второй лямбда-выражением при настройке handle(). Вероятно, вы используете слишком старый проект spring-integration-java-dsl, который теперь устарел в пользу основного проекта Spring Integration. - person Artem Bilan; 27.04.2020
comment
btw SimpleActiveIdleMessageSourceAdvice теперь устарел. Что было бы для него альтернативой? Спасибо - person Blink; 13.08.2020
comment
Ну, я не уверен, что не так с JavaDocs этого устаревшего класса: @deprecated since 5.3 in favor of {@link SimpleActiveIdleReceiveMessageAdvice} with the same (but more common) functionality. - person Artem Bilan; 13.08.2020