Как определить завершение всех содержащихся Observables в Observable‹Observable‹Object››

Я хочу, чтобы метод в моем API возвращал Observable‹Observable‹Object››, но я хочу, чтобы код в этом методе знал, как только все содержащиеся Observables будут завершены, чтобы он мог что-то закрыть. Как лучше всего это сделать?

Чтобы быть более точным, я хочу, чтобы этот метод был завершен:

public static <T> Observable<Observable<T>> doWhenAllComplete(
        final Observable<Observable<T>> original, Action0 action) {
  ...
}

person Dave Moten    schedule 31.01.2014    source источник
comment
покажите нам больше кода, чтобы лучше понять вашу проблему   -  person hoaz    schedule 31.01.2014
comment
Это действительно зависит от того, как ваш метод API создает содержащиеся в нем наблюдаемые объекты. Можете ли вы опубликовать код из вашего метода, который создает Observable<Observable<Object>>? Тогда мы узнаем, откуда берутся эти внутренние наблюдаемые, и как методу лучше всего отслеживать их завершение.   -  person Brandon    schedule 31.01.2014


Ответы (3)


Извиняюсь, что мой ответ находится в .NET (как и тег system.reactive); Я уверен, что вы можете перевести это, хотя!

Если ваш IObservable<IObservable<Object>> задан source, то:

source.Merge()
      .Subscribe(_  => {}, /* not interested in onNext */
                 () => /* onCompleted action here, called when all complete */);

Примечание. Это сломается, если произойдет ошибка любого из потоков (что приведет к прекращению объединенного потока в этой точке), поэтому вы также можете сделать это, чтобы проглотить ошибки в отдельных потоках:

source.SelectMany(x => x.Catch(Observable.Empty<Object>()))
      .Subscribe(_  => {}, /* not interested in onNext */
                 () => /* onCompleted action here, called when all complete */);
person James World    schedule 31.01.2014
comment
Вы делаете предположение, что Observable‹Observable‹Object›› можно объединить без потери смысла. Это предположение, к сожалению, сделать нельзя. - person Dave Moten; 09.02.2014
comment
Рассматривая ваш ответ как подсказку, хотя я думаю, что метод слияния будет ключом к достижению цели. - person Dave Moten; 09.02.2014
comment
Я не делал такого предположения - вы только просили знать, когда все потоки завершились, вот что я вам дал. Похоже, у вас есть дополнительная информация, которую можно добавить к вашему вопросу? Было бы здорово добавить тест или тесты, которые должно пройти решение. Глядя на то, что вы добавили, зачем вам проходить через источник? Две отдельные подписки не подойдут? Это также позволит избежать побочных эффектов. - person James World; 09.02.2014
comment
Я понимаю, две подписки сделают это, но испортят инкапсуляцию, которая, я думаю, достигается в моем ответе ниже. Да, я согласен, что вы не сделали этого предположения, спасибо. Возможно, мне нужно было пояснить, что я хочу, чтобы действие происходило без побочных эффектов на источник (помимо самого действия). Ваш ответ привел меня к моему ответу, спасибо. - person Dave Moten; 09.02.2014

Я считаю, что эта реализация метода делает свое дело без побочных эффектов:

public static <T> Observable<Observable<T>> doWhenAllComplete(
        final Observable<Observable<T>> original, final Action0 action) {
    return Observable.create(new OnSubscribeFunc<Observable<T>>() {

        @Override
        public Subscription onSubscribe(Observer<? super Observable<T>> o) {
            ConnectableObservable<Observable<T>> published = original
                    .publish();
            Subscription sub1 = Observable.merge(published)
                    .doOnCompleted(action).subscribe();
            Subscription sub2 = published.subscribe(o);
            Subscription sub3 = published.connect();
            return Subscriptions.from(sub1, sub2, sub3);
        }
    });
}
person Dave Moten    schedule 09.02.2014

Для меня это работает:

bothSources = source1.Cast<Object>().Merge (source2.Cast<Object>());

В моем случае мне нужно было дождаться только 2 источников, но вы могли бы создать функцию, которая получает список источников и объединяет их все.

person Lay González    schedule 14.11.2014