RxJava2 Как поделиться наблюдаемым с двумя другими наблюдателями и объединить его обратно с подписчиком

У меня есть следующие методы

Document createDocument(String url);
List<MediaContent> getVideo(Document doc);
List<MediaContent> getImages(Document doc);

List‹ MediaContent> будет потребляться

void appendToRv(List<MediaContent> media);

Мне нравится использовать RxJava2, чтобы

CreateDocument -> getVideo   ->
                             -> appendToRv  
               -> getImages  ->

(также видеовыход следует заказывать перед изображениями).

Как мне это сделать? Я попробовал flatMap, но, похоже, он позволяет использовать только один метод.

Single<List<MediaContent>> single = 
    Single.fromCallable(() -> createDocument(url))

      // . ?? .. 
      // this is the part i am lost with
      // how do i feed document to -> getVideo() and getImage()
      // and then merge them back into the subscriber 
      // 
          .subscribeOn(Schedulers.io())
          .observeOn(AndroidSchedulers.mainThread());

single.subscribe(parseImageSubscription);

DisposableSingleObserver

parseImageSubscription = new DisposableSingleObserver<List<MediaContent>>() {
    @Override
    public void onSuccess(List<MediaContent> media) { 
        if(media!=null) {
            appendToRv(media);
        }
    }

    @Override
    public void onError(Throwable error) { 
        doSnackBar("error loading: '" + q + "'");
    }
};

отдельные наблюдаемые для getVideos и getImages

Single<List<MediaContent>> SingleGetImage(Document document ) {
    return Single.create(e -> {
        List<MediaContent> result = getImage(document);
        if (result != null) {
            e.onSuccess(result);
        }else {
            e.onError(new Exception("No images found"));
        }
    });
}

Single<List<MediaContent>> singleGetVideo(Document document ) {
    return Single.create(e -> {
        List<MediaContent> result = getVideo( document);
        if (result != null) {
            e.onSuccess(result);
        }else {
            e.onError(new Exception("No videos found"));
        }
    });
}

person Angel Koh    schedule 02.03.2017    source источник


Ответы (2)


предполагая, что вы хотите выполнять запросы getVideos и getImages параллельно, вы можете использовать flatMap() с zip(), zip соберет 2 выброса от обоих Singles, и вы можете объединить 2 результата в новое значение, что означает, что вы можете отсортировать список видео MediaContent , и объедините его со списком изображений MediaContent и верните единый список (или любой другой объект, который вы хотите):

Single<List<MediaContent>> single =
            Single.fromCallable(() -> createDocument(url))
                    .flatMap(document -> Single.zip(singleGetVideo(document), SingleGetImage(document),
                            (videoMediaContents, imageMediaContents) -> //here you'll have the 2 results
                    //you can sort combine etc. and return unified object
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread());

    single.subscribe(parseImageSubscription)
person yosriz    schedule 02.03.2017

Observable.zip() может реализовать это идеально. По этому методу наблюдатель получит объединенный результат.

public void zip() {
    Observable<Integer> observable1 = Observable.just(1);
    Observable<Integer> observable2 = Observable.just(2);
    Observable.zip(observable1, observable2, new Func2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer integer, Integer integer2) {
            return integer + integer2;
        }
    }).subscribe(new Observer<Integer>() {
        @Override
        public void onCompleted() {

        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onNext(Integer o) {
            Logger.i(o.toString());
            //Here will print 3.
        }
    });
}
person Eric    schedule 02.03.2017