Spring WebSocketMessageBrokerConfigurer с Artemis 2.6.3 Multicast (тема) не работает

Адрес Artemis mutilcast и очередь не работают должным образом. Моя идея - создать группы или обмен сообщениями для пользователя, который может иметь несколько сеансов веб-сокетов (веб, Android и т. Д.). Сервер опубликует уведомление на многоадресный адрес artemis, и все подписчики должны получить уведомление. В текущем сценарии я просто заставляю пользователя luislaves00 создавать более одного сеанса. В artemis я вижу 2 потребителей (не уверен, как Message Broker Relay из spring выполняет свою работу), но потребители ведут себя как циклический перебор, а не издатель-подписчик. С брокером в памяти от Spring он работает нормально, но недолговечен, поэтому, когда нет подключенного подписчика, сообщения удаляются. Вот код, который я использую:

Клиентская часть:

function connect() {
    var socket = new SockJS('/notification-websocket');
    stompClient = Stomp.over(socket);
    var headers = {
        // todo: server will handle this logic
        'client-id': 'luisalves00',
        'durable-subscription-name': 'luisalves00',
        'id' : 'luisalves00'
    };
    stompClient.connect(headers, function(frame) {
        setConnected(true);
        console.log('Connected: ' + frame);
        // todo: server will handle this logic
        stompClient.subscribe('/topic/notification/username' + 'luisalves00', function(notification) {
            showNotification(JSON.parse(notification.body).content);
        }, headers);
    });
}

Конфигурация реле брокера:

public void configureMessageBroker(MessageBrokerRegistry config) {
        // Artemis ->
        // tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true
        config.enableStompBrokerRelay("/topic").setRelayHost("127.0.0.1").setRelayPort(61613);
        config.setApplicationDestinationPrefixes("/app");
        //config.enableSimpleBroker("/topic");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {

        logger.info("Registering the stomp endpoints.");

        registry.addEndpoint("/notification-websocket").setAllowedOrigins("*").withSockJS();

    }

Производитель фиктивных уведомлений сервера:

@Scheduled(fixedDelay = 20000)
public void scheduleTaskWithFixedDelay() {
    final Notification message = new Notification(UUID.randomUUID().toString() + " -> "  + dateTimeFormatter.format(LocalDateTime.now()));
    try {
        final String user = "luisalves00";
        logger.info("Creating msg={}", message);
        final Map<String, Object> headers = new HashMap<>();
        headers.put("subscription-id", user);
        template.convertAndSend("/topic/notification/username/" + user, message, headers);
    } catch (Exception e) {
        logger.error("", e);
    }
}

Когда клиент подписывается, artemis создает адрес и постоянную очередь со следующими параметрами:

Addesses: 
id=2147496008 name=/topic/notification/group1/ routingType=[MULTICAST] queueCount=1

Queue
id=2147496011 
name=group1.group1 
address=/topic/notification/group1/ 
routingType=MULTICAST
durable=true
maxConsumers-1
purgeOnNoConsumers=false 
consumerCount=0

person la00    schedule 30.09.2018    source источник
comment
Когда вы говорите: «В artemis я вижу 2 потребителя» ... К какой очереди подключены эти 2 потребителя? Если они подключены к одной и той же очереди, они будут получать сообщения циклически (поскольку они будут совместно использовать сообщения, помещенные в очередь). Если вы хотите, чтобы оба потребителя получали сообщения, отправленные на адрес, на адресе должно быть 2 очереди по 1 потребителю в каждой. Мне кажется, что 2 ваших потребителя создают долговременную подписку с одним и тем же идентификатором клиента и именем подписки, что означает, что они будут совместно использовать очередь долговременной подписки.   -  person Justin Bertram    schedule 30.09.2018
comment
Спасибо, Джастин. Вчера я снова перечитал всю документацию, как я понял, с тем же идентификатором клиента и именем подписки. Каждому клиенту нужен уникальный идентификатор клиента, и я использовал luisalve00 для обоих. Мне нужно сделать что-то вроде luisallves00_1 и luisalves00_2, чтобы создать 2 очереди. Запутывающая часть - это routingType = MULTICAST в очереди. Он ведет себя как Q, а не как тема, поэтому я этого не понимаю. Тем не менее, с этой реализацией можно создавать группы потребителей для балансировки нагрузки, что кажется приятным.   -  person la00    schedule 01.10.2018
comment
Очереди, привязанные к адресу, могут быть произвольными или многоадресными. Каждый раз, когда сообщение отправляется на адрес, оно помещается в одну из очередей anycast (с использованием циклического перебора, если имеется ›1 очередь anycast) и / или все из многоадресные очереди. Для любой очереди может быть 1 или несколько потребителей. Потребители отправляют сообщение в очередь. Эта архитектура легко допускает широкое разнообразие семантики доставки.   -  person Justin Bertram    schedule 01.10.2018
comment
Да, немного похоже на кафку. В любом случае заставил его работать так, как я хотел, но поддержание состояния (слотов номеров) для идентификаторов клиентов кажется немного сложным. Побочный эффект заключается в том, что я могу контролировать количество подписчиков на тему в приложении. Я думаю, что spring делает что-то автоматическое с идентификатором сеанса websocket, но это создаст новую долговечную очередь для каждого случайного номера сеанса, что не кажется хорошей идеей, поскольку она будет сильно расти, и не знаю, как ее можно очистить. Кстати, какой заголовок я могу использовать для управления истечением срока действия сообщения? Думаю, это ttl, но не уверен.   -  person la00    schedule 02.10.2018
comment
Несомненно, существует опасность создать долговременную подписку, а затем забыть о ней, поскольку любое сообщение, отправленное на этот адрес, может накапливаться в подписке. Дополнительные сведения об удалении долгосрочных подписок см. На странице activemq.apache.org/artemis / docs / latest /. Используйте заголовок expires для управления истечением срока действия сообщения. В этом заголовке используется время Unix (en.wikipedia.org/wiki/Unix_time).   -  person Justin Bertram    schedule 02.10.2018


Ответы (1)


Чтобы работа была долговечной, необходимо использовать:

'client-id': '<some unique identifier for each client>'
'durable-subscription-name': 'tech-news'

В своей реализации я перестал использовать долговечность, так как идея уведомления должна быть доставлена ​​при создании (низкая задержка). Если потребитель не подключен, он может подключиться к исторической базе данных для получения старых сообщений, которые он не получил при подключении. Если вы действительно хотите сделать его долговечным, я предлагаю обрабатывать на стороне сервера «идентификатор клиента» и имя-подписки »для подключенного пользователя. Если у пользователя нет текущего сеанса, создайте устойчивую очередь. Следующий сеанс, который он создаст, должен быть недолговечным, так как они будут получать те же сообщения, что и длительный. Если первая сессия умирает, он все равно получает сообщения в других сессиях. Когда все умрут и он снова подключится, это снова будет первый сеанс, и он получит все сообщения, которые не были доставлены в постоянную очередь (возможно, некоторые из них он уже получил в недолговечной очереди), но у него будет история все сообщения (которые, как было сказано ранее, я думаю, следует обрабатывать другим способом).

person la00    schedule 24.10.2018