Создание списка из двух наблюдаемых источников в RxKotlin/RxJava с использованием collectInto

У меня есть класс данных Category и класс данных Plan. Каждый Category имеет список идентификаторов планов. Категории и планы хранятся в комнате. Я пытаюсь создать локальный List<Any>, где я добавляю каждую категорию в список, а затем добавляю каждый из своих планов.

Итак, для каждой категории добавьте категорию в список, а затем добавьте каждый план, принадлежащий этой категории.

Окончательный результат будет выглядеть примерно так...

0 -> a Category
1 -> a Plan
2 -> a Plan
3 -> a Plan
4 -> a Category
5 -> a Plan

и т. д.

Следующие вызовы успешно возвращают Observable<List<Category>> и Observable<Plan>

AppDatabase
   .getDatabase(context)
   .categoryDao()
   .getAll()

AppDatabase.getDatabase(context).planDao().getPlan(planId)

Здесь я пытаюсь создать свой список, но на самом деле он никогда не излучается, когда я подписываюсь на него. Нет завершения или ошибка. Все остальное в потоке попадает под удар. Почему я не могу получить окончательный результат?

    fun fetchCategoriesAndPlans() {
    val items = mutableListOf<Any>()
    AppDatabase
        .getDatabase(context)
        .categoryDao()
        .getAll()
        .concatMap { listOfCategories ->
            listOfCategories.toObservable()
        }
        .doOnNext { category ->
            items.add(category)
        }
        .concatMap { category ->
            category.getPlanIds()!!.toObservable()
        }
        .flatMap { planId ->
            AppDatabase.getDatabase(context).planDao().getPlan(planId)
        }.collectInto(items, BiConsumer{ list, i ->
            Log.d(TAG, "Collect into")
            list.add(i)
        })
        .subscribeBy(
            onSuccess = {
                Log.d(TAG, "Got the list")
            },
            onError = {
                Log.e(TAG, "Couldn't build list ${it.message}", it)
            })
}

person Tyler Pfaff    schedule 16.07.2019    source источник
comment
Причина, по которой код не работает, заключается в том, что collectInto(). collectInto() ничего не излучает, пока исходный поток не завершится. .categoryDao().getAll() on Room возвращает бесконечный поток, который никогда не завершается. Данный ответ будет работать, потому что ответ использует Single вместо Observable.   -  person Sanlok Lee    schedule 16.07.2019
comment
val items = mutableListOf<Any>(); doOnNext { items.add(it) } не сработало бы.   -  person EpicPandaForce    schedule 16.07.2019


Ответы (1)


Я делаю демо-базу для вашего случая, которая помогает испускать как Category, так и Plan

override fun onCreate(savedInstanceState: Bundle?) {
    ...

    getCategories()
        .flattenAsObservable { it }
        .flatMap { getPlanWithCategory(it) }
        .toList()
        .subscribe({
            for (item in it) {
                Log.i("TAG", " " + item.javaClass.canonicalName)
            }
        }, {

        })
}

fun getPlanWithCategory(category: Category): Observable<Any> {
    val getPlansObservable = Observable.fromArray(category.planIds).flatMapIterable {
        it
    }.flatMap {
        getPlan(it).toObservable()
    }
    return Observable.concat(Observable.just(category), getPlansObservable)
}


fun getPlan(planId: String): Single<Plan> {
    return Single.just(Plan())
}

fun getCategories(): Single<List<Category>> {
    val categories = arrayListOf<Category>()
    categories.add(Category(arrayListOf("1", "2", "3")))
    categories.add(Category(arrayListOf("1", "2")))
    return Single.just(categories)
}

class Category(val planIds: List<String>)

class Plan

Выход

 I/TAG:  Category
 I/TAG:  Plan
 I/TAG:  Plan
 I/TAG:  Category
 I/TAG:  Plan
 I/TAG:  Plan

Надеюсь, это поможет

person Linh    schedule 16.07.2019
comment
Это сработало отлично, есть идеи, почему мой не работал? - person Tyler Pfaff; 16.07.2019
comment
Извини. Я не знаю, я пытаюсь взять ваш код и протестировать, чтобы найти проблему, но он всегда имеет комбинированную ошибку (и я не могу ее исправить), затем я делаю новый способ, основанный на моих знаниях. - person Linh; 16.07.2019