Интеграция Spring с JMS и условный откат транзакции XA

При обработке сообщения через MQ я хотел бы условно откатить транзакцию XA и вернуть сообщение MQ в исходную очередь.
Сбои будут зарегистрированы в базе данных, и их можно будет повторить из базы данных, используя пользовательскую логику на основе сообщения. тип и ошибка.
Если мы не можем зарегистрировать ошибку в базе данных, тогда следует отменить всю транзакцию XA и вернуть сообщение в очередь.
Каждое сообщение обрабатывается в несколько этапов. и код может обрабатывать повторную отправку/дублирование сообщений.

У меня есть решение, но оно приводит к уродливой конфигурации, и я хотел бы знать, есть ли лучший способ добиться того же результата? Я думал об использовании цепочки, которая игнорирует сообщение, если оно ошибочно.
Мне не нравится, что активатор службы не является реальной вызываемой службой. Есть лучший способ сделать это?

<!-- transactionManager is an XA transaction manager --> 
<jms:message-driven-channel-adapter id="batchMessagesIn" 

<channel id="processMessageFirstStage" />

<!-- The number of stages will depend on the type of message and this type of configuration will be duplicated multiple times -->
<int:service-activator input-channel="processMessageFirstStage" ref="messageServiceAdatper" method="processFirstStage" output-channel="checkIfFirstStageResultedInError"/>  

<!-- Payload is an instance of CustomMessage -->
<int:router input-channel="checkIfFirstStageResultedInError"
    expression="payload.inError"  >
    <mapping value="true" channel="messageInError" />
    <mapping value="false" channel="processMessageSecondStage" />

<int:service-activator input-channel="processMessageSecondStage" ref="messageServiceAdatper" method="processSecondStage" output-channel="checkIfFirstStageResultedInError"/>    

<int:router input-channel="checkIfSecondStageResultedInError"
    expression="payload.inError"  >
    <mapping value="true" channel="messageInError" />
    <mapping value="false" channel="nullChannel" />

<channel id="messageInError" />

<int:service-activator input-channel="messageInError" ref="errorMessageProcessor" method="handleMessageError" output-channel="nullChannel"/>

<beans:bean id="messageServiceAdatper" class="com.foo.messaging.MessageServiceAdatperImpl"/>
<beans:bean id="errorMessageProcessor" class="com.foo.messaging.ErrorMessageProcessorImpl"/>

<!-- this error channel will only be used for logging -->
<channel id="batchErrorChannel" />
<stream:stderr-channel-adapter channel="batchErrorChannel" append-newline="true" />
public class CustomMessage {
    private Throwable throwable;
    private String originalMessage;
    private boolean inError;
    private Object payload;

public class MessageServiceAdatperImpl {

    private FirstStageService firstStageService;
    private SecondStageService secondStageService;

    //Don't let a failure rollback the XA transaction
    public CustomMessage processFirstStage(CustomMessage customMessage) {
        try {
        } catch(Throwable e) {
        return customMessage;

    //Don't let a failure rollback the XA transaction
    public CustomMessage processSecondStage(CustomMessage customMessage) {
        try {
        } catch(Throwable e) {
        return customMessage;

    private void markMessageInError(CustomMessage customMessage, Throwable e) {

public class FirstStageService () {
    //Start a new transaction. Code also handles duplicate messages
    public void processFirstStage() {
        //Do some work

public class ErrorMessageProcessorImpl() {
   private static final Marker fatal = MarkerFactory.getMarker("FATAL");

    public void handleMessageError(CustomMessage customMessage) {
         if (customMessage != null) {

            if (customMessage.isInError()) {
                try {

                    //At this point implment custom logic for logging message into the database. Message can be reprocessed from
                    //database with custom retry limits depending on message type and type of error.

                catch (Throwable e) {
                    //At this point roll back the XA transaction and put the message back on the queue
                    logger.error(fatal, String.format("Fatal error attempting to save error", e));
                    throw new RuntimeException("Fatal error attempting to save error", e);


person Michael Freeman    schedule 26.01.2015    source источник

Ответы (1)

Поскольку ваша логика настолько ближе к этому MessageServiceAdatperImpl, как насчет того, чтобы избежать таких накладных расходов с Spring Integration (я имею в виду <router>s) и просто сделать try...catch и if...else в коде?

С другой стороны, вы можете закодировать собственный общий Router, который просто возвращает имена каналов по своей внутренней логике.

Или... Routing Slip начиная с Spring Integration 4.1

person Artem Bilan    schedule 26.01.2015
К сожалению, нам приходится обрабатывать несколько разных типов сообщений. Существует несколько различных служб, которые одни потоки сообщений будут использовать повторно, а другие нет. Я также хотел бы, чтобы поток был виден в конфигурации. Я подозреваю, что буду использовать цепочку и не буду вызывать базовую службу, если входящее сообщение ошибочно. Либо это использовать MDB, либо обернуть вызов Spring Integration в try catch, чтобы я мог управлять повторной отправкой сообщения MQ в случае ошибки. - person Michael Freeman; 27.01.2015
Вот почему я предлагаю Routing Slip и вижу там следующий раздел: Process Manager Enterprise Integration Pattern. У вас есть только один центральный RoutingSlipRouteStrategy, который решает, куда отправить сообщение. Ваша конфигурация должна быть более понятной. - person Artem Bilan; 27.01.2015
Артем, спасибо, думаю, после мессенджера-канала-адаптера просто позвоню активатору услуги. Эта служба вызовет новый шлюз в новой транзакции. Если возвращенное сообщение содержит ошибку или возвращается какая-либо ошибка, ее можно зарегистрировать. Если это ведение журнала завершится ошибкой, служба выдаст исключение, что приведет к откату сообщения. Есть только одно небольшое нарушение в конфигурации, а все остальное будет намного чище. Надо было сначала посмотреть на это. Спасибо, что помогли мне все обдумать. - person Michael Freeman; 27.01.2015