Фон
Я пытаюсь реализовать что-то похожее на простой неблокирующий ограничитель скорости с помощью Spring Project Reactor версии 3.3.0. . Например, чтобы ограничить число до 100 запросов в секунду, я использую эту реализацию:
myFlux
.bufferTimeout(100, Duration.ofSeconds(1))
.delayElements(Duration.ofSeconds(1))
..
Это отлично работает для моего варианта использования, но если подписчик не поспевает за скоростью издателя myFlux
, он (правильно) выдаст OverflowException
:
reactor.core.Exceptions$OverflowException: Could not emit buffer due to lack of requests
at reactor.core.Exceptions.failWithOverflow(Exceptions.java:215)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly trace from producer [reactor.core.publisher.FluxLift] :
reactor.core.publisher.Flux.bufferTimeout(Flux.java:2780)
В моем случае важно, чтобы все элементы потреблялись подписчиком, например. падение противодавления (onBackpressureDrop()
) недопустимо.
Вопрос
Есть ли способ вместо того, чтобы сбрасывать элементы при обратном нажатии, просто приостановить публикацию сообщений, пока подписчик не догонит? В моем случае myFlux
публикует конечный, но большой набор элементов, сохраняемых в надежной базе данных, поэтому удаление элементов не требуется imho.