RxJava цепочка запросов плоской карты

я использую Retrofit с RxJAva для приложения, которое получает Rss-каналы, но rss не содержит всей информации, поэтому я использую jsoup для анализа каждой ссылки на элемент, для получения изображения и описания статьи. теперь я использую это так:

public Observable<Rss> getDumpData() {
    return newsAppService.getDumpData()
            .flatMap(rss -> Observable.from(rss.channel.items)
            .observeOn(Schedulers.io())
            .flatMap(Checked.f1(item -> Observable.just(Jsoup.connect(item.link).get())
            .observeOn(Schedulers.io())
            .map(document -> document.select("div[itemprop=image] > img").first())
                    .doOnNext(element -> item.image = element.attr("src"))
            )))
            .defaultIfEmpty(rss)
            .ignoreElements()
            .observeOn(Schedulers.io())
            .subscribeOn(AndroidSchedulers.mainThread());
}

и я получаю сообщение об ошибке в этой строке: defaultIfEmpty(rss) он не распознает rss плоской карты. и когда я перемещаю defaultIfEmpty(rss) в скобки плоской карты, у меня появляется другая ошибка, говорящая о том, что тип возвращаемого значения должен быть изменен на Element. есть ли у них какое-либо решение?


person Mamadou    schedule 19.10.2016    source источник
comment
Чего вы пытаетесь добиться, передавая параметр flatMap параметру defaultIfEmpty? Вы хотите поймать исключение?   -  person R. Zagórski    schedule 19.10.2016
comment
@R.Zagórski на всякий случай испускает источник Observable   -  person Mamadou    schedule 19.10.2016
comment
Ожидаете ли вы, что ошибка произойдет из-за newsAppService.getDumpData() или из-за вашего преобразования для получения изображения?   -  person R. Zagórski    schedule 19.10.2016
comment
да @ R.Zagórski я ожидаю этого от преобразования, которое может вызвать исключение   -  person Mamadou    schedule 19.10.2016


Ответы (2)


прежде всего вам нужно избавиться от параллелизма сObservOn и использовать subscribeOn.

.observeOn(Schedulers.io())

Если вы хотите синхронизировать данные из другого потока обратно в цикл обработки событий, рассмотрите возможность использованияObservOn с AndroidScheduler. Обычно перед подпиской на наблюдаемый объект следует использовать visibleOn, чтобы синхронизироваться с циклом пользовательского интерфейса и изменить информацию пользовательского интерфейса.

.observeOn(AndroidSchedulers.mainThread())

Во-вторых, не рекомендуется изменять объекты в конвейере. Вы должны вернуть новый объект очень вовремя.

.doOnNext(element -> item.image = element.attr("src"))

Я попытался реорганизовать ваше решение с учетом первых двух пунктов. Я использую RxJava2-RC5.

У оператора flatMap много перегрузок. Один из них предоставляет функцию для объединения входящего значения и созданного значения.

Observable<Rss> rssItemObservable = newsService.getDumpData()
                .flatMap(rss -> getRssItemInformation(rss).subscribeOn(Schedulers.io()),
                        (r, rItemList) -> {
                            Rss rInterim = new Rss();
                            rInterim.items = rItemList;
                            return rInterim;
                        });

Helping-метод для получения информации для каждого элемента в Rss. Рассмотрите возможность использования перегрузки с maxConcurrency, поскольку по умолчанию он будет подписываться на каждый поток сразу. Поэтому flatMap создаст много http-запросов.

private Observable<List<RssItem>> getRssItemInformation(Rss rss) {
        return Observable.fromIterable(rss.items)
                .flatMap(rssItem -> getImageUrl(rssItem).subscribeOn(Schedulers.io()), (rItem, img) -> {
                    RssItem item = new RssItem();
                    printCurrentThread("merge1");
                    item.image = img;
                    item.link = rItem.link;
                    return item;
                }).toList().toObservable();
}

Вспомогательный метод для получения URL-адреса изображения. Возврат наблюдаемого не влияет на параллелизм. В случае ошибки в качестве значения по умолчанию будет возвращена пустая строка.

private Observable<String> getImageUrl(String link) {
           return Observable.fromCallable(() -> Jsoup.connect(link).get())
                .map(document -> document.select("div[itemprop=image] > img").first())
                .map(element -> element.attr("src"))
                .onErrorResumeNext(throwable -> {
                    return Observable.just("");
                });
}

Полный пример можно посмотреть на github.gist: https://gist.github.com/anonymous/a8e36205fc2430517c66c802f6eef38e

person Hans Wurst    schedule 22.10.2016
comment
Ганс просто хотел сказать спасибо, этот ответ косвенно помог мне решить мою проблему в цепочке запросов. Я не вызывал subscribeOn(Schedulers.io() в цепочке запросов, я имею в виду, делая другие вызовы API в flatMap. - person Paresh Mayani; 09.03.2017

Нельзя смешивать внутренний параметр одного параметра RxJava (параметр flatMap lambda) с другим параметром оператора (defaultIfEmpty).

Прежде всего, создайте вспомогательную функцию, чтобы поддерживать чистоту основного реактивного потока:

private Observable<List<Item>> getDetails(List<Item> items) {
    return Observable.from(items)
               .observeOn(Schedulers.io())
               .flatMap(Checked.f1(item ->
                   Observable.zip(
                       Observable.just(item),
                            Observable.just(Jsoup.connect(item.link).get())
                           .observeOn(Schedulers.io())
                           .map(document -> document.select("div[itemprop=image] > img").first()),
                           (itemInner, element) -> {
                                itemInner.image = element.attr("src");
                                return itemInner;
                           }
                   )
               ))
               .toList();
}

Затем переформатируйте основную функцию:

newsAppService.getDumpData()
    .flatMap(rss ->
        Observable.zip(
            Observable.<Rss>just(rss),
            getDetails(rss.channel.items),
            (rssInner, items) -> {
                rssInner.channel.items = items;
                return rss;
            }).onErrorResumeNext((throwable -> Observable.just(rss))
        )
    )
    .observeOn(Schedulers.io())
    .subscribeOn(AndroidSchedulers.mainThread());

Надеюсь, я правильно прицелился. Это может не сработать, так как я не могу проверить это, однако я надеюсь, что вы поняли идею. Причина, по которой я использовал функции .zip, заключается в том, что вы не можете потерять ссылку на анализируемые в данный момент item или rss.

person R. Zagórski    schedule 19.10.2016
comment
Большое спасибо @R.Zagórski, это работает отлично, но я не очень хорошо понимаю код, я прочитаю документацию по zip, вы меня спасли - person Mamadou; 19.10.2016