Я был свидетелем странного поведения onBackpressureBuffer, я не уверен, является ли это допустимым поведением или ошибкой.
У меня есть вызов tcp, который отправляет элементы с определенной скоростью (используя потоковую передачу и inputStream, но это только для некоторой информации)
Вдобавок к этому я создал наблюдаемый объект, используя create, который будет создавать элемент каждый раз, когда он будет готов.
Назовем это сообщениями().
Тогда я делаю это:
messages()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({//do some work});
Используя инструменты аналитики, я заметил, что MissingBackPressureException возникает редко, поэтому я добавил onBackpressureBuffer в вызов.
Если я добавлю его после observeOn
:
messages()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.onBackpressureBuffer()
.subscribe({//do some work})
все работает нормально, но это означает, что он будет буферизоваться только после того, как попадет в основной поток пользовательского интерфейса, поэтому я предпочел, чтобы это было так:
messages()
.onBackpressureBuffer()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({//do some work});
И тут начинаются странности.
Я заметил, что пока messages()
продолжает отправлять элементы, в какой-то момент они перестают доставляться подписчику.
Точнее, после ровно 16 элементов, по-видимому, происходит то, что буфер начнет удерживать элементы, не передавая их вперед.
Как только я отменю messages()
с помощью какого-то механизма тайм-аута, это заставит messages()
выдать onError()
, и буфер немедленно выдаст все элементы, которые он сохранил (они будут обработаны).
Я проверил, не виноват ли подписчик в том, что он слишком много работает, но это не так, он закончил, но все равно не получает предметы...
Я также пытался использовать метод request(n)
в подписчике, запрашивающем один элемент после завершения onNext()
, но буфер не работает.
Я подозреваю, что это вызвано системой обмена сообщениями основного потока пользовательского интерфейса Android с обратным давлением, но я не могу объяснить, почему.
может кто-нибудь объяснить, почему это происходит? это ошибка или допустимое поведение? Спасибо!