Потоки в javax.websockets/Tyrus

Я пишу приложение Java, которое отправляет и получает сообщения с сервера веб-сокетов. Когда приложение получает сообщение, его обработка может занять некоторое время. Поэтому я пытаюсь использовать несколько потоков для получения сообщений. Насколько я понимаю, Grizzly имеет селекторные потоки, а также рабочие потоки. По умолчанию есть 1 поток выбора и 2 рабочих потока, в следующем примере я пытаюсь увеличить их до 5 и 10 соответственно. В приведенном ниже примере я приостанавливаю поток, который вызывает метод onMessage, на 10 секунд, чтобы имитировать обработку входящей информации. Информация поступает каждую секунду, поэтому 10 потоков должны справиться с объемом трафика. Когда я профилирую запуск, работает только 1 поток селектора и 2 рабочих потока. Кроме того, сообщения принимаются только с интервалом в 10 секунд. Указание на то, что только 1 поток обрабатывает трафик - я нахожу это очень странным. Во время профилирования один рабочий поток, например. Grizzly(1) получает первое отправленное сообщение. Затем через 10 секунд "Гризли(2)" получает второе сообщение - тогда Grizzly(2) продолжает получать сообщения, а Grizzly(1) не выполняет никаких действий.

Может кто-нибудь объяснить это странное поведение и как его изменить, например. 10 потоков, постоянно ожидающих сообщения в очереди?

Основной:

    public static void main(String[] args) {
        WebsocketTextClient client = new WebsocketTextClient();
        client.connect();
        for (int i = 0; i < 60; i++) {
            client.send("Test message " + i);
            try {
                Thread.sleep(1000);
            } catch (Exception e) {
                System.out.println("Error sleeping!");
            }
        }
    }

WebsocketTextClient.java:

import java.net.URI;
import javax.websocket.ClientEndpointConfig;
import javax.websocket.EndpointConfig;
import javax.websocket.Session;
import javax.websocket.Endpoint;
import javax.websocket.MessageHandler;
import org.glassfish.tyrus.client.ClientManager;
import org.glassfish.tyrus.client.ThreadPoolConfig;
import org.glassfish.tyrus.container.grizzly.client.GrizzlyClientProperties;

public class WebsocketTextClient {

    private ClientManager client;
    private ClientEndpointConfig clientConfig;
    WebsocketTextClientEndpoint endpoint;

    public WebsocketTextClient() {
        client = ClientManager.createClient();
        client.getProperties().put(GrizzlyClientProperties.SELECTOR_THREAD_POOL_CONFIG, ThreadPoolConfig.defaultConfig().setMaxPoolSize(5));
        client.getProperties().put(GrizzlyClientProperties.WORKER_THREAD_POOL_CONFIG, ThreadPoolConfig.defaultConfig().setMaxPoolSize(10));
    }

    public boolean connect() {
        try {
            clientConfig = ClientEndpointConfig.Builder.create().build();
            endpoint = new WebsocketTextClientEndpoint();
            client.connectToServer(endpoint, clientConfig, new URI("wss://echo.websocket.org"));
        } catch (Exception e) {
            return false;
        }
        return true;
    }

    public boolean disconnect() {
        return false;
    }

    public boolean send(String message) {
        endpoint.session.getAsyncRemote().sendText(message);
        return true;
    }

    private class WebsocketTextClientEndpoint extends Endpoint {
        Session session;

        @Override
        public void onOpen(Session session, EndpointConfig config) {
            System.out.println("Connection opened");
            this.session = session;
            session.addMessageHandler(new WebsocketTextClientMessageHandler());
        }
    }

    private class WebsocketTextClientMessageHandler implements MessageHandler.Whole<String> {

        @Override
        public void onMessage(String message) {
            System.out.println("Message received from " + Thread.currentThread().getName() + " " + message);
            try {
                Thread.sleep(10000);
            } catch (Exception e) {
                System.out.println("Error sleeping!");
            }
            System.out.println("Resuming");
        }
    }
}

person sigvardsen    schedule 08.08.2015    source источник
comment
Это может быть макс. по сравнению с проблемой размера ядра см. этот вопрос. т.е. попробуйте использовать setCorePoolSize вместо maxPoolSize.   -  person vanOekel    schedule 09.08.2015
comment
Нет, я тоже пробовал. Однако теперь я узнал, что документация для messageHandlers допускает только один поток.   -  person sigvardsen    schedule 09.08.2015


Ответы (1)


Похоже, вы просите, чтобы WebSockets могли получать несколько сообщений, отправленных одним и тем же клиентским соединением, обрабатывать эти сообщения в отдельных потоках и отправлять ответы, когда они готовы, что означает, что они могут быть не в порядке. Этот сценарий может произойти только в том случае, если клиент является многопоточным.

Для работы с несколькими потоками в одном и том же сеансе WebSocket обычно требуется возможность для WebSockets мультиплексировать данные, поступающие к клиенту и от него. В настоящее время это не функция WebSockets, но, безусловно, может быть построена поверх нее. Однако мультиплексирование этих клиентских и серверных потоков на одном канале представляет изрядную сложность, поскольку вам необходимо предотвратить непреднамеренную перезапись всех клиентских и серверных потоков или их зависание друг от друга.

Спецификация Java для MessageHandler, возможно, немного двусмысленна в отношении модели потоков;

https://docs.oracle.com/javaee/7/api/javax/websocket/MessageHandler.html говорит:

Каждый сеанс веб-сокета использует не более одного потока за раз для вызова своих обработчиков сообщений.

Но важным термином здесь является "сокет сеанс". Если ваш клиент отправляет несколько сообщений в рамках одного и того же сеанса WebSocket, обработчик на стороне сервера будет выполняться в одном потоке. Это не означает, что вы не можете делать много интересных вещей внутри потока, особенно если вы используете Input/OutputStreams (или Writers) на обоих концах. Это действительно означает, что связь с клиентом осуществляется только одним потоком. Если вы хотите мультиплексировать связь, вам нужно будет написать что-то поверх сокета, чтобы сделать это; это будет включать разработку собственной модели потоковой передачи для отправки запросов.

Более простым решением было бы создание нового сеанса для каждого запроса клиента. Каждый клиентский запрос запускает сеанс (т. е. TCP-соединение), отправляет данные и ожидает результата. Это дает вам несколько потоков MessageHandler — по одному на сеанс в соответствии со спецификацией.

Это самый простой способ получить многопоточность на стороне сервера; любой другой подход, как правило, потребует механизма мультиплексирования, который, в зависимости от вашего варианта использования, возможно, не стоит усилий и, безусловно, сопряжен с некоторой сложностью и риском.

Если вас беспокоит количество сеансов (соединений TCP/HTTP) между клиентом/серверами и сервером/ами, вы можете рассмотреть возможность создания пула сеансов на стороне клиента и использовать каждый сеанс клиента по одному, возвращая сеанс в пул всякий раз, когда клиент закончит с ним.

Наконец, возможно, это не имеет прямого отношения: я обнаружил, что когда я использовал Payara Micro для обслуживания конечной точки WebSocket, мне нужно было установить это:

  <resources>
    ...
    <managed-executor-service maximum-pool-size="200" core-pool-size="10" long-running-tasks="true" keep-alive-seconds="300" hung-after-seconds="300" task-queue-capacity="20000" jndi-name="concurrent/__defaultManagedExecutorService" object-type="system-all"></managed-executor-service>

Служба ManagedExecutorService по умолчанию предоставляет только один поток. Похоже, это относится и к Glassfish. Это заставило меня часами бегать, думая, что я не понимаю модель многопоточности, когда меня смущал только размер пула.

person Doctor Eval    schedule 15.08.2016