Я работаю над приложением на основе событий в реальном времени с использованием Spring WebSockets, Messaging и RabbitMQ. В этом приложении сообщения должны доставляться клиентам в точном порядке, в котором они были вставлены в RabbitMQ.
"РЕДАКТИРОВАТЬ"
Наша цель - получить сообщение из браузера, обработать его на сервере по порядку (по отношению к уникальному объекту, определяемому параметром маршрута), обогатить сообщение и передать его всем подписывающимся браузерам через внешний STOMP MQ (RabbitMQ).
Наш метод MessageMapping выглядит следующим образом:
@MessageMapping (/ commands. {Route}. {Data}) public CommandMessage receiveCommand (сообщение CommandMessage, участник) {
try {
// Get object to synch on using route
Object o = ...
syncrhonized(o) {
// Perform command on object
// Set message server sequence
message.setServerSequence(o.getAutoIncrementSequence());
// Log server sequence
log.debug("Message server sequence:" + message.getServerSequence());
// Send to external MQ for broadcasting to all subscribers
return message;
}
} catch (Exception e) {
...
}
return null;
}
Если мы настроим ClientInboundChannel и ClientOutboundChannels с corePoolSize и maxPoolSize равными 1 каждому, все сообщения будут в порядке.
Если мы увеличим corePoolSize и maxPoolSize в ClientInboundChannel, сообщения попадут в MQ в неправильном порядке; если мы сделаем такое же увеличение для ClientOutboundChannel, сообщения попадут в браузер в неправильном порядке.
]
Тесты проводились с использованием одного браузера-клиента.
Мы включили трассировку для StompBrokerRelayMessageHandler и получили в журналах такие записи, как:
2014-05-06 14:26:39 TRACE [clientInboundChannel-6] osmssStompBrokerRelayMessageHandler [StompBrokerRelayMessageHandler.java:412] Обработка сообщения = [Байт полезной нагрузки [303]] [Заголовки = {StompCommand = SEND, nativeHeaders = {content-type [application / json; charset = UTF-8], destination = [/ topic / commands.BKN01.20140318]}, simpMessageType = MESSAGE, simpDestination = / topic / commands.BKN01.20140318, contentType = application / json; charset = UTF -8, simpSessionId = ehjcoxb3, id = a31d0e3d-12cc-f562-1ec2-e2d7ba0899eb, timestamp = 1399400799940}] 06.05.2014, 14:26:39 DEBUG [clientInboundChannel-6] osmssStompBomagePelwardMessage_Message_Channel: [clientInboundChannel] сообщение брокеру 06.05.2014, 14:26 TRACE [clientInboundChannel-3] osmssStompBrokerRelayMessageHandler [StompBrokerRelayMessageHandler.java:406] Игнорирование сообщения к месту назначения = / app / commands.BKN01.20140318 06.05.2014 14:26: 39 TRACE [clientInboundChannel-7] osmssStompBrokerRelayMessageHandler [StompBrokerRelayMessageHandler. java: 406] Игнорирование сообщения для пункта назначения = / app / commands.BKN01.20140318 06.05.2014 14:26:39 TRACE [clientInboundChannel-1] osmssStompBrokerRelayMessageHandler [StompBrokerRelayMessageHandler.java:412] Обработка сообщения = [байт полезной нагрузки [байт полезной нагрузки] ]] [Заголовки = {StompCommand = SEND, nativeHeaders = {content-type = [application / json; charset = UTF-8], destination = [/ topic / commands.BKN01.20140318]}, simpMessageType = MESSAGE, simpDestination = / topic / commands.BKN01.20140318, contentType = application / json; charset = UTF-8, simpSessionId = ehjcoxb3, id = 3cc7b4ae-8ea4-ef8a-6c4d-c3bc1ed23bcd, timestamp = 139940079994726: 2014] 39 DEBUG [clientInboundChannel-1] osmssStompBrokerRelayMessageHandler [StompBrokerRelayMessageHandler.java:658] Пересылка сообщения брокеру
Мы также включили трассировку для StompSubProtocolHandler в пакете org.springframework.web.socket.messaging и получили такие сообщения, как:
2014-05-07 10:58:58 TRACE [http-nio-8080-exec-5] o.s.w.s.m.StompSubProtocolHandler [StompSubProtocolHandler.java:180] Получено сообщение от клиентского сеанса = u8wrnsr6
Никакая информация не предоставляет простой способ сопоставить наше свойство message.serverSequence (которое устанавливается перед отправкой в MQ) с различными идентификаторами деталей журнала.
Есть ли способ увеличить потоки входящих / исходящих каналов, чтобы порядок оставался неизменным? Например, могут ли каналы быть привязаны к «маршруту» или поток может быть привязан к «маршруту»?
Пожалуйста помоги.
Спасибо,
Дэн