Spring AMQP - отправитель и получение сообщений

Я столкнулся с проблемой при получении сообщения от RabbitMQ. Я отправляю сообщение, как показано ниже

        HashMap<Object, Object> senderMap=new HashMap<>();
        senderMap.put("STATUS", "SUCCESS");
        senderMap.put("EXECUTION_START_TIME", new Date());

        rabbitTemplate.convertAndSend(Constants.ADAPTOR_OP_QUEUE,senderMap);

Если мы увидим в RabbitMQ, мы получим полностью квалифицированный тип.

В текущем сценарии у нас есть n производителей для одного и того же потребителя. Если я использую какой-либо картограф, это приведет к исключению. Как я могу отправить сообщение, чтобы оно не содержало type_id, и я мог бы получить сообщение как объект сообщения, а позже я мог бы привязать его к своему настраиваемому объекту в получателе.

Я получаю сообщение, подобное приведенному ниже. Не могли бы вы сообщить мне, как использовать Jackson2MessageConverter, чтобы это сообщение напрямую связывалось с моим Object / HashMap со стороны получателя. Также я удалил Type_ID у отправителя.

Как выглядит сообщение в RabbitMQ

приоритет: 0 режим_доставки: 2 заголовка:
ContentTypeId: java.lang.Object KeyTypeId: java.lang.Object content_encoding: UTF-8 content_type: application / json { "Execution_start_time": 1473747183636, "status": "SUCCESS"}

@Component
public class AdapterOutputHandler {

    private static Logger logger = Logger.getLogger(AdapterOutputHandler.class);

    @RabbitListener(containerFactory="adapterOPListenerContainerFactory",queues=Constants.ADAPTOR_OP_QUEUE)
    public void handleAdapterQueueMessage(HashMap<String,Object> message){

        System.out.println("Receiver:::::::::::"+message.toString());

    }

}

Связь

@Bean(name="adapterOPListenerContainerFactory")
    public SimpleRabbitListenerContainerFactory adapterOPListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter();
        DefaultClassMapper classMapper = new DefaultClassMapper();
        messageConverter.setClassMapper(classMapper);
        factory.setMessageConverter(messageConverter);

    }

Исключение

Caused by: org.springframework.amqp.support.converter.MessageConversionException: failed to convert Message content. Could not resolve __TypeId__ in header and no defaultType provided
    at org.springframework.amqp.support.converter.DefaultClassMapper.toClass(DefaultClassMapper.java:139)

Я не хочу использовать __TYPE__ID от отправителя, потому что они являются несколькими отправителями для одной очереди и только одним потребителем.


person BIndu_Madhav    schedule 12.09.2016    source источник
comment
это приводит к исключению недостаточно информации. Добавьте трассировку стека, пожалуйста   -  person Jens    schedule 12.09.2016
comment
На самом деле заголовки в rabbitmq содержат свойство type_id_. Этого не должно быть. Как отправить сообщение, в котором свойство type_id_ отсутствует priority: 0 delivery_mode: 2 __TypeId__: com.diff.approach.JobListenerDTO** content_encoding: UTF-8 content_type: application/json   -  person BIndu_Madhav    schedule 12.09.2016


Ответы (2)


это приводит к исключению

Какое исключение?

TypeId: com.diff.approach.JobListenerDTO

Это означает, что вы отправляете DTO, а не хеш-карту, как вы описываете в вопросе.

Если вы хотите удалить заголовок typeId, вы можете использовать обработчик сообщений ...

rabbitTemplate.convertAndSend(Constants.INPUT_QUEUE, dto, m -> {
    m.getMessageProperties.getHeaders().remove("__TypeId__");
    return m;
});

(или , new MessagePostProcessor() {...}, если вы не используете Java 8).

ИЗМЕНИТЬ

Какую версию Spring AMQP вы используете? В версии 1.6 вам даже не нужно удалять заголовок __TypeId__ - фреймворк смотрит на тип параметра слушателя и сообщает преобразователю Джексона тип, чтобы он автоматически преобразовал в него (если может). Как вы можете видеть здесь; он отлично работает без удаления идентификатора типа ...

package com.example;

import java.util.HashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class So39443850Application {

    private static final String QUEUE = "so39443850";

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(So39443850Application.class, args);
        context.getBean(RabbitTemplate.class).convertAndSend(QUEUE, new DTO("baz", "qux"));
        context.getBean(So39443850Application.class).latch.await(10, TimeUnit.SECONDS);
        context.getBean(RabbitAdmin.class).deleteQueue(QUEUE);
        context.close();
    }

    private final CountDownLatch latch = new CountDownLatch(1);

    @RabbitListener(queues = QUEUE, containerFactory = "adapterOPListenerContainerFactory")
    public void listen(HashMap<String, Object> message) {
        System.out.println(message.getClass() + ":" + message);
        latch.countDown();
    }

    @Bean
    public Queue queue() {
        return new Queue(QUEUE);
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(new Jackson2JsonMessageConverter());
        return template;
    }

    @Bean
    public SimpleRabbitListenerContainerFactory adapterOPListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        return factory;
    }

    public static class DTO {

        private String foo;

        private String baz;

        public DTO(String foo, String baz) {
            this.foo = foo;
            this.baz = baz;
        }

        public String getFoo() {
            return this.foo;
        }

        public void setFoo(String foo) {
            this.foo = foo;
        }

        public String getBaz() {
            return this.baz;
        }

        public void setBaz(String baz) {
            this.baz = baz;
        }

    }

}

Результат:

class java.util.HashMap:{foo=baz, baz=qux}

Это описано в документации. ...

В версиях до 1.6 информация о типе для преобразования JSON должна была быть предоставлена ​​в заголовках сообщений, либо требовался специальный ClassMapper. Начиная с версии 1.6, при отсутствии заголовков информации о типе тип может быть выведен из аргументов целевого метода.

Вы также можете настроить пользовательский ClassMapper, чтобы всегда возвращать HashMap.

person Gary Russell    schedule 12.09.2016
comment
Спасибо Гэри, еще один вопрос, как получить это сообщение? headers: __ContentTypeId__: java.lang.Object __KeyTypeId__: java.lang.Object content_encoding: UTF-8 content_type: application/json - person BIndu_Madhav; 13.09.2016
comment
Я получаю сообщение, подобное приведенному ниже @RabbitListener(containerFactory="adapterOPListenerContainerFactory",queues=Constants.ADAPTOR_OP_QUEUE) public void handleAdapterQueueMessage(Message message){ byte[] body = message.getBody(); } Как преобразовать из байта [] в хэш-карту обратно ?? - person BIndu_Madhav; 13.09.2016
comment
Не помещайте код в комментарии - он нечитабелен - вместо этого отредактируйте свой вопрос. Вам нужен Jackson2JsonMessageConverter в фабрике контейнеров слушателей. - person Gary Russell; 13.09.2016
comment
Привет, Гэри, я добавил свой код приемника, не могли бы вы подсказать, как использовать Jackson2JsonMessageConverter - person BIndu_Madhav; 14.09.2016
comment
Версия Spring AMQP - 1.6.1.RELEASE Версия Spring RabbitMQ - 1.5.6.RELEASE - person BIndu_Madhav; 15.09.2016
comment
Почему вы используете несовпадающие версии? Это не сработает; они должны быть одной и той же версии. Если вы используете maven или gradle, вам нужно только объявить spring-rabbit, и соответствующая версия spring-amqp будет загружена автоматически (транзитивно). Тем не менее, вы всегда должны использовать одни и те же версии для обеих банок; текущая версия - 1.6.2.RELEASE - см. страницу проекта. - person Gary Russell; 15.09.2016
comment
Да, Гэри. Я использовал одни и те же версии для обоих сейчас. Это сработало. Спасибо! - person BIndu_Madhav; 15.09.2016

  • Хотите использовать "другой" Java-вызов при получении сообщения?

    Настройте @Bean Jackson2JsonMessageConverter с пользовательским ClassMapper

  • Хотите использовать "много" разных Java-программ при получении сообщения? Такие как :

    @MyAmqpMsgListener
    void handlerMsg(
            // Main message class, by MessageConverter
            @Payload MyMsg myMsg, 
    
            // Secondary message class - by MessageConverter->ConversionService
            @Payload Map<String, String> map,
    
            org.springframework.messaging.Message<MyMsg> msg,
            org.springframework.amqp.core.Message amqpMsg
    ) {
        // ...
    }
    

    Предоставьте собственный @Bean Converter, ConversionServiceRabbitListenerAnnotationBeanPostProcessor:

    @Bean
    FormattingConversionServiceFactoryBean rabbitMqCs(
            Set<Converter> converters
    ) {
        FormattingConversionServiceFactoryBean fac = new FormattingConversionServiceFactoryBean();
        fac.setConverters(converters);
        return fac;
    }
    @Bean
    DefaultMessageHandlerMethodFactory messageHandlerMethodFactory(
            @Qualifier("rabbitMqCs")
            FormattingConversionService rabbitMqCs
    ) {
        DefaultMessageHandlerMethodFactory defaultFactory = new DefaultMessageHandlerMethodFactory();
        defaultFactory.setConversionService(rabbitMqCs);
        return defaultFactory;
    }
    
    // copied from RabbitBootstrapConfiguration
    @Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public RabbitListenerAnnotationBeanPostProcessor rabbitListenerAnnotationProcessor(
            MessageHandlerMethodFactory handlerFac
    ) {
        RabbitListenerAnnotationBeanPostProcessor bpp = new RabbitListenerAnnotationBeanPostProcessor();
        bpp.setMessageHandlerMethodFactory(handlerFac);
        return bpp;
    }
    
    @Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
    public RabbitListenerEndpointRegistry defaultRabbitListenerEndpointRegistry() {
        return new RabbitListenerEndpointRegistry();
    }
    

Использованная литература:

person btpka3    schedule 02.08.2017