Повторить попытку после задержки противодавления с помощью Spring Project Reactor?

Фон

Я пытаюсь реализовать что-то похожее на простой неблокирующий ограничитель скорости с помощью Spring Project Reactor версии 3.3.0. . Например, чтобы ограничить число до 100 запросов в секунду, я использую эту реализацию:

myFlux
      .bufferTimeout(100, Duration.ofSeconds(1))
      .delayElements(Duration.ofSeconds(1))
      ..

Это отлично работает для моего варианта использования, но если подписчик не поспевает за скоростью издателя myFlux, он (правильно) выдаст OverflowException:

reactor.core.Exceptions$OverflowException: Could not emit buffer due to lack of requests
    at reactor.core.Exceptions.failWithOverflow(Exceptions.java:215)
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.FluxLift] :
    reactor.core.publisher.Flux.bufferTimeout(Flux.java:2780)

В моем случае важно, чтобы все элементы потреблялись подписчиком, например. падение противодавления (onBackpressureDrop()) недопустимо.

Вопрос

Есть ли способ вместо того, чтобы сбрасывать элементы при обратном нажатии, просто приостановить публикацию сообщений, пока подписчик не догонит? В моем случае myFlux публикует конечный, но большой набор элементов, сохраняемых в надежной базе данных, поэтому удаление элементов не требуется imho.


person Johan    schedule 18.11.2019    source источник


Ответы (1)


bufferTimeout(int maxSize, Duration maxTime) запрашивает неограниченное количество сообщений, поэтому нечувствителен к обратному давлению. Это делает его непригодным для вашего случая.

На концептуальном уровне bufferTimeout не может быть чувствительным к обратному давлению, потому что вы четко указываете издателю выпускать один пакет (даже если он пустой) за каждый истекший период времени. Если подписчик работает слишком медленно, это по праву вызовет переполнение.

Вместо этого попробуйте:

myFlux
   .delayElements(Duration.ofMillis(10))
   .buffer(100)

or

myFlux
   .buffer(100)
   .delayElements(Duration.ofSeconds(1))

buffer(int maxSize) запрашивает правильная сумма вверх по течению (request * maxSize), поэтому она чувствительна к обратному давлению со стороны подписчиков.

person Markus Appel    schedule 19.11.2019