Есть ли ограничение на количество наблюдаемых в методе .zip?

Кажется, существует ограничение на количество Observables, которые можно использовать в качестве параметров в методе zip для Kotlin. Если это так, то какова лучшая альтернатива?

Например, когда я использую 9 параметров, все работает, как и ожидалось. Когда я добавляю 10-й параметр, я получаю сообщение об ошибке Невозможно определить тип этого параметра. Укажите это явно

Observable.zip(
            //TODO: parameterize exchange symbols based on pair
            methodOne() as Observable<Any>),
            methodTwo() as Observable<Any>),
            methodThree() as Observable<Any>),
            methodFour() as Observable<Any>),
            methodFive() as Observable<Any>),
            methodSix() as Observable<Any>),
            methodSeven() as Observable<Any>),
            methodEight() as Observable<Any>),
            methodNine() as Observable<Any>),
            { oneResult, twoResult, threeResult, fourResult, fiveResult, sixResult, sevenResult, eightResult, nineResult ->
                    //logic here applying computation to results
            })
            .subscribe(
                    {},
                    {
                        println(String.format("Error: %s", it.message))
                    })
            .unsubscribe()
}

person Adam Hurwitz    schedule 19.06.2018    source источник


Ответы (3)


RxJava поддерживает только до 9 различных источников с zip. Кроме того, вы должны использовать zip(Iterable<ObservableSource>, Func<Object[],R>) и привести каждый элемент Object[] к соответствующему типу.

Возвращает Observable, который выдает результаты указанной функции combiner, примененной к комбинациям элементов, испускаемых последовательно Iterable из других ObservableSource. zip применяет эту функцию в строгой последовательности, поэтому первый элемент, испускаемый новым ObservableSource, будет результатом применения функции к первому элементу, испускаемому каждым из исходных ObservableSource; второй элемент, испускаемый новым ObservableSource, будет результатом функции, примененной ко второму элементу, испускаемому каждым из этих ObservableSource; и так далее.

Результирующий ObservableSource<R>, возвращенный из zip, будет вызывать onNext столько раз, сколько onNext вызовов источника ObservableSource выдает наименьшее количество элементов.

Оператор подписывается на свои источники в том порядке, в котором они указаны, и с готовностью завершает работу, если один из источников короче остальных при удалении других источников. Поэтому вполне возможно, что эти другие источники никогда не смогут выполниться полностью (и, следовательно, не вызовут doOnComplete()). Это также может произойти, если источники имеют одинаковую длину; если источник A завершается, а B был использован и близок к завершению, оператор обнаруживает, что A не будет отправлять дальнейшие значения, и немедленно удаляет B. Например:

zip(Arrays.asList(range(1, 5).doOnComplete(action1), range(6, 5).doOnComplete(action2)), (a) -> a)

action1 будет вызван, а action2 — нет. Чтобы обойти это свойство завершения, используйте также doOnDispose(Action) или используйте using() для очистки в случае завершения или вызова dispose(). Примечание по сигнатуре метода: поскольку Java не позволяет создавать общий массив с новым T[], реализация этого оператора должна вместо этого создать Object[]. К сожалению, Function<Integer[], R>, переданный методу, вызовет ClassCastException.

person akarnokd    schedule 19.06.2018

Согласно Oracle Java Doc: http://reactivex.io/RxJava/javadoc/rx/Observable.html#zip(java.lang.Iterable,%20rx.functions.FuncN)

эти методы существуют:

static <R> Observable<R>    zip(java.lang.Iterable<? extends Observable<?>> ws, FuncN<? extends R> zipFunction)

static <R> Observable<R>    zip(Observable<?>[] ws, FuncN<? extends R> zipFunction)

static <R> Observable<R>    zip(Observable <? extends Observable<?>> ws, FuncN<? extends R> zipFunction)

static <T1,T2,R> Observable<R>  zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Func2<? super T1,? super T2,? extends R> zipFunction)

static <T1,T2,T3,R>
Observable<R>   zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Func3<? super T1,? super T2,? super T3,? extends R> zipFunction)

static <T1,T2,T3,T4,R>
Observable<R>   zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Func4<? super T1,? super T2,? super T3,? super T4,? extends R> zipFunction)

static <T1,T2,T3,T4,T5,R>
Observable<R>   zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Func5<? super T1,? super T2,? super T3,? super T4,? super T5,? extends R> zipFunction)


static <T1,T2,T3,T4,T5,T6,R>
Observable<R>   zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6, Func6<? super T1,? super T2,? super T3,? super T4,? super T5,? super T6,? extends R> zipFunction)

static <T1,T2,T3,T4,T5,T6,T7,R>
Observable<R>   zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6, Observable<? extends T7> o7, Func7<? super T1,? super T2,? super T3,? super T4,? super T5,? super T6,? super T7,? extends R> zipFunction)

static <T1,T2,T3,T4,T5,T6,T7,T8,R>
Observable<R>   zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6, Observable<? extends T7> o7, Observable<? extends T8> o8, Func8<? super T1,? super T2,? super T3,? super T4,? super T5,? super T6,? super T7,? super T8,? extends R> zipFunction)

static <T1,T2,T3,T4,T5,T6,T7,T8,T9,R>
Observable<R>   zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6, Observable<? extends T7> o7, Observable<? extends T8> o8, Observable<? extends T9> o9, Func9<? super T1,? super T2,? super T3,? super T4,? super T5,? super T6,? super T7,? super T8,? super T9,? extends R> zipFunction)

Таким образом, чтобы иметь более 9 элементов, используйте массив или итерацию, это будет проще

person Arnault Le Prévost-Corvellec    schedule 19.06.2018

Да, есть несколько перегруженных методов Observable.zip() и тот, у которого максимальное количество аргументов ObservableSource равно 9:

public static <T1, T2, T3, T4, T5, T6, T7, T8, T9, R> Observable<R> zip(
        ObservableSource<? extends T1> source1, ObservableSource<? extends T2> source2, ObservableSource<? extends T3> source3,
        ObservableSource<? extends T4> source4, ObservableSource<? extends T5> source5, ObservableSource<? extends T6> source6,
        ObservableSource<? extends T7> source7, ObservableSource<? extends T8> source8, ObservableSource<? extends T9> source9,
        Function9<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? super T7, ? super T8, ? super T9, ? extends R> zipper) {

Если вы хотите иметь более 9 источников, взгляните на zipArray или zipIterable

person michalbrz    schedule 19.06.2018