Spring Web-Flux: как вернуть Flux веб-клиенту по запросу?

Мы работаем с Spring boot 2.0.0.BUILD_SNAPSHOT и spring boot webflux 5.0.0, и в настоящее время мы не можем передавать поток клиенту по запросу.

В настоящее время я создаю поток из итератора:

public Flux<ItemIgnite> getAllFlux() {
    Iterator<Cache.Entry<String, ItemIgnite>> iterator = this.getAllIterator();

    return Flux.create(flux -> {
        while(iterator.hasNext()) {
            flux.next(iterator.next().getValue());
        }
    });
}

А по запросу я просто делаю:

@RequestMapping(value="/all", method=RequestMethod.GET, produces="application/json")
public Flux<ItemIgnite> getAllFlux() {
    return this.provider.getAllFlux();
}

Когда я теперь локально звоню localhost:8080/all, через 10 секунд я получаю 503 код состояния. Также как и у клиента, когда я запрашиваю /all с помощью WebClient:

public Flux<ItemIgnite> getAllPoducts(){
    WebClient webClient = WebClient.create("http://localhost:8080");

    Flux<ItemIgnite> f = webClient.get().uri("/all").accept(MediaType.ALL).exchange().flatMapMany(cr -> cr.bodyToFlux(ItemIgnite.class));
    f.subscribe(System.out::println);
    return f;

}

Ничего не произошло. Данные не передаются.

Когда я вместо этого сделаю следующее:

public Flux<List<ItemIgnite>> getAllFluxMono() {
    return Flux.just(this.getAllList());
}

а также

@RequestMapping(value="/allMono", method=RequestMethod.GET, produces="application/json")
public Flux<List<ItemIgnite>> getAllFluxMono() {
    return this.provider.getAllFluxMono();
}

Это работает. Я предполагаю, это потому, что все данные уже закончили загрузку и просто переданы клиенту, поскольку он обычно передает данные без использования потока.

Что мне нужно изменить, чтобы поток передавал данные веб-клиенту, который запрашивает эти данные?

ИЗМЕНИТЬ

У меня есть данные в кэше зажигания. Итак, мой getAllIterator загружает данные из кеша зажигания:

public Iterator<Cache.Entry<String, ItemIgnite>> getAllIterator() {
    return this.igniteCache.iterator();
}

ИЗМЕНИТЬ

добавив flux.complete(), как предложил @Simon Baslé:

public Flux<ItemIgnite> getAllFlux() {
    Iterator<Cache.Entry<String, ItemIgnite>> iterator = this.getAllIterator();

    return Flux.create(flux -> {
        while(iterator.hasNext()) {
            flux.next(iterator.next().getValue());
        }

        flux.complete(); // see here
    });
}

Решает 503 проблему в браузере. Но проблему с WebClient это не решает. Данные по-прежнему не переданы.

ИЗМЕНИТЬ 3

используя publishOn с Schedulers.parallel():

public Flux<ItemIgnite> getAllFlux() {
    Iterator<Cache.Entry<String, ItemIgnite>> iterator = this.getAllIterator();

    return Flux.<ItemIgnite>create(flux -> {
        while(iterator.hasNext()) {
            flux.next(iterator.next().getValue());
        }

        flux.complete();
    }).publishOn(Schedulers.parallel());
}

Не меняет результат.

Здесь я публикую то, что получает WebClient:

value :[Item ID: null, Product Name: null, Product Group: null]
complete

Таким образом, похоже, что он получает один элемент (из более чем 35000), и значения равны нулю, и он заканчивает после.


person Mulgard    schedule 10.07.2017    source источник
comment
Не могли бы вы объяснить, что делает getAllIterator? Это блокировка? Чтение данных из базы данных? Из памяти?   -  person Brian Clozel    schedule 10.07.2017


Ответы (2)


Выскакивает одна вещь: вы никогда не вызываете flux.complete() в своем create.

Но на самом деле есть фабричный оператор, который предназначен для преобразования Iterable в Flux, так что вы можете просто сделать Flux.fromIterable(this)

Изменить: если ваш Iterator скрывает сложность, такую ​​как запрос БД (или любой блокирующий ввод-вывод), имейте в виду, что это вызывает проблему: все, что блокируется в реактивной цепочке, если не изолировано в выделенном контексте выполнения использование publishOn может блокировать не только всю цепочку, но и другие реактивные процессы (поскольку потоки могут и будут использоваться несколькими реактивными процессами).

Ни create, ни fromIterable не делают ничего, в частности, для защиты от блокировки источников. Я думаю, что вы столкнулись с такой проблемой, судя по тому, как вы зависаете с WebClient.

person Simon Baslé    schedule 10.07.2017
comment
добавление flux.complete() позволяет получить ожидаемый результат, когда я тестирую его в своем браузере. Я больше не получаю 503. Это не решает проблему, когда я пробую с WebClient, все равно нет результатов. - person Mulgard; 10.07.2017
comment
Я вижу еще две проблемы; ваш getAllIterator блокируется, поэтому вам нужно запланировать это в конкретном Планировщике (см. документацию по реактору на publishOn). На стороне WebClient вы вызываете subscribe на Flux, но также возвращаете его - не следует делать и то, и другое. - person Brian Clozel; 10.07.2017
comment
Я отредактировал свой пост, чтобы показать вам, как я добавил publishOn, и предоставить результат на моем WebClient p. с. подписка на поток - это просто проверка поступления выходных данных. - person Mulgard; 10.07.2017

Проблема заключалась в моем Объекте ItemIgnite, который я передаю. Системный Flux, похоже, не справляется с этим. Потому что, если я изменю свой исходный код на следующий:

public Flux<String> getAllFlux() {
    Iterator<Cache.Entry<String, ItemIgnite>> iterator = this.getAllIterator();

    return Flux.create(flux -> {
        while(iterator.hasNext()) {
            flux.next(iterator.next().getValue().toString());
        }
    });
}

Все работает нормально. Без publishOn и без flux.complete(). Может у кого-то есть идеи, почему это работает.

person Mulgard    schedule 10.07.2017