Как справиться с сообщением JSON с помощью Spring-Rabbit в приложении Spring Boot?

Вот мои фрагменты кода.

  • MQConfiguration класс для конфигурации

    @Configuration
    public class MQConfiguration {
        @Bean
        public Receiver receiver() {
            return new Receiver();
        }
    }
    
  • Receiver класс для работы с получением сообщений

    @RabbitListener(queues = "testMQ")
    public class Receiver {
    
        @RabbitHandler
        public void receive(Message msg){
            System.out.println(msg.toString());
        }
    }
    
  • А вот сообщение JSON, которое я отправил в RabbitMQ.

    {
        "id": 1,
        "name": "My Name",
        "description": "This is description about me"
    }
    

Однако при запуске приложения я получил следующее сообщение об ошибке.

2017-02-28 17:16:35.931  WARN 11828 --- [cTaskExecutor-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:872) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:782) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:702) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:95) [spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:186) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1227) [spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:683) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1181) [spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1165) [spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1500(SimpleMessageListenerContainer.java:95) [spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1367) [spring-rabbit-1.7.0.RELEASE.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_60]
Caused by: org.springframework.amqp.AmqpException: No method found for class [B
    at org.springframework.amqp.rabbit.listener.adapter.DelegatingInvocableHandler.getHandlerForPayload(DelegatingInvocableHandler.java:127) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.adapter.DelegatingInvocableHandler.getMethodNameFor(DelegatingInvocableHandler.java:224) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.getMethodAsString(HandlerAdapter.java:61) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:140) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:106) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:779) ~[spring-rabbit-1.7.0.RELEASE.jar:na]
    ... 10 common frames omitted

Итак, что мне делать, если все, что я хочу, это напечатать сообщение JSON в методе receive()? Я был бы очень признателен, если бы кто-нибудь мог пролить свет на это. :)


person kenshinji    schedule 28.02.2017    source источник


Ответы (4)


Если вы используете Spring Boot, вам просто нужно настроить:

@Bean
public MessageConverter jsonMessageConverter() {
    return new Jackson2JsonMessageConverter();
}

В противном случае вы должны настроить:

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
...
    factory.setMessageConverter(new Jackson2JsonMessageConverter());
...
    return factory;
}

http://docs.spring.io/spring-amqp/docs/1.7.0.RELEASE/reference/html/_reference.html#async-annotation-driven

person Artem Bilan    schedule 28.02.2017
comment
Также см. пример spring-rabbit-json. - person Gary Russell; 28.02.2017
comment
@Artem Bilan Да, я использую Spring Boot, и ошибка все еще появляется после того, как я добавил аннотацию jsonMessageConverter() в класс конфигурации, вы можете проверить мой код на здесь. - person kenshinji; 01.03.2017

Чтобы отправить JSON в RabbitMQ и использовать его Spring Boot, нам нужно установить content_type.

Позвольте мне описать на примере, где у меня был производитель Python и потребитель Java (я отправлял JSON из Python в RabbitMQ, и Spring Boot Java должен был получить задачу JSON).

Есть два решения:

Решение 1. Отправка в виде строки JSON и преобразование ее вручную с помощью Jakson или GSON

Вам нужно установить content_type="text/plain" и преобразовать JSON в строку. Затем на стороне Spring используйте функцию со строкой в ​​качестве входных данных в качестве слушателя и вручную преобразуйте объект.

КроликХэндлер:

@RabbitHandler
public void receive(String inputString) throws IOException {
    ObjectMapper objectMapper = new ObjectMapper();
    SimStatusReport theResult = objectMapper.readValue(inputString, SimStatusReport.class);

    System.out.println("String instance "  + theResult.toString() +
            " [x] Received");
}

Объект SimStatusReport:

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class SimStatusReport {
    private String id;
    private int t;
}

Вот мой код Python:

import pika
import json
import uuid


connectionResult = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channelResult = connectionResult.channel()
routing_key_result = 'sim_results'
channelResult.queue_declare(queue=routing_key_result, durable=True)

def publish_result(sim_status):
    message =json.dumps(sim_status)
    channelResult.basic_publish(exchange='',
                                routing_key=routing_key_result,
                                body=message,
                                properties=pika.BasicProperties(
                                    content_type="text/plain",
                                    content_encoding= 'UTF-8',
                                    delivery_mode=2,  # make message persistent
                          ))
    print("Sent ", message)


newsim_status = {'id': str(uuid.uuid4()), 't': 0}
publish_result(newsim_status)

Решение 2. Отправьте строку JSON и позвольте Jackson2JsonMessageConverter автоматически выполнить преобразование.

Вам нужно установить content_type="application/json". Затем вам нужно добавить соответствующий заголовок к __TypeId__ в заголовке запроса RabbitMQ. Вам нужно указать точное пространство имен объекта, чтобы Джексон понял преобразование.

Вот мой пример с использованием Python (только функция publish_result):

def publish_result(sim_status):
    message =json.dumps(sim_status)
    channelResult.basic_publish(exchange='',
                                routing_key=routing_key_result,
                                body=message,
                                properties=pika.BasicProperties(
                                    content_type="application/json"
                                    headers={'__TypeId__': 'com.zarinbal.simtest.run.model.SimStatusReport'},
                                    content_encoding= 'UTF-8',
                                    delivery_mode=2,  # make message persistent
                          ))
    print("Sent ", message)

Затем вам нужно настроить Java для использования Jackson2JsonMessageConverter:

@Configuration
    public class RabbitConfiguration {
        @Bean
        public MessageConverter jsonMessageConverter() {
            return new Jackson2JsonMessageConverter();
        }
    }

Вот ваш слушатель:

@RabbitListener(queues = "sim_results")
public class TaskReceiver {
    @RabbitHandler
    public void receive(SimStatusReport in) {
        System.out.println("Object instance "  + in +
                " [x] Received");
    }
}

Примечание. Убедитесь, что все ваши объекты имеют сеттеры и геттеры для всех свойств и всех конструкторов аргументов. Я использую @Data, @NoArgsConstructor и @AllArgsConstructor из ломбока для его автоматического создания.

person Community    schedule 10.01.2018
comment
Или используйте ClassMapper, например. DefaultJackson2JavaTypeMapper и установите сопоставления классов. Тогда вам не нужно раскрывать (тайт-пару) детали реализации. - person Ursache; 26.01.2021

Я знаю, что прошло некоторое время с момента последнего ответа, но я хотел бы добавить свой ответ. На всякий случай, если у кого-то еще есть такие же проблемы.

Когда я настроил приемник, подобный вашему, я столкнулся с аналогичной проблемой, когда я не мог преобразовать JSON в теле сообщения RabbitMQ в свой объект. Он просто возвращает сообщение об ошибке, подобное этому.

org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Failed to convert message
        at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:146) ~[spring-rabbit-2.3.9.jar:2.3.9]
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1654) ~[spring-rabbit-2.3.9.jar:2.3.9]
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1573) ~[spring-rabbit-2.3.9.jar:2.3.9]
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1561) ~[spring-rabbit-2.3.9.jar:2.3.9]
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1552) ~[spring-rabbit-2.3.9.jar:2.3.9]
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1496) ~[spring-rabbit-2.3.9.jar:2.3.9]
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:968) [spring-rabbit-2.3.9.jar:2.3.9]
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:914) [spring-rabbit-2.3.9.jar:2.3.9]
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83) [spring-rabbit-2.3.9.jar:2.3.9]
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1289) [spring-rabbit-2.3.9.jar:2.3.9]
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1195) [spring-rabbit-2.3.9.jar:2.3.9]
        at java.lang.Thread.run(Thread.java:748) [na:1.8.0_271]
Caused by: org.springframework.amqp.AmqpException: No method found for class java.util.LinkedHashMap
        at org.springframework.amqp.rabbit.listener.adapter.DelegatingInvocableHandler.getHandlerForPayload(DelegatingInvocableHandler.java:185) ~[spring-rabbit-2.3.9.jar:2.3.9]
        at org.springframework.amqp.rabbit.listener.adapter.DelegatingInvocableHandler.getMethodFor(DelegatingInvocableHandler.java:317) ~[spring-rabbit-2.3.9.jar:2.3.9]
        at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.getMethodFor(HandlerAdapter.java:110) ~[spring-rabbit-2.3.9.jar:2.3.9]
        at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(MessagingMessageListenerAdapter.java:192) ~[spring-rabbit-2.3.9.jar:2.3.9]
        at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:137) ~[spring-rabbit-2.3.9.jar:2.3.9]
        ... 11 common frames omitted

После нескольких часов поиска на разных веб-сайтах без хорошего ответа я наконец обнаружил, что @RabbitListener не будет работать на уровне класса. Однако, когда вы перемещаете его на уровень метода, он работает нормально.

Итак, я удаляю @RabbitHandler и перемещаю @RabbitListener в метод приема.

Ваш код приемника должен выглядеть так после редактирования.

public class Receiver {

    @RabbitListener(queues = "testMQ")
    public void receive(Message msg){
        System.out.println(msg.toString());
    }
}

Я до сих пор не совсем понимаю, почему это не работает на уровне класса. Если у кого-то есть хорошее объяснение, пожалуйста, поделитесь им в разделе комментариев. Большое тебе спасибо.

Источник: https://titanwolf.org/Network/Articles/Article?AID=0f214d79-10f0-4b4c-9478-607428770256#gsc.tab=0

person The Wind    schedule 01.07.2021

простым способом вы можете использовать эту функцию new String(Messages)

@RabbitListener(queues = "testMQ")
public class Receiver {

    @RabbitHandler
    public void receive(Message msg){
String MQMessage = new String(msg.getBody());
        System.out.println(MQMessage);
    }
}
person Musab Abdelrahman    schedule 16.02.2020