Реактивные потоки: Spring WebFlux - подпишитесь на существующего издателя

В настоящее время я переношу нашу существующую асинхронную REST-архитектуру Spring на новую библиотеку WebFlux Spring, и у меня есть вопрос по объединению нескольких запросов, чтобы они могли прослушивать один и тот же опубликованный ответ.

Пример использования выглядит следующим образом:

  1. Клиент A подключается к нашему веб-серверу и запрашивает данные
  2. Мы обращаемся к нашему кешу, чтобы проверить, есть ли там данные
  3. Мы этого не делаем, поэтому мы идем и получаем эти данные (клиент A подписался и ждет ответа)
  4. Клиент B подключается к нашему веб-серверу и запрашивает те же данные (обращается к той же конечной точке)
  5. Проверяем кеш, данных все равно нет
  6. Поскольку мы уже получаем эти данные для клиента A, мы не хотим делать еще один запрос, однако мы также не хотим отказываться от клиента B. Клиент B должен иметь возможность прослушивать ту же информацию

Как клиент B может подписаться на тот же поток ответов, которого ожидает клиент A?


person wild_nothing    schedule 11.04.2018    source источник
comment
Не могли бы вы дать фрагменты кода, чтобы показать, как выглядит API библиотеки кеширования? Кроме того, каково здесь ожидаемое поведение? Я предполагаю, что кеш может не содержать необработанный ответ для отправки клиенту, поэтому речь идет о совместном использовании кешированных данных на уровне службы, а не обязательно на веб-уровне?   -  person Brian Clozel    schedule 13.04.2018
comment
Мне просто интересно, возможно ли это с реактивными паттернами. Кэш используется в примере, чтобы показать, почему запрос может не получить немедленный ответ. Мне интересно, если два запроса попадают в одну и ту же конечную точку, возможно ли с помощью WebFlux присоединиться к ним и позволить им обоим ждать одного и того же ответа? Очевидно, этого нельзя сделать с классическими REST и Spring, но я надеялся, что подобное поведение возможно с реактивными потоками.   -  person wild_nothing    schedule 14.04.2018


Ответы (1)


«Клиент A подписался и ожидает ответа» Я полагаю, что запрос закодирован как Mono, и клиент A буквально подписывается на него:

Subscriber<Response> clientA = ... Mono<Response> request = makeRequest(...); request.subscribe(clientA);

тогда clientB должен подписаться таким же образом:

Subscriber<Response> clientB = ... request.subscribe(clientB);

Причем в кэше должны содержаться не ранее сохраненные данные ответа, а сами запросы типа Mono<Response>. Затем, если такой запрос находится в кеше, новые клиенты просто подписываются на него, независимо от того, был ли этот запрос уже выполнен или нет.

person Alexei Kaigorodov    schedule 15.04.2018
comment
Кэш в этом примере - это просто очень легкая структура данных Hazelcast IMap. Он заполняется данными из внешнего источника тогда и только тогда, когда упомянутых данных там еще нет. Я пытаюсь сделать так, чтобы вызов внешних данных (и заполнение кеша) не выполнялся дважды самым элегантным образом. Есть множество способов решить эту проблему, но я хотел бы знать, смогу ли я решить ее простым способом с помощью WebFlux - моя идея состоит в том, чтобы два запроса прослушивали одну и ту же последовательность событий. Я не могу удалить существующий IMap, но вы думаете, что мне следует добавить новый, содержащий Mono ‹R›? - person wild_nothing; 16.04.2018
comment
Так же, как процессоры имеют несколько уровней кеширования, вы можете добавить еще один уровень кеша для текущих запросов, содержащих Mono ‹R›, не касаясь IMap. Это может быть простой HashMap. - person Alexei Kaigorodov; 16.04.2018
comment
Интересная идея. Спасибо. - person wild_nothing; 16.04.2018
comment
Вы можете просто переместить полученный ответ из кеша для Mono в основной кеш: request.subscribe(response)->{ imapCache.put(response); responseCache.remove(request); } - person Alexei Kaigorodov; 17.04.2018