SourcePollingChannelAdapter с транзакцией

Я хотел бы использовать SourcePollingChannelAdapter с ТРЕБУЕМЫМ распространением транзакции, когда опрос реализован, чтобы откатить все операции в случае возникновения ошибки. Метод setTransactionSynchronizationFactory не комментируется... Большое спасибо за помощь!

В XML я могу сделать:

<int:poller fixed-rate="5000">
  <int:transactional transaction-manager="transactionManager" propagation="REQUIRED" />
</int:poller>

Я хотел бы использовать подобную транзакцию программно с SourcePollingChannelAdapter и PeriodicTrigger, но я не знаю, как это сделать.

У меня есть это :

SourcePollingChannelAdapter adapter = new SourcePollingChannelAdapter();
adapter.setSource(source);
adapter.setTrigger(new PeriodicTrigger(5, TimeUnit.SECONDS));
adapter.setOutputChannel(channel);
adapter.setBeanFactory(ctx);
adapter.start();

Когда вызывается источник компонента, элемент в базе данных удаляется, сообщение создается и отправляется в выходной канал; но если у меня есть ошибка в потоке после выходного канала, я бы хотел, чтобы база данных была восстановлена, а элемент вернулся ... на самом деле простая транзакция с распространением. Я не понимаю, как это сделать.

Выходной канал:

<int:channel id="channel" >
    <int:queue />
</int:channel>
<int-http:outbound-gateway request-channel="channel"
    url="http://localhost:8081/icopitole-ws/baseactive" http-method="GET"
    reply-channel="reresponseVersionChannel" expected-response-type="java.lang.String"  />

Когда URL-адрес не отвечает, создается исключение, но откат не выполняется, хотя я добавил DefaultTransactionSynchronizationFactory и TransactionInterceptor, как вы сказали :(


person Médéric Martin    schedule 17.10.2013    source источник
comment
Извините, недостаточно информации. poller можно пометить <transactional synchronization-factory="syncFactory">. Здесь, пожалуйста: docs.spring.io/spring-integration/docs/2.2.6.RELEASE/reference/   -  person Artem Bilan    schedule 17.10.2013
comment
Я обновил свой ответ в соответствии со свежей информацией   -  person Artem Bilan    schedule 17.10.2013
comment
Я обновил свой вопрос, спасибо за ваше время.   -  person Médéric Martin    schedule 17.10.2013
comment
Во-первых, ваш вопрос выглядит очень плохо. Вы должны показать всю информацию с самого начала. Теперь я вижу, что происходит: ваш канал находится в очереди, поэтому http-запрос будет обрабатываться отдельным потоком, и ваша транзакция будет успешно зафиксирована сразу после помещения сообщения в очередь. Переключи на прямой канал и все будет ОК. Пожалуйста, делайте вопросы более информативными - сэкономьте наше время!   -  person Artem Bilan    schedule 17.10.2013
comment
Я очень извиняюсь (также за мой плохой английский), я пишу в первый раз, так что я сделаю все возможное в следующий раз. Я думал, что канал должен быть PollableChannel. С DirectChannel это работает. Спасибо вам большое за все!   -  person Médéric Martin    schedule 17.10.2013


Ответы (1)


Если я правильно вас понимаю, вам нужно использовать этот: DefaultTransactionSynchronizationFactory

А вот снимок, как его настроить:

SourcePollingChannelAdapter adapter = new SourcePollingChannelAdapter();
    ExpressionEvaluatingTransactionSynchronizationProcessor syncProcessor =
            new ExpressionEvaluatingTransactionSynchronizationProcessor();
    syncProcessor.setBeanFactory(mock(BeanFactory.class));
    PollableChannel queueChannel = new QueueChannel();
    syncProcessor.setBeforeCommitExpression(new SpelExpressionParser().parseExpression("#bix"));
    syncProcessor.setBeforeCommitChannel(queueChannel);
    syncProcessor.setAfterCommitChannel(queueChannel);
    syncProcessor.setAfterCommitExpression(new SpelExpressionParser().parseExpression("#baz"));

    DefaultTransactionSynchronizationFactory syncFactory =
            new DefaultTransactionSynchronizationFactory(syncProcessor);

    adapter.setTransactionSynchronizationFactory(syncFactory);

Границы транзакций указаны в SourcePollingChannelAdapter#adviceChain, поэтому его следует настроить следующим образом:

    TransactionInterceptor txAdvice = 
    new TransactionInterceptor(transactionManager, 
         new MatchAlwaysTransactionAttributeSource(new DefaultTransactionAttribute()));
    adapter.setAdviceChain(Collections.singletonList(txAdvice));

Итак, теперь каждый «опрос» будет обернут транзакцией, и ваш syncFactory сделает все необходимое.

person Artem Bilan    schedule 17.10.2013