RxJava: объединение горячих и холодных наблюдаемых для ожидания друг друга

У меня есть мои наблюдаемые, определенные следующим образом

    val initLoading = Observable.fromCallable { println("${System.currentTimeMillis()}") }
            .subscribeOn(Schedulers.computation())
            .delay(WAIT_TIME, TimeUnit.SECONDS)
            .map { "loading ${System.currentTimeMillis()}" }
            .observeOn(AndroidSchedulers.mainThread())

    val click = RxView.clicks(button).map { "click ${System.currentTimeMillis()}" }
    initLoading.concatWith(click)
            .subscribeBy(
                    onNext = { println("result $it") },
                    onError = { throw it }
            )

initialLoading начинает работать с методом Activity onCreate. click выполняется по нажатию кнопки. У меня есть два случая, первый работает, второй нет.

случай 1

действие начинается, и кнопка нажимается через WAIT_TIME секунд. Выход:

   01-23 13:08:07.170  I/System.out: 1516698487170
   01-23 13:08:17.174  I/System.out: result loading 1516698497172
   01-23 13:08:29.258  I/System.out: result click 1516698509258

случай 2

действие начинается, и кнопка нажимается до WAIT_TIME периода. Выход

   01-23 13:09:07.392 I/System.out: 1516698547392
   01-23 13:09:17.398 I/System.out: result loading 1516698557395

Итак, проблема в том, что событие клика теряется. Я хочу, чтобы событие click дождалось загрузки, а затем продолжило работу. Короче говоря, вывод случая 2 должен быть таким же, как и случай 1.

Как я могу получить это с помощью операторов rx. Я пробовал merge, но он просто сочетает в себе оба, и событие щелчка не ждет загрузки.

Я также попробовал reply, cache, publish, share, но не смог заставить их правильную комбинацию работать так, как я хочу.


person Rezo Shalikashvili    schedule 23.01.2018    source источник


Ответы (2)


Оператор concatWith хорош для вашего варианта использования, но второй наблюдаемый объект должен начать хранить события щелчка сразу после его создания, чтобы сохраненные события могли быть отправлены, когда на наблюдаемый объект подписан (что происходит, когда initLoading завершается). Этого можно добиться, изменив наблюдаемый объект click с помощью replay() и connect().

val replayedClicks = click.replay();
replayedClicks.connect(); // The original click observable is subscribed to at this point

Теперь вы можете использовать replayedClicks в concatWith, и его сохраненные события будут воспроизведены после завершения initLoading:

initLoading.concatWith(replayedClicks)
        .subscribeBy(
                onNext = { println("result $it") },
                onError = { throw it }
        )
person Janos Breuer    schedule 23.01.2018
comment
действительно хорошее сочетание и решение. Спасибо - person Rezo Shalikashvili; 25.01.2018

Попробуйте combLatest вместо concatWith. "загрузка результата" всегда будет одинаковой

person lucas_sales    schedule 23.01.2018