Эффективно отправляйте большие наборы данных с помощью Undertow WebSockets

У меня есть большой ConcurrentHashMap (cache.getCache()), в котором я храню все свои данные (примерно 500+ МБ, но со временем он может увеличиваться). Это доступно для клиентов через API, реализованный с использованием простого java HttpServer. Вот упрощенный код:

JsonWriter jsonWriter = new JsonWriter(new OutputStreamWriter(new BufferedOutputStream(new GZIPOutputStream(exchange.getResponseBody())))));
new GsonBuilder().create().toJson(cache.getCache(), CacheContainer.class, jsonWriter);

Есть также некоторые фильтры, которые отправляют клиенты, поэтому они фактически не получают все данные каждый раз, но HashMap постоянно обновляется, поэтому клиентам приходится часто обновляться, чтобы иметь самые последние данные. Это неэффективно, поэтому я решил отправлять обновления данных клиентам в режиме реального времени с помощью WebSockets.

Я выбрал для этого Undertow, потому что я могу просто импортировать его из Maven и мне не нужно выполнять дополнительную настройку на сервере.

При подключении к WS я добавляю канал в HashSet и отправляю весь набор данных (клиент отправляет сообщение с некоторыми фильтрами перед получением исходных данных, но я удалил эту часть из примера):

public class MyConnectionCallback implements WebSocketConnectionCallback {
  CacheContainer cache;
  Set<WebSocketChannel> clients = new HashSet<>();
  BlockingQueue<String> queue = new LinkedBlockingQueue<>();

  public MyConnectionCallback(CacheContainer cache) {
    this.cache = cache;
    Thread pusherThread = new Thread(() -> {
      while (true) {
        push(queue.take());
      }
    });
    pusherThread.start();
  }

  public void onConnect(WebSocketHttpExchange webSocketHttpExchange, WebSocketChannel webSocketChannel) {
    webSocketChannel.getReceiveSetter().set(new AbstractReceiveListener() {
      protected void onFullTextMessage(WebSocketChannel channel, BufferedTextMessage message) {
        clients.add(webSocketChannel);
        WebSockets.sendText(gson.toJson(cache.getCache()), webSocketChannel, null);
      }
    }
  }

  private void push(String message) {
    Set<WebSocketChannel> closed = new HashSet<>();
    clients.forEach((webSocketChannel) -> {
        if (webSocketChannel.isOpen()) {
            WebSockets.sendText(message, webSocketChannel, null);
        } else {
            closed.add(webSocketChannel);
        }
    }
    closed.foreach(clients::remove);
  }

  public void putMessage(String message) {
    queue.put(message);
  }
}

После каждого изменения в моем кеше я получаю новое значение и помещаю его в очередь (я не сериализую объект myUpdate напрямую, потому что в методе updateCache за этим стоит другая логика). За обновление кеша отвечает только один поток:

cache.updateCache(key, myUpdate);
Map<Key,Value> tempMap = new HashMap<>();
tempMap.put(key, cache.getValue(key));
webSocketServer.putMessage(gson.toJson(tempMap));

Проблемы, которые я вижу с этим подходом:

  1. при первоначальном подключении весь набор данных преобразуется в строку, и я боюсь, что слишком много запросов могут привести к тому, что сервер станет OOM. WebSockets.sendText принимает только String и ByteBuffer
  2. если я сначала добавлю канал в набор клиентов, а затем отправлю данные, push-уведомление может быть отправлено клиенту до того, как исходные данные будут отправлены, и клиент будет в недопустимом состоянии.
  3. если я сначала отправлю исходные данные, а затем добавлю канал в набор клиентов, push-сообщения, которые приходят во время отправки исходных данных, будут потеряны, а клиент будет в недопустимом состоянии.

Решение, которое я придумал для проблем № 2 и № 3, состоит в том, чтобы поместить сообщения в очередь (я бы преобразовал Set<WebSocketChannel> в Map<WebSocketChannel,Queue<String>> и отправил сообщения в очередь только после того, как клиент получит исходный набор данных, но я приветствую любые другие предложения здесь.

Что касается проблемы № 1, мой вопрос заключается в том, что будет наиболее эффективным способом отправки исходных данных через WebSocket? Например, что-то вроде записи с помощью JsonWriter непосредственно в WebSocket.

Я понимаю, что клиенты могут сделать первоначальный вызов с помощью API и подписаться на WebSocket для получения изменений, но этот подход возлагает на клиентов ответственность за правильное состояние (им нужно подписаться на WS, поставить в очередь сообщения WS, получить исходные данные с помощью API, а затем применить поставленные в очередь сообщения WS к их набору данных после получения исходных данных), и я не хочу оставлять контроль над этим им, потому что данные конфиденциальны.


person HomeIsWhereThePcIs    schedule 06.11.2019    source источник


Ответы (2)


Кажется, проблема № 2 и № 3 связана с тем, что разные потоки могут одновременно отправлять состояние данных клиенту. Таким образом, в дополнение к вашему подходу вы можете рассмотреть два других подхода к синхронизации.

  1. используйте мьютекс для защиты доступа к данным и отправке клиентом. Это сериализует чтение и отправку данных клиентам, поэтому (псевдо) код становится таким:
protected void onFullTextMessage(...) {
   LOCK {
     clients.add(webSocketChannel);
     WebSockets.sendText(gson.toJson(cache.getCache()), webSocketChannel, null);
   }
}

void push(String message) {
    Set<WebSocketChannel> closed = new HashSet<>();
    LOCK {
      clients.forEach((webSocketChannel) -> {
          if (webSocketChannel.isOpen()) {
              WebSockets.sendText(message, webSocketChannel, null);
          } else {
              closed.add(webSocketChannel);
          }
      }
    }
    closed.foreach(clients::remove);
}
  1. создать новый поток класса и службы, который несет исключительную ответственность за управление изменениями в кэше данных и передачу этих изменений клиентам; он будет использовать внутреннюю синхронизированную очередь для асинхронной обработки вызовов методов, а также отслеживать подключенных клиентов, он будет иметь такой интерфейс:
public void update_cache(....);
public void add_new_client(WebSocketChannel);

... каждый из этих вызовов вызывает операцию, которая должна быть завершена во внутреннем потоке объекта. Это гарантирует упорядоченность начального снимка и обновлений, поскольку только один поток выполняет работу по изменению кеша и распространению этих изменений среди подписчиков.

Что касается № 1, если вы использовали подход № 2, вы можете кэшировать сериализованное состояние ваших данных, что позволяет повторно использовать их в более поздних моментальных снимках (при условии, что они не были изменены за это время). Как отмечено в комментарии: это будет работать только в том случае, если у более поздних клиентов будет такая же конфигурация фильтра.

person Darren Smith    schedule 06.11.2019
comment
Есть только один поток, который может отправлять сообщения, но для простоты я не включил эту часть в исходный вопрос. Я добавил это сейчас. Кэширование сериализованного состояния моих данных не будет работать, потому что клиенты не получают все данные. Сначала он фильтруется пользовательскими сериализаторами Gson, это отличается для каждого клиента и основано на некоторых параметрах, которые отправляет клиент. - person HomeIsWhereThePcIs; 06.11.2019
comment
хорошее замечание по поводу фильтра. Я уточнил свой ответ; на самом деле служебный поток, который я имел в виду, будет отвечать как за изменение кеша, так и за отправку сообщений подписчикам. - person Darren Smith; 06.11.2019

Чтобы решить проблемы № 2 и № 3, я устанавливаю флаг принудительной блокировки на каждом клиенте, который разблокируется только при отправке исходных данных. Когда установлена ​​принудительная блокировка, поступающие сообщения помещаются в эту очередь клиентов. Затем сообщения в очереди отправляются перед любыми новыми сообщениями.

Я смягчил проблему № 1, используя ByteBuffer напрямую вместо String. Таким образом, я могу сэкономить немного памяти из-за кодировки (String по умолчанию использует UTF-16)

Окончательный код:

public class WebSocketClient {
  private boolean pushLock;
  private Gson gson;
  private Queue<CacheContainer> queue = new ConcurrentLinkedQueue<>();

  WebSocketClient(MyQuery query, CacheHandler cacheHandler) {
    pushLock = true;
    this.gson = GsonFactory.getGson(query, cacheHandler);
  }

  public synchronized boolean isPushLock() {
    return pushLock;
  }

  public synchronized void pushUnlock() {
    pushLock = false;
  }

  public Gson getGson() {
    return gson;
  }

  public Queue<CacheContainer> getQueue() {
    return queue;
  }

  public boolean hasBackLog() {
    return !queue.isEmpty();
  }
}

public class MyConnectionCallback implements WebSocketConnectionCallback {

  private final Map<WebSocketChannel, WebSocketClient> clients = new ConcurrentHashMap<>();
  private final BlockingQueue<CacheContainer> messageQueue = new LinkedBlockingQueue<>();

  private final Gson queryGson = new GsonBuilder().disableHtmlEscaping().create();

  private final CacheHandler cacheHandler;

  MyConnectionCallback(CacheHandler cacheHandler) {
    this.cacheHandler = cacheHandler;
    Thread pusherThread = new Thread(() -> {
      boolean hasPushLock = false;
      while (true) {
        if (messageQueue.isEmpty() && hasPushLock) hasPushLock = pushToAllClients(null);
        else hasPushLock = pushToAllClients(messageQueue.take());
      }
    }, "PusherThread");
    pusherThread.start();
  }

  @Override
  public void onConnect(WebSocketHttpExchange webSocketHttpExchange, WebSocketChannel webSocketChannel) {
    webSocketChannel.getReceiveSetter().set(new AbstractReceiveListener() {
      @Override
      protected void onFullTextMessage(WebSocketChannel channel, BufferedTextMessage message) throws IOException {
        MyQuery query = new MyQuery(queryGson.fromJson(message.getData(), QueryJson.class));
        WebSocketClient clientConfig = new WebSocketClient(query, cacheHandler);
        clients.put(webSocketChannel, clientConfig);
        push(webSocketChannel, clientConfig.getGson(), cacheHandler.getCache());
        clientConfig.pushUnlock();
        }
    });
    webSocketChannel.resumeReceives();
  }

  void putMessage(CacheContainer message) {
    messageQueue.put(message);
  }

  private synchronized void push(WebSocketChannel webSocketChannel, Gson gson, CacheContainer message) throws IOException {
    try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
      JsonWriter jsonWriter = new JsonWriter(new OutputStreamWriter(baos, StandardCharsets.UTF_8))) {
      gson.toJson(message, CacheContainer.class, jsonWriter);
      jsonWriter.flush();
      if (baos.size() > 2) {
        WebSockets.sendText(ByteBuffer.wrap(baos.toByteArray()), webSocketChannel, null);
      }
    }
  }

  private synchronized boolean pushToAllClients(CacheContainer message) {
    AtomicBoolean hadPushLock = new AtomicBoolean(false);
    Set<WebSocketChannel> closed = new HashSet<>();

    clients.forEach((webSocketChannel, clientConfig) -> {
      if (webSocketChannel.isOpen()) {
        if (clientConfig.isPushLock()) {
          hadPushLock.set(true);
          clientConfig.getQueue().add(message);
        } else {
          try {
            if (clientConfig.hasBackLog())
              pushBackLog(webSocketChannel, clientConfig);
            if (message != null)
              push(webSocketChannel, clientConfig.getGson(), message);
          } catch (Exception e) {
            closeChannel(webSocketChannel, closed);
          }
        }
      } else {
        closed.add(webSocketChannel);
      }
    });

    closed.forEach(clients::remove);
    return hadPushLock.get();
  }

  private void pushBackLog(WebSocketChannel webSocketChannel, WebSocketClient clientConfig) throws IOException {
    while (clientConfig.hasBackLog()) {
      push(webSocketChannel, clientConfig.getGson(), clientConfig.getQueue().poll());
    }
  }

  private void closeChannel(WebSocketChannel channel, Set<WebSocketChannel> closed) {
    closed.add(channel);
    channel.close();
  }
}
person HomeIsWhereThePcIs    schedule 11.11.2019