Публикация транзакций Spring-AMQP без исключения

Я пытаюсь использовать канал Transactionnal RabbitMQ с Spring-AMQP, но я хочу фактически проглотить исключения, чтобы регистрировать их и иметь возможность их восстанавливать.

Использование channelTranscted = true заставляет канал также присоединиться к текущему транзакционному менеджеру (в моем случае - Hibernate), и это приводит к тому, что исключение фиксации повторно выбрасывается за пределы границ @Transactionnal, что приводит к сбою на верхнем уровне без возможности его перехвата и регистрации. Это.

Я также попытался вручную прикрепить публикацию к транзакции, чтобы она выполнялась только после успешной фиксации:

public void publishFailSafeAfterSuccessfulTransaction(final String routingKey, final String message) {
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
        @Override
        public void afterCommit() {
            try {
                rabbitTemplate.convertAndSend(routingKey, message);
            } catch (Exception exception) {
                logger.error("Error while publishing message to RabbitMQ ");
            }
        }
});

используется таким образом:

Entity entity = save(entity);
publishFailSafeAfterSuccessfulTransaction("routingkey", "Entity was updated");

но в этом случае я не могу использовать channelTranscted = true, потому что он вложит registeringSynchronization в другую registeringSynchronization и вообще не сможет быть вызван ...

Есть ли способ этого достичь?

ОБНОВЛЕНИЕ: в идеале я бы хотел переопределить RabbitResourceSynchronization, который используется в классе ConnectionFactoryUtils, но это частный класс без фабрики, созданной с помощью

TransactionSynchronizationManager.registerSynchronization(new RabbitResourceSynchronization(resourceHolder, connectionFactory, synched));

person bpavot    schedule 09.03.2015    source источник
comment
Можно ли вместо транзакций использовать возвраты издателя? Если вы смотрите только на регистрацию проблемы, вы получаете сообщение и причину сбоя в обратном вызове, и ваша основная транзакция не задерживается для фиксации.   -  person Gary Russell    schedule 09.03.2015
comment
Действительно, мы действительно имели это в виду, но поскольку мы не хотим терять сообщения, это также подразумевает наличие постоянного хранилища для сообщений и механизма повтора наверху. Итак, мы хотели использовать транзакции, у которых есть первый простой подход ... который оказался не так уж и прост.   -  person bpavot    schedule 09.03.2015


Ответы (1)


Решение, которое я реализовал, заключалось в том, чтобы выполнять публикацию внутри новой транзакции после фиксации основной транзакции.

Первый звонок:

Entity entity = save(entity);
publishFailSafeAfterSuccessfulTransaction("routingkey", "Entity was updated");

Этот метод регистрируется для публикации после подтверждения основной транзакции.

public void publishFailSafeAfterSuccessfulTransaction(final String routingKey, final String event) {

    TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
        @Override
        public void afterCommit() {
            try {
                publishFailSafe(routingKey, event);
            } catch (Exception exception) {
                //Do some recovering
            }
        }
    });
}

После того, как основная транзакция зафиксирована, эта будет выполнять публикацию. По мере того, как канал обрабатывается, он фиксирует сообщение при фиксации этой новой транзакции и терпит неудачу только в этой транзакции, а ошибки будут обнаружены в предыдущем методе.

@Transactional(propagation = Propagation.REQUIRES_NEW)
public void publishFailSafe(String routingKey, String event) {
    try {
        rabbitTemplate.convertAndSend(routingKey.getRoutingKey(), event);
    } catch (Exception exception) {
        //Do some recovering
    }
}
person bpavot    schedule 23.03.2015