Чтобы разрешить несколько итераций в результирующем потоке из CompletableFuture<Stream<String>>
, я рассматриваю один из следующих подходов:
Преобразуйте полученное будущее в
CompletableFuture<List<String>>
через:teams.thenApply(st -> st.collect(toList()))
Преобразуйте полученное будущее в
Flux<String>
с помощью cache:Flux.fromStream(teams::join).cache();
Flux<T>
- это реализация Publisher<T>
в проектном реакторе.
Пример использования:
Я хотел бы получить последовательность с названиями команд высшей лиги (например, Stream<String>
) из источника данных, который предоставляет объект League
с Standing[]
(на основе RESTful API футбольных данных, например http://api.football-data.org/v1/soccerseasons/445/leagueTable). Используя AsyncHttpClient
и Gson
, мы имеем:
CompletableFuture<Stream<String>> teams = asyncHttpClient
.prepareGet("http://api.football-data.org/v1/soccerseasons/445/leagueTable")
.execute()
.toCompletableFuture()
.thenApply(Response::getResponseBody)
.thenApply(body -> gson.fromJson(body, League.class));
.thenApply(l -> stream(l.standings).map(s -> s.teamName));
Чтобы повторно использовать полученный поток, у меня есть два варианта:
1. CompletableFuture<List<String>> res = teams.thenApply(st -> st.collect(toList()))
2. Flux<String> res = Flux.fromStream(teams::join).cache()
Flux<T>
менее подробный и предоставляет все, что мне нужно. Тем не менее, правильно ли его использовать в этом сценарии?
Или лучше использовать CompletableFuture<List<String>>
? Или есть другая лучшая альтернатива?
ОБНОВЛЕНО некоторыми мыслями (16.03.2018):
CompletableFuture<List<String>>
:
- [ПРОФИ]
List<String>
будет собираться в продолжении, и когда нам нужно будет перейти к результату будущего, возможно, он уже будет завершен. - [МИНУСЫ] Многословие деклараций.
- [МИНУСЫ] Если мы просто хотим использовать его один раз, нам не нужно собирать эти предметы в
List<T>
.
Flux<String>
:
- [PROS] Краткость декларации
- [ПЛЮСЫ] Если мы просто хотим использовать его один раз, мы можем опустить
.cache()
и перенаправить его на следующий уровень, который может воспользоваться реактивным API, например реактивный контроллер потока полотна, например@GetMapping(produces =MediaType.TEXT_EVENT_STREAM) public Flux<String> getTeams() {…}
- [МИНУСЫ] Если мы хотим повторно использовать этот
Flux<T>
, мы должны заключить его в кешируемыйFlux<T>
(….cache()
), который, в свою очередь, добавит накладные расходы при первом обходе, потому что он должен сохранять полученные элементы во внутреннем кэше.
Flux
- это асинхронный реактивный конвейер.List
есть, ну аList
. Что вам нужно? Вы сравниваете яблоки с апельсинами. - person Boris the Spider   schedule 27.06.2018List
сFlux
. Я сравниваюCF<List>
сFlux
. - person Miguel Gamboa   schedule 27.06.2018Mono<List<T>>
, а неFlux<T>
. Должно быть очевидно, что они разные. - person Boris the Spider   schedule 27.06.2018Mono<List<T>>
совпадает сCF<List<T>>
. НИКАКИХ преимуществ при преобразовании изCF<List<T>>
вMono<List<T>>
. - person Miguel Gamboa   schedule 27.06.2018