Мы работаем с Spring boot 2.0.0.BUILD_SNAPSHOT и spring boot webflux 5.0.0, и в настоящее время мы не можем передавать поток клиенту по запросу.
В настоящее время я создаю поток из итератора:
public Flux<ItemIgnite> getAllFlux() {
Iterator<Cache.Entry<String, ItemIgnite>> iterator = this.getAllIterator();
return Flux.create(flux -> {
while(iterator.hasNext()) {
flux.next(iterator.next().getValue());
}
});
}
А по запросу я просто делаю:
@RequestMapping(value="/all", method=RequestMethod.GET, produces="application/json")
public Flux<ItemIgnite> getAllFlux() {
return this.provider.getAllFlux();
}
Когда я теперь локально звоню localhost:8080/all
, через 10 секунд я получаю 503
код состояния. Также как и у клиента, когда я запрашиваю /all
с помощью WebClient
:
public Flux<ItemIgnite> getAllPoducts(){
WebClient webClient = WebClient.create("http://localhost:8080");
Flux<ItemIgnite> f = webClient.get().uri("/all").accept(MediaType.ALL).exchange().flatMapMany(cr -> cr.bodyToFlux(ItemIgnite.class));
f.subscribe(System.out::println);
return f;
}
Ничего не произошло. Данные не передаются.
Когда я вместо этого сделаю следующее:
public Flux<List<ItemIgnite>> getAllFluxMono() {
return Flux.just(this.getAllList());
}
а также
@RequestMapping(value="/allMono", method=RequestMethod.GET, produces="application/json")
public Flux<List<ItemIgnite>> getAllFluxMono() {
return this.provider.getAllFluxMono();
}
Это работает. Я предполагаю, это потому, что все данные уже закончили загрузку и просто переданы клиенту, поскольку он обычно передает данные без использования потока.
Что мне нужно изменить, чтобы поток передавал данные веб-клиенту, который запрашивает эти данные?
ИЗМЕНИТЬ
У меня есть данные в кэше зажигания. Итак, мой getAllIterator
загружает данные из кеша зажигания:
public Iterator<Cache.Entry<String, ItemIgnite>> getAllIterator() {
return this.igniteCache.iterator();
}
ИЗМЕНИТЬ
добавив flux.complete()
, как предложил @Simon Baslé:
public Flux<ItemIgnite> getAllFlux() {
Iterator<Cache.Entry<String, ItemIgnite>> iterator = this.getAllIterator();
return Flux.create(flux -> {
while(iterator.hasNext()) {
flux.next(iterator.next().getValue());
}
flux.complete(); // see here
});
}
Решает 503
проблему в браузере. Но проблему с WebClient
это не решает. Данные по-прежнему не переданы.
ИЗМЕНИТЬ 3
используя publishOn
с Schedulers.parallel()
:
public Flux<ItemIgnite> getAllFlux() {
Iterator<Cache.Entry<String, ItemIgnite>> iterator = this.getAllIterator();
return Flux.<ItemIgnite>create(flux -> {
while(iterator.hasNext()) {
flux.next(iterator.next().getValue());
}
flux.complete();
}).publishOn(Schedulers.parallel());
}
Не меняет результат.
Здесь я публикую то, что получает WebClient:
value :[Item ID: null, Product Name: null, Product Group: null]
complete
Таким образом, похоже, что он получает один элемент (из более чем 35000), и значения равны нулю, и он заканчивает после.
getAllIterator
? Это блокировка? Чтение данных из базы данных? Из памяти? - person Brian Clozel   schedule 10.07.2017