RxAndroid — правильное использование Rx EventBus

У меня точно такое поведение

Подписчик OnComplete вызывается дважды

(что ожидается в соответствии с http://(который%20is%20is%20предполагаемый%20as%20per%20http://reactivex.io/documentation/subject.html))

Но в моем сценарии: это происходит примерно так:

У меня есть AudioRecordingService, который отображает уведомление, в котором у меня есть возможность для пользователя сохранить или удалить текущую запись, которая отлично работает. Но я пытаюсь начать использовать RxAndroid, срабатывает кнопка сохранения моего уведомления.

RxEventBus.getInstance().postEvent(new RxEvents(RxEventsEnum.AUDIO_STOP_AND_SAVE));

который вызывает

bindUntilActivitySpecificEvent(RxEventBus.getInstance().forEventType(RxEvents.class),ActivityEvent.DESTROY).subscribeOn(
        AndroidSchedulers.mainThread()).subscribe(new Action1<RxEvents>() {
      @Override public void call(RxEvents rxEvents) {
        onEvent(rxEvents);
      }
    });

и в моем onEvent(rxEvent) на основе данных объекта rxEvents я соответствующим образом сохраняю и сохраняю запись. В первый раз, когда я пробую это, это работает нормально, но в последующие разы

@Override public void call(RxEvents rxEvents) {
            onEvent(rxEvents);
          }

вызывается несколько раз, например, когда я публикую событие во второй раз, этот обратный вызов вызывается дважды, в третий раз — трижды и т. д. (что на самом деле и делает PublishSubject). Я не хочу такого поведения, я хочу, чтобы Rx мог публиковать события и получать только последнее опубликованное событие и ничего больше.

Вот мой другой соответствующий код

protected final <T> Observable<T> bindUntilActivitySpecificEvent(Observable<T> observable,
      ActivityEvent event) {
    return observable.compose(RxLifecycle.<T, ActivityEvent>bindUntilEvent(lifecycle(), event))
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread());
  }

и мой обычный класс RxEventBus:

public class RxEventBus {

  private static final RxEventBus INSTANCE = new RxEventBus();

  public static RxEventBus getInstance() {
    return INSTANCE;
  }

  private RxEventBus() {
  }

  private final Subject<Object, Object> mBus = new SerializedSubject<>(PublishSubject.create());


  public void postEvent(Object event) {
    mBus.onNext(event);
  }

  public <T> Observable<T> forEventType(Class<T> eventType) {
    return mBus.ofType(eventType).observeOn(AndroidSchedulers.mainThread());
  }
}

Каков наилучший подход с использованием RxAndroid? Обратите внимание, что я ищу только решение RxAndroid.


person uLYsseus    schedule 22.05.2016    source источник
comment
Похоже, вы где-то добавили несколько прослушивателей обратного вызова. Если он вызывается несколько раз.   -  person Torsten Ojaperv    schedule 22.05.2016
comment
Нет, я не знаю, я проверил все ссылки ????   -  person uLYsseus    schedule 23.05.2016


Ответы (1)


Вы создаете новую наблюдаемую каждый раз, когда запускаете событие в

RxEventBus.getInstance().forEventType(RxEvents.class)

Вам нужно кэшировать наблюдаемые, которые вы создаете для каждого типа события.

person JohnWowUs    schedule 23.05.2016
comment
Я думаю, вы правы, я прямо сейчас подписываюсь на onResume(), и мне нужно сделать это в onCreate(), я попробую - person uLYsseus; 23.05.2016
comment
это сработало, моя вина, и, как говорят мудрые люди, человек ошибается, ха-ха, спасибо - person uLYsseus; 23.05.2016