При обработке сообщения через MQ я хотел бы условно откатить транзакцию XA и вернуть сообщение MQ в исходную очередь.
Сбои будут зарегистрированы в базе данных, и их можно будет повторить из базы данных, используя пользовательскую логику на основе сообщения. тип и ошибка.
Если мы не можем зарегистрировать ошибку в базе данных, тогда следует отменить всю транзакцию XA и вернуть сообщение в очередь.
Каждое сообщение обрабатывается в несколько этапов. и код может обрабатывать повторную отправку/дублирование сообщений.
У меня есть решение, но оно приводит к уродливой конфигурации, и я хотел бы знать, есть ли лучший способ добиться того же результата? Я думал об использовании цепочки, которая игнорирует сообщение, если оно ошибочно.
Мне не нравится, что активатор службы не является реальной вызываемой службой. Есть лучший способ сделать это?
<!-- transactionManager is an XA transaction manager -->
<jms:message-driven-channel-adapter id="batchMessagesIn"
destination="batchQueue"
error-channel="batchErrorChannel"
connection-factory="batchConnectionFactory"
channel="batchMessageInChannel"
task-executor="integrationTaskExecutor"
recovery-interval="10000"
concurrent-consumers="1"
max-concurrent-consumers="1"
cache-level="0"
transaction-manager="transactionManager"/>
<channel id="processMessageFirstStage" />
<!-- The number of stages will depend on the type of message and this type of configuration will be duplicated multiple times -->
<int:service-activator input-channel="processMessageFirstStage" ref="messageServiceAdatper" method="processFirstStage" output-channel="checkIfFirstStageResultedInError"/>
<!-- Payload is an instance of CustomMessage -->
<int:router input-channel="checkIfFirstStageResultedInError"
expression="payload.inError" >
<mapping value="true" channel="messageInError" />
<mapping value="false" channel="processMessageSecondStage" />
</int:router>
<int:service-activator input-channel="processMessageSecondStage" ref="messageServiceAdatper" method="processSecondStage" output-channel="checkIfFirstStageResultedInError"/>
<int:router input-channel="checkIfSecondStageResultedInError"
expression="payload.inError" >
<mapping value="true" channel="messageInError" />
<mapping value="false" channel="nullChannel" />
</int:router>
<channel id="messageInError" />
<int:service-activator input-channel="messageInError" ref="errorMessageProcessor" method="handleMessageError" output-channel="nullChannel"/>
<beans:bean id="messageServiceAdatper" class="com.foo.messaging.MessageServiceAdatperImpl"/>
<beans:bean id="errorMessageProcessor" class="com.foo.messaging.ErrorMessageProcessorImpl"/>
<!-- this error channel will only be used for logging -->
<channel id="batchErrorChannel" />
<stream:stderr-channel-adapter channel="batchErrorChannel" append-newline="true" />
public class CustomMessage {
private Throwable throwable;
private String originalMessage;
private boolean inError;
private Object payload;
}
public class MessageServiceAdatperImpl {
@Autowired
private FirstStageService firstStageService;
@Autowired
private SecondStageService secondStageService;
//Don't let a failure rollback the XA transaction
@Transactional
public CustomMessage processFirstStage(CustomMessage customMessage) {
try {
firstStageService.processFirstStage(customMessage.getPayload());
} catch(Throwable e) {
customMessage.setException(e);
}
return customMessage;
}
//Don't let a failure rollback the XA transaction
@Transactional
public CustomMessage processSecondStage(CustomMessage customMessage) {
try {
secondStageService.processSecondStage(customMessage.getPayload());
} catch(Throwable e) {
markMessageInError(customMessage,e)
}
return customMessage;
}
private void markMessageInError(CustomMessage customMessage, Throwable e) {
customMessage.setThrowable(e);
customMessage.setInError(true);
}
}
public class FirstStageService () {
//Start a new transaction. Code also handles duplicate messages
@Transactional(propagation=Propagation.REQUIRES_NEW)
public void processFirstStage() {
//Do some work
}
}
public class ErrorMessageProcessorImpl() {
private static final Marker fatal = MarkerFactory.getMarker("FATAL");
@Transactional
public void handleMessageError(CustomMessage customMessage) {
if (customMessage != null) {
if (customMessage.isInError()) {
try {
//At this point implment custom logic for logging message into the database. Message can be reprocessed from
//database with custom retry limits depending on message type and type of error.
}
catch (Throwable e) {
//At this point roll back the XA transaction and put the message back on the queue
logger.error(fatal, String.format("Fatal error attempting to save error", e));
throw new RuntimeException("Fatal error attempting to save error", e);
}
}
}
}
}