Я пишу приложение Java, которое отправляет и получает сообщения с сервера веб-сокетов. Когда приложение получает сообщение, его обработка может занять некоторое время. Поэтому я пытаюсь использовать несколько потоков для получения сообщений. Насколько я понимаю, Grizzly
имеет селекторные потоки, а также рабочие потоки. По умолчанию есть 1 поток выбора и 2 рабочих потока, в следующем примере я пытаюсь увеличить их до 5 и 10 соответственно. В приведенном ниже примере я приостанавливаю поток, который вызывает метод onMessage
, на 10 секунд, чтобы имитировать обработку входящей информации. Информация поступает каждую секунду, поэтому 10 потоков должны справиться с объемом трафика. Когда я профилирую запуск, работает только 1 поток селектора и 2 рабочих потока. Кроме того, сообщения принимаются только с интервалом в 10 секунд. Указание на то, что только 1 поток обрабатывает трафик - я нахожу это очень странным. Во время профилирования один рабочий поток, например. Grizzly(1)
получает первое отправленное сообщение. Затем через 10 секунд "Гризли(2)" получает второе сообщение - тогда Grizzly(2)
продолжает получать сообщения, а Grizzly(1)
не выполняет никаких действий.
Может кто-нибудь объяснить это странное поведение и как его изменить, например. 10 потоков, постоянно ожидающих сообщения в очереди?
Основной:
public static void main(String[] args) {
WebsocketTextClient client = new WebsocketTextClient();
client.connect();
for (int i = 0; i < 60; i++) {
client.send("Test message " + i);
try {
Thread.sleep(1000);
} catch (Exception e) {
System.out.println("Error sleeping!");
}
}
}
WebsocketTextClient.java:
import java.net.URI;
import javax.websocket.ClientEndpointConfig;
import javax.websocket.EndpointConfig;
import javax.websocket.Session;
import javax.websocket.Endpoint;
import javax.websocket.MessageHandler;
import org.glassfish.tyrus.client.ClientManager;
import org.glassfish.tyrus.client.ThreadPoolConfig;
import org.glassfish.tyrus.container.grizzly.client.GrizzlyClientProperties;
public class WebsocketTextClient {
private ClientManager client;
private ClientEndpointConfig clientConfig;
WebsocketTextClientEndpoint endpoint;
public WebsocketTextClient() {
client = ClientManager.createClient();
client.getProperties().put(GrizzlyClientProperties.SELECTOR_THREAD_POOL_CONFIG, ThreadPoolConfig.defaultConfig().setMaxPoolSize(5));
client.getProperties().put(GrizzlyClientProperties.WORKER_THREAD_POOL_CONFIG, ThreadPoolConfig.defaultConfig().setMaxPoolSize(10));
}
public boolean connect() {
try {
clientConfig = ClientEndpointConfig.Builder.create().build();
endpoint = new WebsocketTextClientEndpoint();
client.connectToServer(endpoint, clientConfig, new URI("wss://echo.websocket.org"));
} catch (Exception e) {
return false;
}
return true;
}
public boolean disconnect() {
return false;
}
public boolean send(String message) {
endpoint.session.getAsyncRemote().sendText(message);
return true;
}
private class WebsocketTextClientEndpoint extends Endpoint {
Session session;
@Override
public void onOpen(Session session, EndpointConfig config) {
System.out.println("Connection opened");
this.session = session;
session.addMessageHandler(new WebsocketTextClientMessageHandler());
}
}
private class WebsocketTextClientMessageHandler implements MessageHandler.Whole<String> {
@Override
public void onMessage(String message) {
System.out.println("Message received from " + Thread.currentThread().getName() + " " + message);
try {
Thread.sleep(10000);
} catch (Exception e) {
System.out.println("Error sleeping!");
}
System.out.println("Resuming");
}
}
}
messageHandlers
допускает только один поток. - person sigvardsen   schedule 09.08.2015