RxJava onBackpressureBuffer не испускает элементы

Я был свидетелем странного поведения onBackpressureBuffer, я не уверен, является ли это допустимым поведением или ошибкой.

У меня есть вызов tcp, который отправляет элементы с определенной скоростью (используя потоковую передачу и inputStream, но это только для некоторой информации)

Вдобавок к этому я создал наблюдаемый объект, используя create, который будет создавать элемент каждый раз, когда он будет готов.

Назовем это сообщениями().

Тогда я делаю это:

messages()
   .subscribeOn(Schedulers.io())
   .observeOn(AndroidSchedulers.mainThread())
   .subscribe({//do some work});

Используя инструменты аналитики, я заметил, что MissingBackPressureException возникает редко, поэтому я добавил onBackpressureBuffer в вызов.

Если я добавлю его после observeOn:

messages()
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .onBackpressureBuffer()
    .subscribe({//do some work})

все работает нормально, но это означает, что он будет буферизоваться только после того, как попадет в основной поток пользовательского интерфейса, поэтому я предпочел, чтобы это было так:

messages()
    .onBackpressureBuffer()
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe({//do some work});

И тут начинаются странности.

Я заметил, что пока messages() продолжает отправлять элементы, в какой-то момент они перестают доставляться подписчику.

Точнее, после ровно 16 элементов, по-видимому, происходит то, что буфер начнет удерживать элементы, не передавая их вперед.

Как только я отменю messages() с помощью какого-то механизма тайм-аута, это заставит messages() выдать onError(), и буфер немедленно выдаст все элементы, которые он сохранил (они будут обработаны).

Я проверил, не виноват ли подписчик в том, что он слишком много работает, но это не так, он закончил, но все равно не получает предметы...

Я также пытался использовать метод request(n) в подписчике, запрашивающем один элемент после завершения onNext(), но буфер не работает.

Я подозреваю, что это вызвано системой обмена сообщениями основного потока пользовательского интерфейса Android с обратным давлением, но я не могу объяснить, почему.

может кто-нибудь объяснить, почему это происходит? это ошибка или допустимое поведение? Спасибо!


person ndori    schedule 20.10.2016    source источник


Ответы (2)


Не зная, как messages(), исходя из описанного поведения, это такой же тупик с тем же пулом, что и с этот вопрос

Обходной путь, который вы не пробовали, состоит в том, чтобы поместить .onBackpressureBuffer между subscribeOn и observeOn.

messages()
.subscribeOn(Schedulers.io())
.onBackpressureBuffer()           // <---------------------
.observeOn(AndroidSchedulers.mainThread())
.subscribe({//do some work});
person akarnokd    schedule 20.10.2016
comment
интересно, посмотрю - person ndori; 20.10.2016
comment
messages() использует inputStream из responseBody.byteStream() из okHttp3, затем он использует цикл while для inputStream с .read(). каждый раз, когда это действительное сообщение, оно будет выдавать его и продолжать чтение. Я использую разделитель, чтобы различать начало и конец сообщения. если я получу IOException от read(), я вызову onError. - person ndori; 20.10.2016
comment
это сработало отлично, я не знал, что есть разница между вызовом после и не перед subsribeOn(), я обычно использую шаблон помещения subsribeOn(io()).observeOn(ui()) в последнюю очередь в цепочка непосредственно перед подпиской, это неправильно? Я буду любить указывать направление, чтобы понять эту область более тщательно. - person ndori; 20.10.2016

Вопрос отличается, но ответ сводится к одному и тому же. Реализация конструктора observeOn: OperatorObserveOn(Scheduler scheduler, boolean delayError, int bufferSize):

public OperatorObserveOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    this.scheduler = scheduler;
    this.delayError = delayError;
    this.bufferSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE;

}

Последняя строка указывает на размер буфера.

Буфер размер на Android: 16.

Решение просто передает больший размер буфера в observeOn(Scheduler scheduler, int bufferSize):

messages()
    .observeOn(AndroidSchedulers.mainThread(), {buffer_size})

Будьте осторожны, чтобы не указать слишком большое значение, так как Android имеет ограниченный объем памяти.

person R. Zagórski    schedule 20.10.2016
comment
tnx, хотя это и объясняет волшебство 16, это только симптом, а не фактическая проблема. @akarnokd решает это именно так, как задумано. - person ndori; 20.10.2016