Как связать два Completable в RxJava2

У меня два Completable. Я хотел бы выполнить следующий сценарий: если первая Completable перейдет к onComplete, продолжите со второй Completable. Окончательные результаты будут на Завершение второй Завершенной таблицы.

Вот как я это делаю, когда у меня есть Single getUserIdAlreadySavedInDevice () и Completable login ():

@Override
public Completable loginUserThatIsAlreadySavedInDevice(String password) {
    return getUserIdAlreadySavedInDevice()
            .flatMapCompletable(s -> login(password, s))

}

person Mladen Rakonjac    schedule 08.03.2017    source источник


Ответы (5)


Вы ищете оператора andThen.

Возвращает Completable, которая сначала запускает эту Completable, а затем другую завершаемую.

firstCompletable
    .andThen(secondCompletable)

В общем, этот оператор является «заменой» flatMap на Completable:

Completable       andThen(CompletableSource next)
<T> Maybe<T>      andThen(MaybeSource<T> next)
<T> Observable<T> andThen(ObservableSource<T> next)
<T> Flowable<T>   andThen(Publisher<T> next)
<T> Single<T>     andThen(SingleSource<T> next)
person Maksim Ostrovidov    schedule 09.03.2017
comment
Это должно быть отмечено как правильный ответ. Я столкнулся с той же ситуацией, что и оп, и искал ответы 2 дня. Ваш ответ @Maxim Ostrovidov - это все, что я ищу. Спасибо! - person chip; 18.06.2017
comment
Кроме того, я пробовал использовать doOnComplete (), НЕ делайте этого, он завершает поток - person Mladen Rakonjac; 20.07.2017
comment
В документации не указано, выполняется вторая завершаемая таблица или нет в случае ошибки в первой. Кто-нибудь знает? - person BoD; 12.10.2017
comment
@BoD Если в потоке возникает ошибка - он завершается событием onError. Это общее поведение реактивных потоков. Итак, отвечая на ваш вопрос, второй Completable выполняться не будет. - person Maksim Ostrovidov; 14.10.2017
comment
Хммм, я не думаю, что andThen является заменой flatMapCompletable, потому что andThen принимает экземпляр Completable, тогда как flatMapCompletable принимает функцию, которая создает Completable. Первый из них нетерпелив, когда вы вызываете его, создается Completable, тогда как во втором Completable создается лениво после завершения Completable. С нетерпеливой версией вы могли бы вызвать построение Completable до того, как первая будет завершена, не так ли? - person Galder Zamarreño; 08.02.2018
comment
andThen() является синонимом concatWith(), поэтому применяется то же поведение при ошибке (т. Е. Сбой всего потока при запуске onError()). - person E.M.; 15.05.2018
comment
Заманнено прав. При этом вам нужно обернуть все, что жаждет, Single.fromCallable (() - ›eagerCompletable) .flatMapCompletable (c -› c). Меня это только что укусило. - person Sparky; 15.08.2018
comment
На самом деле есть статическая функция Single / Completable / Observable defer (), которую вы можете использовать для создания ленивых функций, подобных плоской карте. - person Sparky; 15.08.2018

TL; DR: в других ответах отсутствует тонкость. Используйте doThingA().andThen(doThingB()), если вам нужен эквивалент concat, используйте doThingA().andThen(Completable.defer(() -> doThingB()), если вам нужен эквивалент flatMap.


РЕДАКТИРОВАТЬ: более полная ссылка

  • flatMap() - версия отображения merge()
  • concatMap() - версия отображения concat()
  • Для Completable вам нужно defer(), чтобы сделать вызовы функций ленивыми, как в функциях сопоставления для Single или Observable (или, желательно, чтобы ничего не происходило, пока вы не нажмете подписку - это хорошее соглашение, которому следует следовать, и оно используется в официальных библиотеках Rx как как и любые расширения Rx, с которыми я сталкивался, для продвинутых пользователей это относится только к холодным завершаемым таблицам, но большинство людей могут это игнорировать).
  • единственная разница между concat(a, b) и a.andThen(b) - синтаксис

Некоторые примеры:

  • foo(a).andThen(bar(b)) будет:

    1. call foo(a)
    2. немедленно вызвать bar(b), даже если завершенная таблица, возвращенная на шаге 1, возвращает ошибку
    3. подписаться на любой возвращаемый шаг 1
    4. После этого подпишется на результат bar(b) , только если последний шаг выполнен успешно
  • foo(a).andThen(Completable.defer(() -> bar(b)) будет:

    1. call foo(a)
    2. подписаться на результат шага 1
    3. только в том случае, если завершение, возвращаемое функцией foo(a), выполнено успешно, вызывается bar(b)

Я собираюсь опустить рассмотрение merge(), поскольку он становится немного сложнее, но короче говоря, это то, что нужно называть, если вы хотите «параллелизма».


Приведенные выше ответы в некотором роде верны, но я счел их вводящими в заблуждение, потому что они упускают тонкость, касающуюся нетерпеливой оценки.

doThingA().andThen(doThingB()) вызовет doThingB() немедленно, но подпишется на наблюдаемое, возвращаемое doThingB(), когда завершится наблюдаемое, возвращенное doThingA().

doThingA().andThen(Completable.defer(() -> doThingB()) вызовет doThingB() только после того, как A завершится.

Это важно, только если doThingB() имеет побочные эффекты перед событием подписки. Например. Single.just(sideEffect(1)).toCompletable()

Реализация, которая не имеет побочных эффектов перед событием подписки (истинное холодное наблюдение), может быть Single.just(1).doOnSuccess(i -> sideEffect(i)).toCompletable().

В случае, если меня только что укусило, вещь A - это некоторая логика проверки, и doThingB() немедленно запускает асинхронное обновление базы данных, которое завершает VertX ObservableFuture. Это плохо. Возможно, doThingB() должен быть написан так, чтобы обновлять базу данных только при подписке, и я собираюсь попытаться спроектировать вещи таким образом в будущем.

person Sparky    schedule 15.08.2018
comment
или просто обернутый метод doThingB () с помощью Observable.create () или Observable.fromCallable (). - person Qing; 06.12.2018
comment
@Qing, если doThingB () возвращает наблюдаемую или завершаемую, это проблематично. Create - это API нижнего уровня, поэтому вам придется повторно реализовать defer (). Observable.fromCallable () вызовет серьезные ошибки: он запустит вызываемый объект, тогда результат (внутренняя завершаемая таблица) будет проигнорирован (если вы не добавите плоскую карту, но это то же самое, что и defer, но в двух вызовах), поэтому он выиграл Не ждите завершения операции, и все ошибки будут потеряны. (править: более конструктивный ответ). - person Sparky; 06.12.2018
comment
@Sparky хм, это так, у вас есть образец кода, чтобы попробовать, никогда не сталкивался с этим, я подумал, а затем подожду, пока внутреннее наблюдение не завершится - person Qing; 07.12.2018
comment
Оно делает; но Completable.complete (). andThen (Observable.fromCallable (() - ›Completable.fromAction (() -› System.out.println (boo))). subscribe () ничего не напечатает, потому что внутренняя завершаемая таблица никогда не подписался. - person Sparky; 07.12.2018
comment
Вот пример плохого doThingB (): Completable doThingB () {System.out.println (первая половина работы); return Completable.fromAction (() - ›System.out.println (вторая половина работы);}. Разработчик думает, что он печатает две строки при успехе или ни одной строки при неудаче, потому что именно так будет работать flatMap. Но Completable.complete ( ) .andThen (Observable.fromCallable (() - ›doThingB ())). subscribe () и Completable.error (new RuntimeException ()). andThen (doThingB ()). subscribe () распечатает первую половину работы. воспроизвести поведение плоской карты, вам нужно использовать Completable.defer (() - ›doThingB ()). - person Sparky; 07.12.2018
comment
Но я согласен с тем, что было бы лучше провести рефакторинг doThingB (), если вы это имеете в виду. - person Sparky; 07.12.2018
comment
Просто чтобы подтвердить, если я использую .andThen(Completable.defer(()->doThingB()), doThingB() не будет вызываться, если doThingA() не удалось и вызвать onError(), верно? - person Jack; 09.01.2019
comment
Совершенно уверен: `Completable.fromAction (() -› System.out.println (foo)) .andThen (Completable.error (new RuntimeException (бац!))) .AndThen (Completable.fromAction (() - ›System.out .println (Привет, мир))). await (); ` - person Sparky; 18.01.2019
comment
defer() является эквивалентом fromAction() второго порядка, так же как flatMap() является эквивалентом второго порядка map(), поэтому здесь они эквивалентны. - person Sparky; 18.01.2019
comment
Интересно, почему нет такого метода для Completable (например, flatMap() для Single и Observable), который бы явно передавал наше намерение выполнить doThingB() только после завершения doThingA(). Использование .defer() усложняет код, может быть забыто и приведет к незаметным ошибкам. - person Varvara Kalinina; 14.03.2019
comment
Я думаю, это в основном потому, что в чистом реактивном режиме все наблюдаемые и завершаемые в любом случае должны быть ленивыми, и если это правда, то нет причин нуждаться в такой функции. Под полностью ленивым я подразумеваю, что ничего не происходит до тех пор, пока не будет вызвана subscribe (), а если subscribe () вызывается снова, операция повторяется. Если вы посмотрите в другом месте в структуре, есть много других мест, которые не работают, если наблюдаемое не является полностью ленивым (например, 'retry ()'). Использование кавычек, потому что на практике кажется, что на самом деле никто не делает это, и не совсем очевидно, что мы должны это делать. - person Sparky; 14.03.2019
comment
Обнаружил это сегодня, когда doThingB() был вызван немедленно и вызвал ошибку. Закончил с использованием defer, и чтение этого ответа только успокоило меня. Очень хорошо объяснено. Спасибо! - person Saurabh Shrivastava; 04.01.2020
comment
Правильно ли это заявление Use doThingA().andThen(doThingB()) if you want the equivalent of concat. насколько я понимаю, concat выполняется последовательно, а flatmap - нет ..? - person Jongz Puangput; 31.01.2020
comment
@JongzPuangput удалил мой оригинал, потому что он был неправильным. Я обновил свой ответ. Вкратце: да, andThen() идентичен concat(), но с другим синтаксисом. Не существует реального эквивалента flatMap() или concatMap() для завершения, но defer() воспроизводит ленивое поведение, для которого многие люди используют flatMap(). - person Sparky; 31.01.2020
comment
@Sparky, это неправильно: немедленно вызовите bar (b), даже если завершенная таблица, возвращенная на шаге 1, возвращает ошибку Попробуйте это: Completable.error(new RuntimeException("Boom")).andThen((CompletableObserver o) -> System.out.println("Never happens")).subscribe(); - person igobivo; 06.07.2021
comment
@igobivo нет, если вы думаете о примере foo(a).andThen(bar(b)), который вы неправильно его прочитали, это стандартный приоритет функций, а не что-то реактивное. Вы думаете о том, что было бы, если бы это было foo(a).andThen((CompletableObserver o) -> bar(b)). Проблема возникает, когда люди делают ошибку новичка и определяют bar(b) как что-то вроде Completable bar(String b) { if(someDatabaseOperation(b)) { return Completable.complete(); } else { return Completable.error(); } }. Я видел это как минимум дважды в дикой природе. - person Sparky; 07.07.2021
comment
Привет, @Sparky, я попробовал Completable.defer {doSecondCompletable ()}, но doSecondCompletable не вызывается. Я что-то пропустил? - person etomun; 18.07.2021
comment
@etomun единственное, что я могу придумать, это спросить, проверили ли вы, что результат подписывается? `` Завершаемый c = Completable.complete (); System.out.println (точка A); Завершаемый e = Completable.defer (() - ›{System.out.println (точка B); return Completable.fromAction (() -› System.out.println (точка C));}); System.out.println (точка D); c.andThen (e) .subscribe (); System.out.println (точка E); `` печатает для меня A- ›D-› B- ›C-› E. - person Sparky; 19.07.2021

Пытаться

Completable.concat

Returns a Completable which completes only when all sources complete, one after another.

http://reactivex.io/RxJava/javadoc/io/reactivex/Completable.html#concat(java.lang.Iterable)

person Deni Erdyneev    schedule 16.03.2017
comment
Concat приведет к той же проблеме, что и описанная выше. Это будет решено немедленно. Итак, если вы действительно зависите от своего вызова, прежде чем вы должны придерживаться doThingA (). AndThen (Completable.defer (() - ›doThingB ()). - person Highriser; 20.02.2019

У меня была такая же проблема, и мне пришлось использовать оператор .concactWith, чтобы он работал. В моем случае у меня есть два развлечения типа Completable.

fun makeTwoThings(): Completable {
    makeFirstThing().concatWith(makeSecondThing())
}

fun makeFirstThing(): Completable{
     //TODO()
}

fun makeSecondThing(): Completable{
     //TODO()
}
person Leonardo Paixão    schedule 14.11.2019

Обратите внимание, что ответ, за который здесь больше голосов, немного вводит в заблуждение. См. Пример ниже, моя идея состоит в том, чтобы показать некоторые сценарии тестирования и показать, как ведет себя завершаемая логика с оператором andThen.

 private fun doThingAFail(): Completable {
        print("Do thingA Called\n")
        return Completable.fromCallable {
            print("calling stream A\n")
            throw(Exception("The excep"))
        }
    }

    private fun doThingB(): Completable {
        print("Do thingB Called\n")
        return Completable.fromCallable {
            print("calling stream B\n")

        }
    }

    private fun doThingA(): Completable {
        print("Do thingA Called\n")
        return Completable.fromCallable {
            print("calling stream A\n")
        }
    }

Обратите внимание на это для теста ниже:

@Test
fun testCallAPlusB() {
    doThingA().andThen(doThingB())
}

Краткое примечание: обратите внимание, что мы не подписываемся на эти Completables в этом фрагменте.

вывод будет:

Do thingA Called
Do thingB Called

Для теста:

@Test
fun theTestSubscribe() {
    doThingA().andThen(doThingB()).subscribe()
}

Результатом будет:

Do thingA Called
Do thingB Called
calling stream A
calling stream B

И, наконец, в случае сбоя первой таблицы завершения вторая таблица завершения не будет выполняться.

@Test
fun theTestFailThingA() {
    doThingAFail().andThen(doThingB()).subscribe()
}

вывод будет:

Do thingA Called
Do thingB Called
calling stream A

Ключевой концепцией здесь является то, что логика внутри метода и внутри наблюдаемого выполняется не одновременно. Строки Do thingA Called и Do thingB Called будут напечатаны после того, как мы вызовем методы doThingA() и doThingB(). В то время как линии вызывающего потока A и вызывающего потока B будут вызываться только тогда, когда кто-то подписывается на методы doThingA и doThingB.

Вторая концепция заключается в том, как оператор andThen будет обрабатывать ошибки. В приведенном выше примере, если doThingA() completable завершается с ошибкой, поток завершится и не будет печатать строку B вызывающего потока.

person camposbrunocampos    schedule 30.03.2021