нет фиксированного порядка доставки сообщений на сервер Rabbitmq в многопоточной среде

пожалуйста, сначала посмотрите на мой код.

Это мой тестовый класс, который создает 2000 потоков, и эти потоки отправляют сообщения.

public class MessageSenderMultipleThreadMock {
    @Autowired
    MessageList message;
    @Autowired
    MessageSender sender;

    public boolean process() throws InterruptedException {

        for (int i = 0; i < 2000; i++) {
            new Thread(new Runnable() {

                @Override
                public void run() {

                    String routingkey = "operation"
                            + UUID.randomUUID().toString();
                    String queueName = UUID.randomUUID().toString();

                    message.setSender(Thread.currentThread().getName());
                    try {
                        sender.sendMessage(routingkey, queueName,
                                "this is message");
                    } catch (InvalidMessagingParameters e) {
                        e.printStackTrace();
                    }

                }
            }).start();
            Thread.sleep(1000);

        }
        Thread.currentThread();
        Thread.sleep(10000);
        return true;
    }
}

Отправитель сообщения

это мой основной класс отправителя сообщений

    @Service
public class MessageSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private MessageList message;
    String queueName = "";
    String routingKey = "";
    @Autowired
    private QueueCreationService service;
    private boolean messageSentFlag;
    String returnedMessage = "";
    private Logger log = LoggerFactory.getLogger(MessageSender.class.getName());

    public boolean sendMessage(String routingKey, String queueName,
            String messageToBeSent) throws InvalidMessagingParameters {
        if ((routingKey == null && queueName == null)
                || (routingKey.equalsIgnoreCase("") || queueName
                        .equalsIgnoreCase("")))
            throw new InvalidMessagingParameters(routingKey, queueName);

        else {
            this.routingKey = routingKey;
            this.queueName = queueName;
        }
        service.processBinding(queueName, routingKey);
        message.addMessages(messageToBeSent);
        return execute();
    }

    /*
     * overloaded sendMessage method will use requestMap . RequestMap includes
     * queueName and routingKey that controller provides.
     */
    public boolean sendMessage(Map<String, String> requestMap)
            throws MessagingConnectionFailsException,
            InvalidMessagingParameters {
        this.queueName = requestMap.get("queue");
        this.routingKey = requestMap.get("routingkey");
        if ((routingKey == null && queueName == null)
                || (routingKey.equalsIgnoreCase("") || queueName
                        .equalsIgnoreCase("")))
            throw new InvalidMessagingParameters(routingKey, queueName);
        service.processBinding(queueName, routingKey);
        preparingMessagingTemplate();
        return execute();
    }

    private boolean execute() {
        for (int i = 0; i < 5 && !messageSentFlag; i++) {
            executeMessageSending();
        }
        return messageSentFlag;
    }

    private String convertMessageToJson(MessageList message) {
        ObjectWriter ow = new ObjectMapper().writer()
                .withDefaultPrettyPrinter();
        String json = "";
        try {
            json = ow.writeValueAsString(message);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        return json;
    }

    private void executeMessageSending() {
        rabbitTemplate.convertAndSend(R.EXCHANGE_NAME, routingKey,
                convertMessageToJson(message), new CorrelationData(UUID
                        .randomUUID().toString()));

    }

    private void preparingMessagingTemplate() {
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback(new ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode,
                    String replyText, String exchange, String routingKey) {
                returnedMessage = replyText;
            }
        });
        rabbitTemplate.setConfirmCallback(new ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack,
                    String cause) {
                System.out.println("*" + ack);

                if (ack && !returnedMessage.equalsIgnoreCase("NO_ROUTE")) {
                    messageSentFlag = ack;
                    log.info("message " + message.toString()
                            + " from Operation +" + this.getClass().getName()
                            + "+  has been successfully delivered");
                } else {
                    log.info("message " + message.toString()
                            + " from Operation +" + this.getClass().getName()
                            + "+ has not been delivered");

                }
            }
        });
    }
}

Мой класс конфигурации, который используется для обмена сообщениями

    @Configuration
    @ComponentScan("com.alpharaid.orange.*")
    @PropertySource("classpath:application.properties")

public class MessageConfiguration {

    String content = "";
    @Value("${rabbitmq_host}")
    String host = "";
    String port = "";
    @Value("${rabbitmq_username}")
    String userName = "";
    @Value("${rabbitmq_password}")
    String password = "";
    String queueName = "";
    InputStream input = null;

    @Autowired
    public MessageConfiguration() {
    }

    @Bean
    @Scope("prototype")
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        return template;
    }

    @Bean
    @Scope("prototype")
    public QueueCreationService service() {
        return new QueueCreationService();
    }

    @Bean
    @Scope("prototype")
    public RabbitAdmin admin() {
        return new RabbitAdmin(connectionFactory());
    }

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(
                this.host);
        connectionFactory.setUsername(userName);
        connectionFactory.setPassword(password);
        connectionFactory.setPublisherConfirms(true);
        connectionFactory.setPublisherReturns(true);
        return connectionFactory;
    }

}

Мои проблемы:

  1. Как я вижу на сервере, некоторые потоки успешно доставляют сообщения, а другие нет.

  2. нет полной уверенности в прослушивателе rabbitTemplate (

    rabbitTemplate.setReturnCallback (новый ReturnCallback () {

мне нужно, чтобы слушатель работал каждый раз, потому что на этом основании я попытаюсь отправить сообщение снова

    private boolean execute() {
    for (int i = 0; i < 5 && !messageSentFlag; i++) {
        executeMessageSending();
    }
    return messageSentFlag;
}

я вижу, что иногда сообщения доставляются 5 раз, потому что messageSentFlag является ложным и становится истинным только в прослушивателе подтверждения.

  1. Пожалуйста, скажите мне, как удалить очереди? Поскольку у меня их 8000, я видел один метод в rabbitAdmin для удаления очереди, но для этого нужно имя очереди, а мои очереди - это просто случайная очередь (UUID)

пожалуйста, дайте мне свои мысли, как я могу улучшить его или есть ли обходной путь? Для моего приложения многопоточная среда обязательна.

Заранее спасибо.


person lesnar    schedule 14.08.2015    source источник
comment
Я не вижу здесь ничего, что позволяло бы обнаруживать сообщения, доставленные не по порядку. Все, что я вижу, это серьезная проблема с безопасностью потоков в объекте MessageList.   -  person user207421    schedule 14.08.2015
comment
@EJP спасибо за ваш ответ. Вы имеете в виду, что я должен сделать MessageList как Singelton вместо области прототипа?   -  person lesnar    schedule 14.08.2015
comment
@EJP: сообщения не по порядку, которые я вижу только в интерфейсе Rabbitmq.   -  person lesnar    schedule 14.08.2015


Ответы (1)


RabbitMQ гарантирует порядок сообщений только после того, как сообщение находится в определенной очереди.

Нет никаких гарантий порядка сообщений для отправки сообщений в RabbitMQ, если только вы не установите эти гарантии. Во многих случаях это сложно, если не невозможно, особенно в такой многопоточной среде, как ваша.

Если вам нужно гарантировать, что сообщения обрабатываются в определенном порядке, вам необходимо создать или использовать ресеквенсер

Общая идея заключается в том, что вам нужно нумеровать ваши сообщения в источнике — 1, 2, 3, 4, 5 и т. д. Когда ваши потребители извлекают сообщения из очереди, вы смотрите на номер сообщения и смотрите, является ли он исходным. тот, который вам нужен прямо сейчас. Если это не так, вы сохраните сообщение и обработаете его позже. Когда у вас есть сообщение #, которое вы ищете в данный момент, вы последовательно обработаете все сообщения, которые у вас есть в данный момент.

Spring должен иметь что-то вроде ресеквенсора, хотя я недостаточно знаком с этой экосистемой, чтобы указать вам правильное направление.

person Derick Bailey    schedule 14.08.2015