Правильно ли преобразовывать CompletableFuture ‹Stream ‹T›› в Publisher ‹T›?

Чтобы разрешить несколько итераций в результирующем потоке из CompletableFuture<Stream<String>>, я рассматриваю один из следующих подходов:

  1. Преобразуйте полученное будущее в CompletableFuture<List<String>> через: teams.thenApply(st -> st.collect(toList()))

  2. Преобразуйте полученное будущее в Flux<String> с помощью cache: Flux.fromStream(teams::join).cache();

Flux<T> - это реализация Publisher<T> в проектном реакторе.

Пример использования:

Я хотел бы получить последовательность с названиями команд высшей лиги (например, Stream<String>) из источника данных, который предоставляет объект League с Standing[] (на основе RESTful API футбольных данных, например http://api.football-data.org/v1/soccerseasons/445/leagueTable). Используя AsyncHttpClient и Gson, мы имеем:

CompletableFuture<Stream<String>> teams = asyncHttpClient
    .prepareGet("http://api.football-data.org/v1/soccerseasons/445/leagueTable")
    .execute()
    .toCompletableFuture()
    .thenApply(Response::getResponseBody)
    .thenApply(body -> gson.fromJson(body, League.class));
    .thenApply(l -> stream(l.standings).map(s -> s.teamName));

Чтобы повторно использовать полученный поток, у меня есть два варианта:

1. CompletableFuture<List<String>> res = teams.thenApply(st -> st.collect(toList()))

2. Flux<String> res = Flux.fromStream(teams::join).cache()

Flux<T> менее подробный и предоставляет все, что мне нужно. Тем не менее, правильно ли его использовать в этом сценарии?

Или лучше использовать CompletableFuture<List<String>>? Или есть другая лучшая альтернатива?

ОБНОВЛЕНО некоторыми мыслями (16.03.2018):

CompletableFuture<List<String>>:

  • [ПРОФИ] List<String> будет собираться в продолжении, и когда нам нужно будет перейти к результату будущего, возможно, он уже будет завершен.
  • [МИНУСЫ] Многословие деклараций.
  • [МИНУСЫ] Если мы просто хотим использовать его один раз, нам не нужно собирать эти предметы в List<T>.

Flux<String>:

  • [PROS] Краткость декларации
  • [ПЛЮСЫ] Если мы просто хотим использовать его один раз, мы можем опустить .cache() и перенаправить его на следующий уровень, который может воспользоваться реактивным API, например реактивный контроллер потока полотна, например @GetMapping(produces =MediaType.TEXT_EVENT_STREAM) public Flux<String> getTeams() {…}
  • [МИНУСЫ] Если мы хотим повторно использовать этот Flux<T>, мы должны заключить его в кешируемый Flux<T> (….cache()), который, в свою очередь, добавит накладные расходы при первом обходе, потому что он должен сохранять полученные элементы во внутреннем кэше.

person Miguel Gamboa    schedule 09.03.2018    source источник
comment
что, в свою очередь, добавит накладных расходов при первом обходе - незначительно, игнорируйте это.   -  person Boris the Spider    schedule 27.06.2018
comment
Flux - это асинхронный реактивный конвейер. List есть, ну а List. Что вам нужно? Вы сравниваете яблоки с апельсинами.   -  person Boris the Spider    schedule 27.06.2018
comment
@BoristheSpider Я не сравниваю List с Flux. Я сравниваю CF<List> с Flux.   -  person Miguel Gamboa    schedule 27.06.2018
comment
Это Mono<List<T>>, а не Flux<T>. Должно быть очевидно, что они разные.   -  person Boris the Spider    schedule 27.06.2018
comment
Mono<List<T>> совпадает с CF<List<T>>. НИКАКИХ преимуществ при преобразовании из CF<List<T>> в Mono<List<T>>.   -  person Miguel Gamboa    schedule 27.06.2018


Ответы (2)


    CompletableFuture<Stream<String>> teams = ...;
    Flux<String> teamsFlux = Mono.fromFuture(teams).flatMapMany(stream -> Flux.fromStream(stream));
EDIT:

Flux.fromStream(teams::join) - это запах кода, потому что он удерживает поток для получения результата от CompletableFuture, который выполняется в другом потоке.

person youhans    schedule 27.06.2018
comment
Я думаю, что он не удерживает поток, который вызывает fromStream(...). Это лениво. Мы просто передаем дескриптор метода join вместо того, чтобы вызывать его. Сразу после подписки, возможно, проталкивающий поток будет удерживаться join(), но только если источник CompletableFuture еще не завершен. - person Miguel Gamboa; 27.06.2018

После того, как вы загрузили таблицу лиги и названия команд извлечены из этой таблицы, я не уверен, что вам нужен поток, готовый к обратному давлению, для итерации по этим элементам. Преобразование потока в стандартный список (или массив) должно быть достаточно хорошим и, вероятно, должно иметь лучшую производительность, не так ли?

Например:

String[] teamNames = teams.join().toArray(String[]::new);
person Stéphane Appercel    schedule 12.03.2018
comment
Я не уверен в накладных расходах. Сбор предметов в List также вызывает дополнительные накладные расходы, не так ли? Если мы не всегда повторно используем результирующий teams, то создание холодного потока из Supplier::stream, возможно, будет более эффективным, поскольку мы не создаем вспомогательный объект List. С другой стороны, если мы уверены, что хотим повторно использовать teams, тогда, возможно, лучше собрать его в List, чем использовать cache(), потому что последнее добавит накладные расходы при первом обходе. Это мои сомнения !! - person Miguel Gamboa; 13.03.2018
comment
В самом деле, вы можете сохранить поток, если хотите прочитать поток один раз; но если вам нужно проанализировать коллекцию элементов несколько раз, я бы предпочел преобразовать поток элементов в массив элементов, а не использовать готовый к обратному давлению холодный поток для итерации по этим элементам. - person Stéphane Appercel; 15.03.2018
comment
Не уверен, что преобразование конвейерной линии async в конвейер синхронизации - лучшее предложение, которое я когда-либо видел. - person Boris the Spider; 27.06.2018