У меня точно такое поведение
Подписчик OnComplete вызывается дважды
(что ожидается в соответствии с http://(который%20is%20is%20предполагаемый%20as%20per%20http://reactivex.io/documentation/subject.html))
Но в моем сценарии: это происходит примерно так:
У меня есть AudioRecordingService, который отображает уведомление, в котором у меня есть возможность для пользователя сохранить или удалить текущую запись, которая отлично работает. Но я пытаюсь начать использовать RxAndroid, срабатывает кнопка сохранения моего уведомления.
RxEventBus.getInstance().postEvent(new RxEvents(RxEventsEnum.AUDIO_STOP_AND_SAVE));
который вызывает
bindUntilActivitySpecificEvent(RxEventBus.getInstance().forEventType(RxEvents.class),ActivityEvent.DESTROY).subscribeOn(
AndroidSchedulers.mainThread()).subscribe(new Action1<RxEvents>() {
@Override public void call(RxEvents rxEvents) {
onEvent(rxEvents);
}
});
и в моем onEvent(rxEvent) на основе данных объекта rxEvents я соответствующим образом сохраняю и сохраняю запись. В первый раз, когда я пробую это, это работает нормально, но в последующие разы
@Override public void call(RxEvents rxEvents) {
onEvent(rxEvents);
}
вызывается несколько раз, например, когда я публикую событие во второй раз, этот обратный вызов вызывается дважды, в третий раз — трижды и т. д. (что на самом деле и делает PublishSubject). Я не хочу такого поведения, я хочу, чтобы Rx мог публиковать события и получать только последнее опубликованное событие и ничего больше.
Вот мой другой соответствующий код
protected final <T> Observable<T> bindUntilActivitySpecificEvent(Observable<T> observable,
ActivityEvent event) {
return observable.compose(RxLifecycle.<T, ActivityEvent>bindUntilEvent(lifecycle(), event))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
и мой обычный класс RxEventBus:
public class RxEventBus {
private static final RxEventBus INSTANCE = new RxEventBus();
public static RxEventBus getInstance() {
return INSTANCE;
}
private RxEventBus() {
}
private final Subject<Object, Object> mBus = new SerializedSubject<>(PublishSubject.create());
public void postEvent(Object event) {
mBus.onNext(event);
}
public <T> Observable<T> forEventType(Class<T> eventType) {
return mBus.ofType(eventType).observeOn(AndroidSchedulers.mainThread());
}
}
Каков наилучший подход с использованием RxAndroid? Обратите внимание, что я ищу только решение RxAndroid.