RX: Запускать Zipped Observables параллельно?

Итак, я играю с RX (действительно круто), и я преобразовываю свой API, который обращается к базе данных sqlite в Android, чтобы возвращать наблюдаемые.

Поэтому, естественно, одна из проблем, которую я начал пытаться решить, звучит так: «Что, если я хочу сделать 3 вызова API, получить результаты, а затем выполнить некоторую обработку, когда все они будут завершены?»

Это заняло у меня час или два, но в конце концов я нашел Zip Function, и это помогло мне ловко:

    Observable<Integer> one = getNumberedObservable(1);
    Observable<Integer> two = getNumberedObservable(2);
    Observable<Integer> three = getNumberedObservable(3);

    Observable.zip(one, two, three, new Func3<Integer, Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer arg0, Integer arg1, Integer arg2) {
            System.out.println("Zip0: " + arg0);
            System.out.println("Zip1: " + arg1);
            System.out.println("Zip2: " + arg2);
            return arg0 + arg1 + arg2;
        }
    }).subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer arg0) {
            System.out.println("Zipped Result: " + arg0);
        }
    });

public static Observable<Integer> getNumberedObservable(final int value) {
    return Observable.create(new OnSubscribeFunc<Integer>() {
        @Override
        public Subscription onSubscribe(Observer<? super Integer> observer) {
            observer.onNext(value);
            observer.onCompleted();
            return Subscriptions.empty();
        }
    });
}

Большой! Так что это круто.

Поэтому, когда я архивирую 3 наблюдаемых объекта, они запускаются последовательно. Что, если я хочу, чтобы все они работали параллельно одновременно, чтобы получить результаты быстрее? Я поэкспериментировал с несколькими вещами и даже попытался прочитать некоторые оригинальные материалы RX, написанные людьми. в С#. Я уверен, что есть простой ответ. Может кто-то указать мне верное направление? Каков правильный способ сделать это?


person spierce7    schedule 18.01.2014    source источник
comment
Если я могу спросить, почему вы ждете обработки, пока все три не будут завершены?   -  person Scott Baker    schedule 20.01.2014
comment
@ScottSEA, конечно, так что скажем, у меня есть экран, для правильного рисования которого требуется 3 разных элемента из SQLite или 3 разных элемента сетевой информации. Я хотел бы убедиться, что у меня есть все, прежде чем я нарисую экран.   -  person spierce7    schedule 20.01.2014
comment
То есть вы не ждете, пока последовательности будут завершены, вы ждете, пока не получите значение из каждой последовательности?   -  person Scott Baker    schedule 20.01.2014
comment
@ScottSEA, как я настроил свой API, так это то, что все возвращает только один объект, а затем завершается. Если вы считаете, что есть лучшие способы настроить это, я весь внимание. Я очень новичок в RX и хотел бы услышать все, что вы хотите сказать.   -  person spierce7    schedule 20.01.2014
comment
@ScottSEA, даже если я возвращаю несколько элементов, я бы просто отправил один список элементов вместо нескольких вызовов onNext.   -  person spierce7    schedule 20.01.2014


Ответы (3)


zip запускает наблюдаемые объекты параллельно, но также подписывается на них последовательно. Поскольку ваш getNumberedObservable завершается в методе подписки, создается впечатление последовательного выполнения, но на самом деле такого ограничения нет.

Вы можете либо попробовать использовать некоторые долго работающие Observable, которые пережили свою логику подписки, например timer, либо использовать метод subscribeOn для асинхронной подписки на каждый поток, переданный в zip.

person James World    schedule 18.01.2014
comment
Ах! Не могу поверить, что забыл использовать subscribeOn. Спасибо что подметил это. Я протестировал его, и он работает. Итак, теперь у меня остался вопрос: что, если бы я хотел, чтобы все они запускались последовательно, я бы просто подписался на них всех в одном потоке или есть лучший способ объединить все наблюдаемые вместе для их последовательного запуска? Спасибо! - person spierce7; 19.01.2014
comment
Если вы хотите запускать их последовательно, вы, вероятно, пришли не к тому API! :) Шутки в сторону, вы можете это сделать, но это неудобно. Если все потоки одного типа, вы можете связать их с помощью concat, если они все разные, вы можете создать тип для хранения n результатов, а затем использовать concat с select для проецирования результата каждого потока на его заполнитель в тип и используйте scan для накопления одного результата. Вы также можете просто подписаться на каждый последующий поток в onCompleted предыдущего. - person James World; 19.01.2014
comment
Вы также можете использовать selectMany для проецирования результата предыдущего потока в запрос для следующего. Это хорошо работает, если результат одного потока передается в следующий. - person James World; 19.01.2014
comment
Кроме того, подписка на них всех в одном потоке не гарантирует работу - метод подписки очень часто возвращается до завершения оператора. - person James World; 19.01.2014

В RxJava используйте toAsync, чтобы включить обычную функцию во что-то, что будет выполняться в потоке и возвращать результат в виде наблюдаемого объекта.

Я не очень хорошо знаю синтаксис Java, но это будет выглядеть примерно так:

public static Integer getNumber(final int value) { return value; }
public static Observable<Integer> getNumberedObservable(final int value) {
    return rx.util.functions.toAsync(new Func<Integer,Integer>() {
        @Override
        public Integer call(Integer value) { return getNumber(value); }
    });
};

Это сработало бы, если бы getNumber действительно обращался к базе данных. Когда вы вызываете getNumberedObservable, он возвращает наблюдаемое, которое будет запускаться getNumber в отдельном потоке, когда вы подписываетесь на него.

person Brandon    schedule 20.01.2014

Я пытался сделать то же самое, запуская несколько потоков параллельно, используя zip. Я закончил открывать новый вопрос и получил ответ. По сути, вы должны подписать каждый наблюдаемый объект на новый поток, поэтому, если вы хотите запустить три наблюдаемых объекта параллельно с помощью zip, вам необходимо подписаться на 3 отдельных потока.

person s-hunter    schedule 07.07.2016