Spring boot rabbitMQ DLE не принимает никаких сообщений

Я работаю над rabbitMQ с пружинной загрузкой. Я создаю очередь недоставленных писем, которую я могу видеть в администраторе RabbitMQ как «D, DLE», но, возможно, нет DLK, мне не хватает настройки «x-dead-letter-routing-key», дело в том, что я не требуется ключ маршрутизации. Немногие из моих потребителей привязаны к конкретному обмену, и я создаю DLE для каждого обмена, если есть какие-либо проблемы с потребителем для этого обмена, а затем DLE, подключенный к этому обмену, который получает это сообщение и выполняет логику, зависящую от пользователя. к сожалению, это не работает, DLE не получает никаких сообщений.

Пожалуйста, найдите код ниже,

package com.sample.rabbit;

import org.slf4j.Logger;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Argument;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import    org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler;
import org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException;
import org.springframework.amqp.support.converter.DefaultClassMapper;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.util.ErrorHandler;

@SpringBootApplication
public class SampleRabbitApplication {

public static void main(String[] args) throws Exception {
    ConfigurableApplicationContext context = SpringApplication.run(SampleRabbitApplication.class, args);
    context.getBean(SampleRabbitApplication.class).runDemo();
    context.close();
}

@Autowired
private RabbitTemplate template;

private void runDemo() throws Exception {
    this.template.convertAndSend("sample-queue", new Foo("bar"),m -> {
        m.getMessageProperties().setHeader("__TypeId__","foo");
        return m;
    });

    this.template.convertAndSend("sample-queue", new Foo("throw"),m -> {
        m.getMessageProperties().setHeader("__TypeId__","foo");
        return m;
    });
    this.template.convertAndSend("sample-queue", new Foo("bar"), m -> {
        return new Message("some bad json".getBytes(), m.getMessageProperties());
    });
    Thread.sleep(5000);
}

@RabbitListener(
        id = "sample-queue",
        bindings = @QueueBinding(
                value = @org.springframework.amqp.rabbit.annotation.Queue(value = "sample-queue", durable = "true"),
                exchange = @org.springframework.amqp.rabbit.annotation.Exchange(value = "sample.exchange", durable = "true")
        )
)
public void handle(Foo in) {
    System.out.println("Received: " + in);
if("throw".equalsIgnoreCase(in.getFoo())){
        throw new BadRequestException("Foo contains throw so it throwed the exception.");
    }
}

@RabbitListener(
        id = "sample-dead-letter-queue",
        bindings = @QueueBinding(
                value = @org.springframework.amqp.rabbit.annotation.Queue(value = "sample-dead-letter-queue", durable = "true", arguments = {@Argument(name = "x-dead-letter-exchange",value = "sample.exchange"),@Argument(name = "x-dead-letter-routing-key",value = "#")}),
                exchange = @org.springframework.amqp.rabbit.annotation.Exchange(value = "critical.exchange", durable = "true",type = "topic")
        )
)
public void handleDLE(Message in) {
    System.out.println("Received in DLE: " + in.getBody());
}

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setMessageConverter(jsonConverter());
    factory.setErrorHandler(errorHandler());
    return factory;
}

@Bean
public ErrorHandler errorHandler() {
    return new ConditionalRejectingErrorHandler(new MyFatalExceptionStrategy());
}

@Bean
public MessageConverter jsonConverter() {
    Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
    DefaultClassMapper mapper = new DefaultClassMapper();
    mapper.setDefaultType(Foo.class);
    converter.setClassMapper(mapper);
    return new Jackson2JsonMessageConverter();
}

public static class MyFatalExceptionStrategy extends ConditionalRejectingErrorHandler.DefaultExceptionStrategy {

    private final Logger LOG = org.slf4j.LoggerFactory.getLogger(getClass());

    public boolean isFatal(Throwable t) {
        if (t instanceof ListenerExecutionFailedException && isCauseFatal(t.getCause())) {
            //To do : Here we have to configure DLE(Critical queue) and put all the messages in the critical queue.
            ListenerExecutionFailedException lefe = (ListenerExecutionFailedException) t;
            if(lefe.getFailedMessage() != null) {
                LOG.info("Failed to process inbound message from queue "
                        + lefe.getFailedMessage().getMessageProperties().getConsumerQueue()
                        + "; failed message: " + lefe.getFailedMessage(), t);
            } else {
                LOG.info("Failed to process inbound message from queue "
                        + lefe.getMessage(), t);
            }
        }
        return super.isFatal(t);
    }

    private boolean isCauseFatal(Throwable cause) {
        return cause instanceof MessageConversionException
                || cause instanceof org.springframework.messaging.converter.MessageConversionException
                || cause instanceof MethodArgumentNotValidException
                || cause instanceof MethodArgumentTypeMismatchException
                || cause instanceof NoSuchMethodException
                || cause instanceof ClassCastException
                || isUserCauseFatal(cause);
    }

    /**
     * Subclasses can override this to add custom exceptions.
     * @param cause the cause
     * @return true if the cause is fatal.
     */
    protected boolean isUserCauseFatal(Throwable cause) {
        return true;
    }


}

public static class Foo {

    private String foo;

    public Foo() {
        super();
    }

    public Foo(String foo) {
        this.foo = foo;
    }

    public String getFoo() {
        return this.foo;
    }

    public void setFoo(String foo) {
        this.foo = foo;
    }

    @Override
    public String toString() {
        return "Foo [foo=" + this.foo + "]";
    }

}
} 

Мои обмены и очереди являются прямыми, каждый из моих потребителей будет использовать разные ключи маршрутизации, но он принадлежит одному и тому же обмену. Итак, как я могу написать DLE, который эффективно потребляет все сообщения об ошибках. В приведенном выше примере кода одно сообщение является успешным, а другое ошибка, но я не видел сообщения об ошибке в DLE.

Любая помощь будет оценена по достоинству.


person VelNaga    schedule 08.03.2017    source источник


Ответы (1)


Если вы настраиваете очередь с обменом недоставленными буквами (DLX), но без ключа маршрутизации недоставленных сообщений, сообщение направляется в DLX с исходным ключом маршрутизации.

Самый простой способ справиться с вашим вариантом использования - сделать DLX обменом темами и привязать к нему очередь с ключом маршрутизации # (подстановочный знак для всех сообщений), и все ошибки будут попадать в эту очередь.

Если вы хотите разделить ошибки на отдельные очереди, свяжите DLQ для каждой с исходным ключом маршрутизации.

ИЗМЕНИТЬ

Вот правильная конфигурация:

@RabbitListener(id = "sample-queue",
        bindings = @QueueBinding(value = @Queue(value = "sample-queue", durable = "true", arguments =
                        @Argument(name = "x-dead-letter-exchange", value = "critical.exchange")),
                    exchange = @Exchange(value = "sample.exchange", durable = "true")))
public void handle(Foo in) {
    System.out.println("Received: " + in);
}

@RabbitListener(id = "sample-dead-letter-queue", containerFactory = "noJsonContainerFactory",
        bindings = @QueueBinding(value = @Queue(value = "sample-dead-letter-queue", durable = "true"),
            exchange = @Exchange(value = "critical.exchange", durable = "true", type = "topic"),
            key = "#"))
public void handleDLE(Message in) {
    System.out.println("Received in DLE: " + new String(in.getBody()));
}

@Bean
public SimpleRabbitListenerContainerFactory noJsonContainerFactory(ConnectionFactory connectionFactory) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setErrorHandler(errorHandler());
    return factory;
}
person Gary Russell    schedule 08.03.2017
comment
@ Gary. Большое спасибо за ваш ответ. Я попробовал в соответствии с вашим ответом, но у меня ничего не получилось. Я все еще не могу получить сообщение об ошибке. Не могли бы вы проверить мой код, мой обмен DLE использует отдельный обмен, который является темой, а фактический потребитель использует другой обмен (прямой), это нормально? Я получаю исключение MessageConversionException, и оно входит в раздел isFatal в ConditionalRejectingErrorHandler, но мне не удалось получить это сообщение внутри DLE. Пожалуйста, помогите мне решить эту проблему. - person VelNaga; 08.03.2017
comment
У вас неправильная конфигурация мертвой буквы. Он должен перейти в исходную очередь. Кроме того, вы не можете использовать один и тот же преобразователь для приема от DLQ; см. мое редактирование. - person Gary Russell; 08.03.2017
comment
Да, теперь он входит в DLE, но на странице администратора rabbitMQ очередь недоставленных писем не помечена DKX, DLK вместо очереди потребителей помечена DLX, это не проблема? - person VelNaga; 08.03.2017
comment
Также я обнаружил еще одну проблему ... Если я выбрасываю какое-либо настраиваемое исключение от своего потребителя, он снова и снова перезапускает потребителя. Эти сообщения не поступают в потребителя DLE. Я установил следующее свойство spring.rabbitmq.listener.default- Requeue-rejected = false в application.properties, но бесполезен. - person VelNaga; 08.03.2017
comment
Dead letter queue is not marked with DKX,DLK Это верно - это исходная очередь, которая настроена для отправки отклонений в DLX; DLQ - это обычная очередь. Ваш код отлично работает у меня с указанными выше изменениями - Received in DLE: some bad json; requeueRejected=false означает отправку в DLX всех исключений. - person Gary Russell; 08.03.2017
comment
Пожалуйста, найдите мой обновленный код в вопросе. Я перешел по следующей ссылке lookatsrc.com/source/org/springframework/amqp/rabbit/listener/, а в методе isUserFatal я должен вернуть true, чтобы это не было повторно запрашивая сообщение, сообщение будет перенаправлено в DLQ, если есть какое-либо настраиваемое исключение. - person VelNaga; 08.03.2017
comment
Как я уже сказал, я скопировал ваш исходный код, исправил объявления очереди и фабрику контейнеров, и он отлично работает; Вы, должно быть, сломали что-то еще. Я не готов продолжать отладку вашего кода. - person Gary Russell; 08.03.2017
comment
Рассел ... Я обновил код, который будет генерировать пользовательское исключение внутри потребителя. Пользовательское исключение запрашивается снова и снова даже после установки свойства spring.rabbitmq.listener.default-Requeue-rejected = false, но я могу решить эту проблему проблема, перейдя по ссылке lookatsrc.com/source/org/springframework/amqp/rabbit/listener/ и вместо false я выбрал true в isUserCauseFatal, это правильно? - person VelNaga; 08.03.2017
comment
Возвращение true приведет к отклонению сообщения. Если вы включите ведение журнала DEBUG, вы должны увидеть Rejecting messages (requeue=false). - person Gary Russell; 08.03.2017
comment
Спасибо за информацию журнала отладки. Для меня я могу видеть 2017-03-08 18: 16: 28.378 DEBUG 76327 --- [cTaskExecutor-1] o.s.a.r.listener.BlockingQueueConsumer: Отклонение сообщений (Requeue = true) в журнале DEBUG. - person VelNaga; 08.03.2017
comment
Я могу решить эту проблему, установив factory.setDefaultRequeueRejected (false) ;, каким-то образом свойства в application.properties не работают. Теперь я могу видеть сообщения об отклонении (Requeue = false) - person VelNaga; 08.03.2017