Мы с коллегой работаем над приложением, использующим Spring, которому необходимо получить сообщение из очереди RabbitMQ. Идея состоит в том, чтобы сделать это с помощью (обычно превосходной) системы пружинных аннотаций, чтобы упростить понимание кода. У нас есть система, работающая с использованием аннотации @RabbitListner, но мы хотим получать сообщение по запросу. Аннотация @RabbitListner этого не делает, она просто получает сообщения, когда они доступны. Спрос определяется «готовностью» клиента, т. е. клиент должен «получить» сообщение из стоп-листа очереди и обработать сообщение. Затем определите, готов ли он принять новый, и переподключитесь к очереди.
Мы пытались сделать это вручную, просто используя модули spring-amqp/spring-rabbit, и хотя это, вероятно, возможно, мы бы очень хотели сделать это с помощью spring. После многих часов поиска и просмотра документации мы не смогли найти ответ.
Вот код получения, который у нас есть на данный момент:
@RabbitListener(queues = "jobRequests")
public class Receiver {
@Autowired
private JobProcessor jobProcessor;
@RabbitHandler
public void receive(Job job) throws InterruptedException, IOException {
System.out.println(" [x] Received '" + job + "'");
jobProcessor.processJob(job);
}
}
Рабочий процессор:
@Service
public class JobProcessor {
@Autowired
private RabbitTemplate rabbitTemplate;
public boolean processJob(Job job) throws InterruptedException, IOException {
rabbitTemplate.convertAndSend("jobResponses", job);
System.out.println(" [x] Processing job: " + job);
rabbitTemplate.convertAndSend("processedJobs", job);
return true;
}
}
Другими словами, когда получатель получает задание, он должен прекратить прослушивание новых заданий и дождаться завершения работы обработчика заданий, а затем начать просмотр новых сообщений.
Мы воссоздали исключение нулевого указателя, вот код, который мы используем для отправки со стороны сервера.
@Controller
public class MainController {
@Autowired
RabbitTemplate rabbitTemplate;
@Autowired
private Queue jobRequests;
@RequestMapping("/do-job")
public String doJob() {
Job job = new Job(new Application(), "henk", 42);
System.out.println(" [X] Job sent: " + job);
rabbitTemplate.convertAndSend(jobRequests.getName(), job);
return "index";
}
}
И затем код получения на стороне клиента
@Component
public class Receiver {
@Autowired
private JobProcessor jobProcessor;
@Autowired
private RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;
@RabbitListener(queues = "jobRequests")
public void receive(Job job) throws InterruptedException, IOException, TimeoutException {
Collection<MessageListenerContainer> messageListenerContainers = rabbitListenerEndpointRegistry.getListenerContainers();
for (MessageListenerContainer listenerContainer :messageListenerContainers) {
System.out.println(listenerContainer);
listenerContainer.stop();
}
System.out.println(" [x] Received '" + job + "'");
jobProcessor.processJob(job);
for (MessageListenerContainer listenerContainer :messageListenerContainers) {
listenerContainer.start();
}
}
}
И обновленный рабочий процессор
@Service
public class JobProcessor {
public boolean processJob(Job job) throws InterruptedException, IOException {
System.out.println(" [x] Processing job: " + job);
return true;
}
}
И трассировка стека
[x] Received 'Job{application=com.olifarm.application.Application@aaa517, name='henk', id=42}'
[x] Processing job: Job{application=com.olifarm.application.Application@aaa517, name='henk', id=42}
Exception in thread "SimpleAsyncTaskExecutor-1" java.lang.NullPointerException
2015-12-18 11:17:44.494 at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.isActive(SimpleMessageListenerContainer.java:838)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:93)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1301)
at java.lang.Thread.run(Thread.java:745)
WARN 325899 --- [cTaskExecutor-1] o.s.a.r.l.SimpleMessageListenerContainer : Consumer raised exception, processing can restart if the connection factory supports it
java.lang.NullPointerException: null
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.isActive(SimpleMessageListenerContainer.java:838) ~[spring-rabbit-1.5.2.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:93) ~[spring-rabbit-1.5.2.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1195) ~[spring-rabbit-1.5.2.RELEASE.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_91]
Останов слушателя работает, и мы получаем новое задание, но когда он пытается запустить его снова, выбрасывается NPE. Мы проверили журнал rabbitMQ и обнаружили, что соединение закрывается примерно на 2 секунды, а затем автоматически открывается снова, даже если мы помещаем поток в спящий режим в обработчике заданий. Это может быть источником проблемы? Однако ошибка не нарушает работу программы, и после ее возникновения получатель все еще может получать новые задания. Мы злоупотребляем механизмом здесь или это действительный код?