У меня есть следующий поток, который я хотел бы реализовать с помощью Spring Integration Java DSL:
- Опрашивать таблицу в базе данных каждые 2 часа, которая возвращает идентификатор документов, которые необходимо обработать.
- Для каждого идентификатора обработать документ через HTTP-шлюз.
- Сохранить ответ в базе данных
У меня есть рабочий код Java, который выполняет именно эти шаги. Дополнительное требование, с которым я борюсь, заключается в том, что опрос для следующего раунда документов не должен происходить до тех пор, пока все документы из последнего опроса не будут обработаны и сохранены в базе данных.
Есть ли какой-нибудь шаблон в Spring Integration, который я мог бы использовать для этого дополнительного требования?
Вот упрощенный код - он станет более сложным, и я разделю обработку документов (исходящий и постоянный HTTP) на отдельные классы / потоки:
return IntegrationFlows.from(Jpa.inboundAdapter(this.targetEntityManagerFactory)
.entityClass(ProcessingMetadata.class)
.jpaQuery("select max(p.modifiedDate) from ProcessingMetadata p " +
"where p.status = com.test.ProcessingStatus.PROCESSED")
.maxResults(1)
.expectSingleResult(true),
e -> e.poller(Pollers.fixedDelay(Duration.ofSeconds(10))))
.handle(Jpa.retrievingGateway(this.sourceEntityManagerFactory)
.entityClass(DocumentHeader.class)
.jpaQuery("from DocumentHeader d where d.modified > :modified")
.parameterExpression("modified", "payload"))
.handle(Http.outboundGateway(uri)
.httpMethod(HttpMethod.POST)
.expectedResponseType(String.class))
.handle(Jpa.outboundAdapter(this.targetEntityManagerFactory)
.entityClass(ProcessingMetadata.class)
.persistMode(PersistMode.PERSIST),
e -> e.transactional(true))
.get();
ОБНОВЛЕНИЕ
Следуя предложению Артема, я пытаюсь реализовать его с помощью SimpleActiveIdleMessageSourceAdvice
class WaitUntilCompleted extends SimpleActiveIdleMessageSourceAdvice {
public WaitUntilCompleted(DynamicPeriodicTrigger trigger) {
super(trigger);
}
@Override
public boolean beforeReceive(MessageSource<?> source) {
return false;
}
}
Если я правильно понимаю, приведенный выше код перестанет опрашивать. Теперь я не знаю, как прикрепить этот совет к Jpa.inboundAdapter ... Кажется, у него нет подходящего метода (ни Advice, ни Spec Handler). Я упускаю здесь что-то очевидное? Я пробовал прикрепить совет к Jpa.retrievingGateway, но это никак не меняет поток.
ОБНОВЛЕНИЕ2
Проверьте этот вопрос, чтобы получить полное решение: Spring Integration: как выполнить модульное тестирование совет