Spring Rabbitlistner перестает слушать очередь, используя синтаксис аннотаций

Мы с коллегой работаем над приложением, использующим Spring, которому необходимо получить сообщение из очереди RabbitMQ. Идея состоит в том, чтобы сделать это с помощью (обычно превосходной) системы пружинных аннотаций, чтобы упростить понимание кода. У нас есть система, работающая с использованием аннотации @RabbitListner, но мы хотим получать сообщение по запросу. Аннотация @RabbitListner этого не делает, она просто получает сообщения, когда они доступны. Спрос определяется «готовностью» клиента, т. е. клиент должен «получить» сообщение из стоп-листа очереди и обработать сообщение. Затем определите, готов ли он принять новый, и переподключитесь к очереди.

Мы пытались сделать это вручную, просто используя модули spring-amqp/spring-rabbit, и хотя это, вероятно, возможно, мы бы очень хотели сделать это с помощью spring. После многих часов поиска и просмотра документации мы не смогли найти ответ.

Вот код получения, который у нас есть на данный момент:

@RabbitListener(queues = "jobRequests")
public class Receiver {

@Autowired
private JobProcessor jobProcessor;

@RabbitHandler
public void receive(Job job) throws InterruptedException, IOException {
    System.out.println(" [x] Received '" + job + "'");
    jobProcessor.processJob(job);
}

}

Рабочий процессор:

@Service
public class JobProcessor {

@Autowired
private RabbitTemplate rabbitTemplate;

public boolean processJob(Job job) throws InterruptedException, IOException {
    rabbitTemplate.convertAndSend("jobResponses", job);

    System.out.println(" [x] Processing job: " + job);

    rabbitTemplate.convertAndSend("processedJobs", job);

    return true;
}

}

Другими словами, когда получатель получает задание, он должен прекратить прослушивание новых заданий и дождаться завершения работы обработчика заданий, а затем начать просмотр новых сообщений.


Мы воссоздали исключение нулевого указателя, вот код, который мы используем для отправки со стороны сервера.

@Controller
public class MainController {

@Autowired
RabbitTemplate rabbitTemplate;

@Autowired
private Queue jobRequests;

@RequestMapping("/do-job")
public String doJob() {

    Job job = new Job(new Application(), "henk", 42);

    System.out.println(" [X] Job sent: " + job);

    rabbitTemplate.convertAndSend(jobRequests.getName(), job);

    return "index";
    }
}

И затем код получения на стороне клиента

@Component
public class Receiver {

@Autowired
private JobProcessor jobProcessor;

@Autowired
private RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;

@RabbitListener(queues = "jobRequests")
public void receive(Job job) throws InterruptedException, IOException, TimeoutException {

    Collection<MessageListenerContainer> messageListenerContainers = rabbitListenerEndpointRegistry.getListenerContainers();

    for (MessageListenerContainer listenerContainer :messageListenerContainers) {
        System.out.println(listenerContainer);
        listenerContainer.stop();
    }

    System.out.println(" [x] Received '" + job + "'");
    jobProcessor.processJob(job);

    for (MessageListenerContainer listenerContainer :messageListenerContainers) {
        listenerContainer.start();
    }
   }
}

И обновленный рабочий процессор

@Service
public class JobProcessor {

public boolean processJob(Job job) throws InterruptedException, IOException {

    System.out.println(" [x] Processing job: " + job);

    return true;
}

}

И трассировка стека

[x] Received 'Job{application=com.olifarm.application.Application@aaa517, name='henk', id=42}'
[x] Processing job: Job{application=com.olifarm.application.Application@aaa517, name='henk', id=42}
Exception in thread "SimpleAsyncTaskExecutor-1" java.lang.NullPointerException
2015-12-18 11:17:44.494 at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.isActive(SimpleMessageListenerContainer.java:838)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:93)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1301)
    at java.lang.Thread.run(Thread.java:745)
 WARN 325899 --- [cTaskExecutor-1] o.s.a.r.l.SimpleMessageListenerContainer : Consumer raised exception, processing can restart if the connection factory supports it

java.lang.NullPointerException: null
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.isActive(SimpleMessageListenerContainer.java:838) ~[spring-rabbit-1.5.2.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:93) ~[spring-rabbit-1.5.2.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1195) ~[spring-rabbit-1.5.2.RELEASE.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_91]

Останов слушателя работает, и мы получаем новое задание, но когда он пытается запустить его снова, выбрасывается NPE. Мы проверили журнал rabbitMQ и обнаружили, что соединение закрывается примерно на 2 секунды, а затем автоматически открывается снова, даже если мы помещаем поток в спящий режим в обработчике заданий. Это может быть источником проблемы? Однако ошибка не нарушает работу программы, и после ее возникновения получатель все еще может получать новые задания. Мы злоупотребляем механизмом здесь или это действительный код?


person user3157264    schedule 17.12.2015    source источник


Ответы (1)


Чтобы получать сообщения по запросу, обычно лучше использовать rabbitTemplate.receiveAndConvert(), а не прослушиватель; таким образом, вы полностью контролируете получение сообщений.

Начиная с версии 1.5 вы можете настроить шаблон так, чтобы он блокировался на некоторое время (или до прихода сообщения). В противном случае он немедленно возвращает null, если сообщения нет.

Слушатель действительно предназначен для приложений, управляемых сообщениями.

Если вы можете заблокировать поток в прослушивателе до завершения задания, сообщения больше не будут доставляться — по умолчанию в контейнере есть только один поток.

Если по какой-либо причине вы не можете заблокировать поток до завершения задания, вы можете stop()/start() контейнер прослушивателя получить ссылку на него из Реестр конечной точки.

Обычно лучше остановить контейнер в отдельном потоке.

person Gary Russell    schedule 17.12.2015
comment
Спасибо за ответ! Мы пробовали метод регистрации конечной точки раньше, но при повторном подключении это вызывает исключение нулевого указателя. Но мы обязательно рассмотрим получение и преобразование. Я предполагаю, что вам нужно написать свой собственный опросчик, чтобы проверить, было ли новое сообщение помещено в очередь? - person user3157264; 17.12.2015
comment
Хорошо, поместите receiveAndConvert() в цикл. Мне было бы интересно увидеть трассировку стека и тестовый пример, который показывает NPE - я не знаю о каких-либо проблемах. - person Gary Russell; 17.12.2015
comment
Ладно завтра попробуем. Мы также предоставим трассировку стека и код для NPE. Вероятно, это просто из-за какого-то ошибочного кода - person user3157264; 17.12.2015
comment
Спасибо; это ошибка - я открыл задачу; мы выпустим 1.5.3 с исправлением в ближайшем будущем. Как вы заметили, контейнер восстанавливается после проблемы. - person Gary Russell; 18.12.2015
comment
Еще раз спасибо за вашу помощь! - person user3157264; 18.12.2015