У меня есть большой ConcurrentHashMap (cache.getCache()
), в котором я храню все свои данные (примерно 500+ МБ, но со временем он может увеличиваться). Это доступно для клиентов через API, реализованный с использованием простого java HttpServer. Вот упрощенный код:
JsonWriter jsonWriter = new JsonWriter(new OutputStreamWriter(new BufferedOutputStream(new GZIPOutputStream(exchange.getResponseBody())))));
new GsonBuilder().create().toJson(cache.getCache(), CacheContainer.class, jsonWriter);
Есть также некоторые фильтры, которые отправляют клиенты, поэтому они фактически не получают все данные каждый раз, но HashMap постоянно обновляется, поэтому клиентам приходится часто обновляться, чтобы иметь самые последние данные. Это неэффективно, поэтому я решил отправлять обновления данных клиентам в режиме реального времени с помощью WebSockets.
Я выбрал для этого Undertow, потому что я могу просто импортировать его из Maven и мне не нужно выполнять дополнительную настройку на сервере.
При подключении к WS я добавляю канал в HashSet и отправляю весь набор данных (клиент отправляет сообщение с некоторыми фильтрами перед получением исходных данных, но я удалил эту часть из примера):
public class MyConnectionCallback implements WebSocketConnectionCallback {
CacheContainer cache;
Set<WebSocketChannel> clients = new HashSet<>();
BlockingQueue<String> queue = new LinkedBlockingQueue<>();
public MyConnectionCallback(CacheContainer cache) {
this.cache = cache;
Thread pusherThread = new Thread(() -> {
while (true) {
push(queue.take());
}
});
pusherThread.start();
}
public void onConnect(WebSocketHttpExchange webSocketHttpExchange, WebSocketChannel webSocketChannel) {
webSocketChannel.getReceiveSetter().set(new AbstractReceiveListener() {
protected void onFullTextMessage(WebSocketChannel channel, BufferedTextMessage message) {
clients.add(webSocketChannel);
WebSockets.sendText(gson.toJson(cache.getCache()), webSocketChannel, null);
}
}
}
private void push(String message) {
Set<WebSocketChannel> closed = new HashSet<>();
clients.forEach((webSocketChannel) -> {
if (webSocketChannel.isOpen()) {
WebSockets.sendText(message, webSocketChannel, null);
} else {
closed.add(webSocketChannel);
}
}
closed.foreach(clients::remove);
}
public void putMessage(String message) {
queue.put(message);
}
}
После каждого изменения в моем кеше я получаю новое значение и помещаю его в очередь (я не сериализую объект myUpdate
напрямую, потому что в методе updateCache за этим стоит другая логика). За обновление кеша отвечает только один поток:
cache.updateCache(key, myUpdate);
Map<Key,Value> tempMap = new HashMap<>();
tempMap.put(key, cache.getValue(key));
webSocketServer.putMessage(gson.toJson(tempMap));
Проблемы, которые я вижу с этим подходом:
- при первоначальном подключении весь набор данных преобразуется в строку, и я боюсь, что слишком много запросов могут привести к тому, что сервер станет OOM. WebSockets.sendText принимает только String и ByteBuffer
- если я сначала добавлю канал в набор клиентов, а затем отправлю данные, push-уведомление может быть отправлено клиенту до того, как исходные данные будут отправлены, и клиент будет в недопустимом состоянии.
- если я сначала отправлю исходные данные, а затем добавлю канал в набор клиентов, push-сообщения, которые приходят во время отправки исходных данных, будут потеряны, а клиент будет в недопустимом состоянии.
Решение, которое я придумал для проблем № 2 и № 3, состоит в том, чтобы поместить сообщения в очередь (я бы преобразовал Set<WebSocketChannel>
в Map<WebSocketChannel,Queue<String>>
и отправил сообщения в очередь только после того, как клиент получит исходный набор данных, но я приветствую любые другие предложения здесь.
Что касается проблемы № 1, мой вопрос заключается в том, что будет наиболее эффективным способом отправки исходных данных через WebSocket? Например, что-то вроде записи с помощью JsonWriter непосредственно в WebSocket.
Я понимаю, что клиенты могут сделать первоначальный вызов с помощью API и подписаться на WebSocket для получения изменений, но этот подход возлагает на клиентов ответственность за правильное состояние (им нужно подписаться на WS, поставить в очередь сообщения WS, получить исходные данные с помощью API, а затем применить поставленные в очередь сообщения WS к их набору данных после получения исходных данных), и я не хочу оставлять контроль над этим им, потому что данные конфиденциальны.