Я относительно новичок в RxJava2, и у меня странное поведение, поэтому, вероятно, я использую этот инструмент не так, как надо.
Это довольно большой проект, но я выделил приведенный ниже фрагмент как минимально воспроизводимый код:
Observable
.interval(333, TimeUnit.MILLISECONDS)
.flatMap(new Function<Long, ObservableSource<Integer>>() {
private Subject<Integer> s = PublishSubject.create();
private int val = 0;
@Override public ObservableSource<Integer> apply(Long aLong) throws Exception {
val++;
s.onNext(val);
return s;
}
})
.subscribe(new Consumer<Integer>() {
@Override public void accept(Integer integer) throws Exception {
Log.w("value: %s", integer);
}
});
Этот код имитирует события из моего rx-потока, используя .interval
и flatMap
, получает эти события, «выполняет некоторую обработку» и использует Subject
для передачи результатов вниз по потоку.
Поток — это непрерывный процесс, который будет иметь несколько событий.
Этот минимальный код глуп, потому что я отправляю только обратный вызов apply
, но в реальном случае есть несколько возможных моментов, когда может произойти push, и количество событий, полученных во время apply
, не совпадает с количеством, которое будет отправлено. через Тему.
Что я ожидал увидеть с этим кодом:
value: 2 // 1 got skipped because onNext is called before there's a subscriber.
value: 3
value: 4
value: 5
value: 6 ... etc
что я на самом деле получил:
value: 2
value: 3
value: 3 // 3 twice
value: 4
value: 4
value: 4 // 4 repeated 3 times
value: 5
value: 5
value: 5
value: 5 // 5 repeated 4 times
value: 6
value: 6
value: 6
value: 6
value: 6 // 6 repeated 5 times
... etc
Я также пытался получить Observable<Integer> o = s.share();
и вернуть его или вернуть напрямую s.share();
с теми же результатами.
Я как бы понимаю, почему это происходит. ObservableSource
снова подписывается n снова n снова, так что в каждом цикле больше событий.
Вопрос:
Как я могу добиться ожидаемого поведения?
(если мое ожидаемое поведение было неясным, пожалуйста, спросите больше в комментариях)
private Subject<Integer> s = PublishSubject.create();
в другой прицел, если бы был в тебе - person Blackbelt   schedule 16.02.2017Function
имеет свой собственный класс. - person Budius   schedule 16.02.2017