RxJava2 flatMap создает повторяющиеся события

Я относительно новичок в RxJava2, и у меня странное поведение, поэтому, вероятно, я использую этот инструмент не так, как надо.

Это довольно большой проект, но я выделил приведенный ниже фрагмент как минимально воспроизводимый код:

Observable
  .interval(333, TimeUnit.MILLISECONDS)
  .flatMap(new Function<Long, ObservableSource<Integer>>() {
    private Subject<Integer> s = PublishSubject.create();
    private int val = 0;

    @Override public ObservableSource<Integer> apply(Long aLong) throws Exception {
      val++;
      s.onNext(val);
      return s;
      }
    })
  .subscribe(new Consumer<Integer>() {
    @Override public void accept(Integer integer) throws Exception {
      Log.w("value: %s", integer);
     }
  });

Этот код имитирует события из моего rx-потока, используя .interval и flatMap, получает эти события, «выполняет некоторую обработку» и использует Subject для передачи результатов вниз по потоку.

Поток — это непрерывный процесс, который будет иметь несколько событий.

Этот минимальный код глуп, потому что я отправляю только обратный вызов apply, но в реальном случае есть несколько возможных моментов, когда может произойти push, и количество событий, полученных во время apply, не совпадает с количеством, которое будет отправлено. через Тему.

Что я ожидал увидеть с этим кодом:

value: 2  // 1 got skipped because onNext is called before there's a subscriber.
value: 3
value: 4
value: 5
value: 6 ... etc

что я на самом деле получил:

value: 2
value: 3
value: 3 // 3 twice
value: 4
value: 4
value: 4 // 4 repeated 3 times
value: 5
value: 5
value: 5
value: 5 // 5 repeated 4 times
value: 6
value: 6
value: 6
value: 6
value: 6 // 6 repeated 5 times
 ... etc

Я также пытался получить Observable<Integer> o = s.share(); и вернуть его или вернуть напрямую s.share(); с теми же результатами.

Я как бы понимаю, почему это происходит. ObservableSource снова подписывается n снова n снова, так что в каждом цикле больше событий.

Вопрос:

Как я могу добиться ожидаемого поведения?

(если мое ожидаемое поведение было неясным, пожалуйста, спросите больше в комментариях)


person Budius    schedule 16.02.2017    source источник
comment
Я бы попробовал переместить private Subject<Integer> s = PublishSubject.create(); в другой прицел, если бы был в тебе   -  person Blackbelt    schedule 16.02.2017
comment
@Черный пояс, как я уже сказал. Это минимальный воспроизводимый код. В полном коде Function имеет свой собственный класс.   -  person Budius    schedule 16.02.2017
comment
Я могу судить о том, что вижу, и не могу угадать, что вы написали.   -  person Blackbelt    schedule 16.02.2017


Ответы (3)


Ваш PublishSubject подписан на несколько раз, по одному разу на элемент из interval().

Изменить: вам нужно будет каждый раз вводить новый PublishSubject (переключитесь на BehaviorSubject, если хотите сохранить первое/последнее излучение); передайте это длительному процессу и убедитесь, что его onComplete вызывается правильно, когда длительный процесс завершается.

person Tassos Bassoukos    schedule 16.02.2017
comment
Это я уже сказал. Мой вопрос в том, как добиться нужного мне поведения? - person Budius; 16.02.2017
comment
Отредактированный ответ с рекомендациями. - person Tassos Bassoukos; 16.02.2017
comment
Кроме того, я думаю, что использование PublishSubject немного громоздко. рассмотрите возможность более тесной интеграции вашего долгосрочного процесса с Rx. - person Tassos Bassoukos; 16.02.2017
comment
Спасибо за Ваш ответ. Но это не сработает. На самом деле долгий процесс выполняется до тех пор, пока приложение открыто, а onComplete просто игнорируется. Я хотел заменить данный Observable другим, который будет подписан на исходный, но уведомлять о несвязанных событиях. Это не соответствие 1-1. Эти события приходят, я создаю события входа/обновления/выхода (тайм-аут). И они могут поступать из разных источников (разные идентификаторы). Мне удалось сделать то, что я хотел, но это ОЧЕНЬ ОЧЕНЬ хакерский и громоздкий, используя оператор to(...). - person Budius; 16.02.2017
comment
Рассматривали ли вы возможность использования оператора switchMap? Или просто иметь синглтон Subject, доступный во всем приложении? - person Tassos Bassoukos; 16.02.2017
comment
Спасибо. switchMap указал мне правильное направление. Проверьте мой ответ для полного решения, которое я использовал. - person Budius; 17.02.2017

Изменить

После недавних комментариев я мог бы придумать такое решение:

class MyBluetoothClient {
  private PublishSubject<BTLEEvent> statusPublishSubject = PublishSubject.create()

  public Observable<BTLEEvent> getEventObservable() {
    return statusPublishSubject
  }

  private void publishEvent(BTLEEvent event) {
    statusPublishSubject.onNext(event)
  }

  public void doStuff1() {
    // do something that returns:
    publishEvent(BTLEEvent.someEvent1)
  }

  public void doStuff2() {
    // do something else that eventually yields
    publishEvent(BTLEEvent.someEvent2)
  }
}

И вы используете его следующим образом:

MyBluetoothClient client = MyBluetoothClient()
client
  .getEventObservable()
  .subscribe( /* */ )

///

client.doStuff1()

/// 

client.doStuff2

Исходный ответ

Будет ли это делать?

Observable
  .interval(333, TimeUnit.MILLISECONDS)
  .flatMap(new Function<Long, ObservableSource<Integer>>() {
    private int val = 0;

    @Override public ObservableSource<Integer> apply(Long aLong) throws Exception {
      val++;
      return Observable.just(val);
      }
    })
  .subscribe(new Consumer<Integer>() {
    @Override public void accept(Integer integer) throws Exception {
      Log.w("value: %s", integer);
     }
  });
person ULazdins    schedule 16.02.2017
comment
Спасибо за Ваш ответ. Для этого очень маленького примера кода я написал так. Но если вы прочитаете вопрос полностью, субъект может запускать новые события в любое время на основе других условий. Первым делом утром, вернувшись в офис, я попробую вместо этого использовать предложение Тассоса об использовании switchMap. - person Budius; 16.02.2017
comment
Я понимаю. Мне трудно придумать лучший ответ, учитывая так мало, как вы. Просто обратите внимание, что вы можете вернуть любой вид Observable, например Observable.range(0, 5).map(...).take(4)... и так далее. Дайте мне что-нибудь еще, и я постараюсь придумать лучший ответ;) - person ULazdins; 16.02.2017
comment
Более конкретные варианты использования, которые у меня есть (из нескольких других): прослушивание сканирования bluetooth LE, анализ рекламного фрейма (карты), фильтрация ожидаемых кадров и (по этому вопросу) разделение показаний на событие ввода, обновление события rssi и использование тайм-аут для генерации события выхода. Таким образом, класс хранит метку времени каждого события по мере их поступления на карту и каждую секунду выполняет проверку устройств выхода по тайм-ауту. - person Budius; 17.02.2017
comment
Это означает, что этот Observable будет генерировать 3 четных типа (enum enter, update, exit) из любого количества возможных устройств BLE из любых возможных событий сканирования. Завтра первым делом в офисе попробую .switchMap вместо этого. - person Budius; 17.02.2017
comment
Я обновил ответ, надеюсь, что это ближе к истине - person ULazdins; 17.02.2017
comment
Я ценю усилия, но я не понимаю, как я мог бы использовать это в Observable Fluent API. - person Budius; 17.02.2017

Итак, вот ответ, который я придумал. Я отмечу ответ @Tassos как правильный, поскольку он указал мне правильный путь.

Во-первых, мне нужен CachedSubject (субъект, который кэширует элементы, пока нет наблюдателей, и отправляет их, как только наблюдатель подключается), это необходимо, чтобы убедиться, что выбросы из apply действительно проходят. Класс в основном обертывает PublishSubject.

class CachedSubject<T> extends Subject<T> {

        private PublishSubject<T> publishSubject = PublishSubject.create();
        private Queue<T> cache = new ConcurrentLinkedQueue<>();

        @Override public boolean hasObservers() {
            return publishSubject.hasObservers();
        }

        @Override public boolean hasThrowable() {
            return publishSubject.hasThrowable();
        }

        @Override public boolean hasComplete() {
            return publishSubject.hasComplete();
        }

        @Override public Throwable getThrowable() {
            return publishSubject.getThrowable();
        }

        @Override protected void subscribeActual(Observer<? super T> observer) {
            while (cache.size() > 0) {
                observer.onNext(cache.remove());
            }
            publishSubject.subscribeActual(observer);
        }

        @Override public void onSubscribe(Disposable d) {
            publishSubject.onSubscribe(d);
        }

        @Override public void onNext(T t) {
            if (hasObservers()) {
                publishSubject.onNext(t);
            } else {
                cache.add(t);
            }
        }

        @Override public void onError(Throwable e) {
            publishSubject.onError(e);
        }

        @Override public void onComplete() {
            publishSubject.onComplete();
        }
    }

затем я использую этот класс с switchMap:

Observable
   .interval(1000, TimeUnit.MILLISECONDS)
   .switchMap(new Function<Long, ObservableSource<Integer>>() {

      private Subject<Integer> s = new CachedSubject<>();
      private int val = 0;

      @Override public ObservableSource<Integer> apply(Long aLong) throws Exception {
         val++;
         s.onNext(val);
         return s;
      }
   })
   .subscribe(new Consumer<Integer>() {
      @Override public void accept(Integer integer) throws Exception {
         Log.w("value: %s", integer);
      }
   });

Это фактически позволяет мне получать любое количество событий по методу apply<T t> и иметь только 1 Consumer, подписанный на него, получая от него все события.

person Budius    schedule 17.02.2017