Spring WebSocket - последовательная доставка сообщений на clientOutboundChannel (на очередь)

Я работаю над приложением на основе событий в реальном времени с использованием Spring WebSockets, Messaging и RabbitMQ. В этом приложении сообщения должны доставляться клиентам в точном порядке, в котором они были вставлены в RabbitMQ.

"РЕДАКТИРОВАТЬ"

Наша цель - получить сообщение из браузера, обработать его на сервере по порядку (по отношению к уникальному объекту, определяемому параметром маршрута), обогатить сообщение и передать его всем подписывающимся браузерам через внешний STOMP MQ (RabbitMQ).

Наш метод MessageMapping выглядит следующим образом:

@MessageMapping (/ commands. {Route}. {Data}) public CommandMessage receiveCommand (сообщение CommandMessage, участник) {

try {

  // Get object to synch on using route
  Object o = ...

  syncrhonized(o) {

    // Perform command on object

    // Set message server sequence
    message.setServerSequence(o.getAutoIncrementSequence());

    // Log server sequence
    log.debug("Message server sequence:" + message.getServerSequence());

    // Send to external MQ for broadcasting to all subscribers
    return message;
  }

} catch (Exception e) {
  ...
}

return null;

}

Если мы настроим ClientInboundChannel и ClientOutboundChannels с corePoolSize и maxPoolSize равными 1 каждому, все сообщения будут в порядке.

Если мы увеличим corePoolSize и maxPoolSize в ClientInboundChannel, сообщения попадут в MQ в неправильном порядке; если мы сделаем такое же увеличение для ClientOutboundChannel, сообщения попадут в браузер в неправильном порядке.

]

Тесты проводились с использованием одного браузера-клиента.

Мы включили трассировку для StompBrokerRelayMessageHandler и получили в журналах такие записи, как:

2014-05-06 14:26:39 TRACE [clientInboundChannel-6] osmssStompBrokerRelayMessageHandler [StompBrokerRelayMessageHandler.java:412] Обработка сообщения = [Байт полезной нагрузки [303]] [Заголовки = {StompCommand = SEND, nativeHeaders = {content-type [application / json; charset = UTF-8], destination = [/ topic / commands.BKN01.20140318]}, simpMessageType = MESSAGE, simpDestination = / topic / commands.BKN01.20140318, contentType = application / json; charset = UTF -8, simpSessionId = ehjcoxb3, id = a31d0e3d-12cc-f562-1ec2-e2d7ba0899eb, timestamp = 1399400799940}] 06.05.2014, 14:26:39 DEBUG [clientInboundChannel-6] osmssStompBomagePelwardMessage_Message_Channel: [clientInboundChannel] сообщение брокеру 06.05.2014, 14:26 TRACE [clientInboundChannel-3] osmssStompBrokerRelayMessageHandler [StompBrokerRelayMessageHandler.java:406] Игнорирование сообщения к месту назначения = / app / commands.BKN01.20140318 06.05.2014 14:26: 39 TRACE [clientInboundChannel-7] osmssStompBrokerRelayMessageHandler [StompBrokerRelayMessageHandler. java: 406] Игнорирование сообщения для пункта назначения = / app / commands.BKN01.20140318 06.05.2014 14:26:39 TRACE [clientInboundChannel-1] osmssStompBrokerRelayMessageHandler [StompBrokerRelayMessageHandler.java:412] Обработка сообщения = [байт полезной нагрузки [байт полезной нагрузки] ]] [Заголовки = {StompCommand = SEND, nativeHeaders = {content-type = [application / json; charset = UTF-8], destination = [/ topic / commands.BKN01.20140318]}, simpMessageType = MESSAGE, simpDestination = / topic / commands.BKN01.20140318, contentType = application / json; charset = UTF-8, simpSessionId = ehjcoxb3, id = 3cc7b4ae-8ea4-ef8a-6c4d-c3bc1ed23bcd, timestamp = 139940079994726: 2014] 39 DEBUG [clientInboundChannel-1] osmssStompBrokerRelayMessageHandler [StompBrokerRelayMessageHandler.java:658] Пересылка сообщения брокеру

Мы также включили трассировку для StompSubProtocolHandler в пакете org.springframework.web.socket.messaging и получили такие сообщения, как:

2014-05-07 10:58:58 TRACE [http-nio-8080-exec-5] o.s.w.s.m.StompSubProtocolHandler [StompSubProtocolHandler.java:180] Получено сообщение от клиентского сеанса = u8wrnsr6

Никакая информация не предоставляет простой способ сопоставить наше свойство message.serverSequence (которое устанавливается перед отправкой в ​​MQ) с различными идентификаторами деталей журнала.

Есть ли способ увеличить потоки входящих / исходящих каналов, чтобы порядок оставался неизменным? Например, могут ли каналы быть привязаны к «маршруту» или поток может быть привязан к «маршруту»?

Пожалуйста помоги.

Спасибо,

Дэн


person user3549633    schedule 18.04.2014    source источник


Ответы (1)


[ИЗМЕНИТЬ]

Спасибо, теперь понятно. Действительно, в настоящее время нет способа гарантировать, что сообщения от внешнего брокера доставляются клиенту в одном и том же порядке и что сообщения от клиента будут отправляться внешнему брокеру в том же порядке.

Мы могли бы назначить индекс каждому сообщению в сеансе, а затем проверить и выполнить буферизацию, если необходимо, чтобы обеспечить соблюдение порядка, но это будет новая функция. Не стесняйтесь создавать заявку в JIRA.

На данный момент вы можете изучить создание ChannelInterceptor, который проверяет каждое сообщение, чтобы узнать, к какому сеансу оно принадлежит, а затем установить для него некоторый заголовок инкрементного индекса перед его отправкой (в clientInboundChannel или clientOutboundChannel). Затем расширьте StompSubProtocolHandler и StompBrokerRelayMessageHandler, чтобы проверить заголовок индекса и попытаться установить порядок сообщений.

person Rossen Stoyanchev    schedule 06.05.2014
comment
Я обновил вопрос кодом и результатами трассировки. - person user3549633; 07.05.2014
comment
Спасибо, что ответили так быстро! Я добавлю новый запрос функции в JIRA и начну работать над вашими идеями обхода. - person user3549633; 08.05.2014
comment
@Rossen Stoyanchev: Как я понял из этого сообщения, только размер пула ThreadPoolExecutor, установленный на 1, гарантирует, что сообщения будут получены клиентом в том же порядке, что и в очереди брокера. Это верно? Если да, знаете ли вы, как один поток будет работать для многих клиентов и многих очередей? Доступны ли для этого тесты производительности? Я работаю над приложением, в котором важен порядок или сообщения, и оно сильно загружено. - person JohnDoDo; 02.07.2014
comment
Пул потоков из 1 для записи клиентам создает узкое место. Даже если бы все клиенты работали быстро, для оптимальной производительности вам все равно нужен был бы размер пула потоков, равный количеству процессоров. Однако для защиты от медленных потребителей, скорее всего, потребуется больший размер пула потоков (в противном случае даже один из них исчерпал бы ваш пул потоков). Кстати, создание заявки в JIRA, чтобы запросить эту функцию вместе с некоторыми деталями, является хорошим первым шагом и способом гарантировать, что это станет функцией. - person Rossen Stoyanchev; 03.07.2014