пожалуйста, сначала посмотрите на мой код.
Это мой тестовый класс, который создает 2000 потоков, и эти потоки отправляют сообщения.
public class MessageSenderMultipleThreadMock {
@Autowired
MessageList message;
@Autowired
MessageSender sender;
public boolean process() throws InterruptedException {
for (int i = 0; i < 2000; i++) {
new Thread(new Runnable() {
@Override
public void run() {
String routingkey = "operation"
+ UUID.randomUUID().toString();
String queueName = UUID.randomUUID().toString();
message.setSender(Thread.currentThread().getName());
try {
sender.sendMessage(routingkey, queueName,
"this is message");
} catch (InvalidMessagingParameters e) {
e.printStackTrace();
}
}
}).start();
Thread.sleep(1000);
}
Thread.currentThread();
Thread.sleep(10000);
return true;
}
}
Отправитель сообщения
это мой основной класс отправителя сообщений
@Service
public class MessageSender {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private MessageList message;
String queueName = "";
String routingKey = "";
@Autowired
private QueueCreationService service;
private boolean messageSentFlag;
String returnedMessage = "";
private Logger log = LoggerFactory.getLogger(MessageSender.class.getName());
public boolean sendMessage(String routingKey, String queueName,
String messageToBeSent) throws InvalidMessagingParameters {
if ((routingKey == null && queueName == null)
|| (routingKey.equalsIgnoreCase("") || queueName
.equalsIgnoreCase("")))
throw new InvalidMessagingParameters(routingKey, queueName);
else {
this.routingKey = routingKey;
this.queueName = queueName;
}
service.processBinding(queueName, routingKey);
message.addMessages(messageToBeSent);
return execute();
}
/*
* overloaded sendMessage method will use requestMap . RequestMap includes
* queueName and routingKey that controller provides.
*/
public boolean sendMessage(Map<String, String> requestMap)
throws MessagingConnectionFailsException,
InvalidMessagingParameters {
this.queueName = requestMap.get("queue");
this.routingKey = requestMap.get("routingkey");
if ((routingKey == null && queueName == null)
|| (routingKey.equalsIgnoreCase("") || queueName
.equalsIgnoreCase("")))
throw new InvalidMessagingParameters(routingKey, queueName);
service.processBinding(queueName, routingKey);
preparingMessagingTemplate();
return execute();
}
private boolean execute() {
for (int i = 0; i < 5 && !messageSentFlag; i++) {
executeMessageSending();
}
return messageSentFlag;
}
private String convertMessageToJson(MessageList message) {
ObjectWriter ow = new ObjectMapper().writer()
.withDefaultPrettyPrinter();
String json = "";
try {
json = ow.writeValueAsString(message);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
return json;
}
private void executeMessageSending() {
rabbitTemplate.convertAndSend(R.EXCHANGE_NAME, routingKey,
convertMessageToJson(message), new CorrelationData(UUID
.randomUUID().toString()));
}
private void preparingMessagingTemplate() {
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback(new ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode,
String replyText, String exchange, String routingKey) {
returnedMessage = replyText;
}
});
rabbitTemplate.setConfirmCallback(new ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack,
String cause) {
System.out.println("*" + ack);
if (ack && !returnedMessage.equalsIgnoreCase("NO_ROUTE")) {
messageSentFlag = ack;
log.info("message " + message.toString()
+ " from Operation +" + this.getClass().getName()
+ "+ has been successfully delivered");
} else {
log.info("message " + message.toString()
+ " from Operation +" + this.getClass().getName()
+ "+ has not been delivered");
}
}
});
}
}
Мой класс конфигурации, который используется для обмена сообщениями
@Configuration
@ComponentScan("com.alpharaid.orange.*")
@PropertySource("classpath:application.properties")
public class MessageConfiguration {
String content = "";
@Value("${rabbitmq_host}")
String host = "";
String port = "";
@Value("${rabbitmq_username}")
String userName = "";
@Value("${rabbitmq_password}")
String password = "";
String queueName = "";
InputStream input = null;
@Autowired
public MessageConfiguration() {
}
@Bean
@Scope("prototype")
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
return template;
}
@Bean
@Scope("prototype")
public QueueCreationService service() {
return new QueueCreationService();
}
@Bean
@Scope("prototype")
public RabbitAdmin admin() {
return new RabbitAdmin(connectionFactory());
}
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(
this.host);
connectionFactory.setUsername(userName);
connectionFactory.setPassword(password);
connectionFactory.setPublisherConfirms(true);
connectionFactory.setPublisherReturns(true);
return connectionFactory;
}
}
Мои проблемы:
Как я вижу на сервере, некоторые потоки успешно доставляют сообщения, а другие нет.
нет полной уверенности в прослушивателе rabbitTemplate (
rabbitTemplate.setReturnCallback (новый ReturnCallback () {
мне нужно, чтобы слушатель работал каждый раз, потому что на этом основании я попытаюсь отправить сообщение снова
private boolean execute() {
for (int i = 0; i < 5 && !messageSentFlag; i++) {
executeMessageSending();
}
return messageSentFlag;
}
я вижу, что иногда сообщения доставляются 5 раз, потому что messageSentFlag является ложным и становится истинным только в прослушивателе подтверждения.
- Пожалуйста, скажите мне, как удалить очереди? Поскольку у меня их 8000, я видел один метод в rabbitAdmin для удаления очереди, но для этого нужно имя очереди, а мои очереди - это просто случайная очередь (UUID)
пожалуйста, дайте мне свои мысли, как я могу улучшить его или есть ли обходной путь? Для моего приложения многопоточная среда обязательна.
Заранее спасибо.
MessageList
. - person user207421   schedule 14.08.2015