RxJava: почему одни и те же преобразования пересчитываются для каждой ветки наблюдаемых?

Введение

Рассмотрим простой фрагмент кода Java. Он определяет две наблюдаемые a и b в терминах c, который сам определяется с помощью d (a, b, c, d имеют тип Observable<Integer>):

    d = Observable.range(1, 10);
    c = d.map(t -> t + 1);
    a = c.map(t -> t + 2);
    b = c.map(t -> t + 3);

Этот код можно визуализировать с помощью диаграммы, где каждая стрелка (->) представляет преобразование (метод карты):

             .--> a
   d --> c --|
             '--> b

Если несколько цепочек наблюдаемых имеют свою часть, то (теоретически) новые значения общей части можно вычислить только один раз. В приведенном выше примере: каждое новое значение d может быть преобразовано в d --> c только один раз и использоваться как для a, так и для b.

Вопрос

На практике я наблюдаю, что трансформация рассчитывается для каждой цепочки, где используется эта трансформация (теста). Другими словами, приведенный выше пример должен быть правильно нарисован следующим образом:

   d --> c --> a
   d --> c --> b

В случае ресурсоемких преобразований новая подписка в конце цепочки приведет к вычислению всей цепочки (и снижению производительности).

Есть ли правильный способ заставить результат преобразования кэшироваться и вычисляться только один раз?

Мои исследования

Я нашел два решения этой проблемы:

  1. Передайте уникальные идентификаторы вместе со значениями и сохраните результаты преобразования во внешнем хранилище (внешнем по отношению к библиотеке rx).
  2. Используйте тему для реализации функции, похожей на карту, которая скрывает начало цепочки наблюдаемых. Код MapOnce; test.

Оба работают. Второй простой, но пахнет хаком.


person Vasiliy Kevroletin    schedule 26.11.2014    source источник


Ответы (2)


Вы определили горячие и холодные наблюдаемые.

Observable.range возвращает холодный наблюдаемый объект, хотя вы описываете результирующие запросы в иерархии, как будто они горячие; то есть, как если бы они разделяли побочные эффекты подписки. Они не. Каждый раз, когда вы подписываетесь на наблюдаемый объект cold, это может вызвать побочные эффекты. В вашем случае каждый раз, когда вы подписываетесь на range (или на запросы, установленные на range), он генерирует диапазон значений.

Во втором пункте вашего исследования вы определили, как преобразовать холодную наблюдаемую в горячую наблюдаемую; а именно, используя Субъекты. (Хотя в .NET вы не используете Subject<T> напрямую; вместо этого вы должны использовать такой оператор, как Publish. Я подозреваю, что в RxJava есть аналогичный оператор, и я бы рекомендовал его использовать.)

Дополнительная информация

Определение горячего в моей интерпретации, как подробно описано в моем сообщении в блоге, указанном выше, - это когда наблюдаемый объект не вызывает никаких побочных эффектов подписки. (Обратите внимание, что горячий наблюдаемый может многоадресно передавать побочные эффекты соединения при преобразовании из холодного в горячий, но температура относится только к к склонности наблюдаемого объекта вызывать побочные эффекты подписки, потому что это все, о чем мы действительно заботимся, когда говорим о температуре наблюдаемого на практике.)

Оператор map (Select в .NET, упомянутый в заключении моего сообщения в блоге) возвращает наблюдаемое, которое наследует температуру своего источника, поэтому на нижней диаграмме c, a и b равны холодно, потому что d холодно. Если гипотетически вы примените publish к d, то c, a и b унаследуют температуру hot от опубликованного наблюдаемого, а это означает, что подписка на них не вызовет побочных эффектов подписки. Таким образом, публикация d преобразует холодную наблюдаемую, а именно range, в горячую наблюдаемую.

    .--> c --> a
d --|
    .--> c --> b

Однако ваш вопрос был о том, как разделить вычисление c, а также d. Даже если вы опубликуете d, c все равно будет пересчитываться как для a, так и для b для каждого уведомления от d. Вместо этого вы хотите поделиться результатами c между a и b. Я называю наблюдаемую, в которой вы хотите поделиться побочными эффектами вычислений, "активной". (Я позаимствовал этот термин из терминологии пассивный и активный, используемый в нейробиологии для описания электрохимических токов в нейронах.)

На вашей верхней диаграмме вы считаете c активным, потому что это вызывает значительные побочные эффекты вычислений, согласно вашей собственной интерпретации. Обратите внимание, что c активен независимо от температуры d. Чтобы поделиться побочными эффектами вычисления активного наблюдаемого объекта, как ни странно, вы должны использовать publish точно так же, как и для холодного наблюдаемого объекта. Это связано с тем, что технически активные вычисления являются побочными эффектами в том же смысле, что и холодные наблюдаемые, в то время как пассивные вычисления не имеют побочных эффектов, как горячие наблюдаемые. Я ограничил термины горячий и холодный, чтобы они относились только к побочным эффектам начальных вычислений, которые я называю побочными эффектами подписки< /em>, потому что люди обычно используют их именно так. Я ввел новые термины, активный и пассивный, для обозначения побочных эффектов вычислений отдельно от побочных эффектов подписки.

В результате эти термины на практике просто интуитивно смешиваются друг с другом. Если вы хотите поделиться побочными эффектами вычисления c, просто publish вместо d. При этом a и b неявно становятся горячими, потому что map наследует побочные эффекты подписки, как было сказано ранее. Таким образом, вы фактически делаете правую часть наблюдаемого горячей, публикуя либо d, либо c, но публикация c также имеет свои побочные эффекты вычислений.

Если вы публикуете c вместо d, то d остается холодным, но это не имеет значения, поскольку c скрывает d от a и b. Таким образом, публикуя c, вы также фактически публикуете d. Следовательно, применение publish в любом месте вашего наблюдаемого делает правую часть наблюдаемого эффективно горячей. Неважно, где вы вводите publish или сколько наблюдателей или конвейеров вы создаете с правой стороны наблюдаемого. Однако выбор публикации c вместо d также разделяет побочные эффекты вычислений c, что технически завершает ответ на ваш вопрос. КЭД

person Dave Sexton    schedule 26.11.2014
comment
Я не думал, что это вообще связано с Hot или Cold, а просто с тем, что Rx — это модель отложенного исполнения. Он создает цепочку вычислений, которая независимо подключается для каждой подписки, и вы должны использовать Publish или Subject для общих частей вычислений. Я ошибся? - person Enigmativity; 27.11.2014
comment
@Enigmativity Ленивость монады ортогональна температуре наблюдаемого. Горячие и холодные наблюдаемые объекты являются ленивыми в Rx; например, формирование запроса к наблюдаемому объекту mouseMoves (hot) не вызывает никаких побочных эффектов, пока вы не вызовете Subscribe. Publish делает cold наблюдаемым hot, который эффективно разделяет побочные эффекты подписки. Прочтите мою статью, на которую я ссылался в ответе, чтобы узнать, почему температура наблюдаемого зависит от его склонности вызывать побочные эффекты подписки. - person Dave Sexton; 27.11.2014
comment
Я доволен этим, но я не думаю, что это меняет вопрос о том, почему существуют два отдельных конвейера подписки. Неважно, горячие они или холодные. - person Enigmativity; 27.11.2014
comment
Указание температур наблюдаемых говорит именно то, что показано на диаграммах. Верхняя диаграмма горячая, а нижняя диаграмма холодная. Подумайте вот о чем: на верхней диаграмме какой наблюдатель (a или b) заставляет d начинать отправку значений при подписке? Очевидно, что ни то, ни другое (если только вы не предполагаете поведение RefCount, чего мы здесь не будем для простоты). Другими словами, ни a, ни b не вызывают никаких побочных эффектов подписки. Это точно определение hot. Только когда c в конечном итоге подключается, возникают побочные эффекты подписки, и они передаются многоадресно на a и b. - person Dave Sexton; 27.11.2014
comment
[... Недостаточно места для комментариев] Итак, ОП хочет взять cold наблюдаемый объект, а именно range, и сделать его hot, чтобы поделиться побочными эффектами подписки. Тот факт, что в правой части обеих диаграмм есть два отдельных конвейера подписки, имеет значение только в том смысле, что OP хочет, чтобы они оба использовали одну подписку на d. Таким образом, проблема заключается просто в том, как поделиться побочными эффектами подписки в левой части диаграммы; то есть, как преобразовать наблюдаемую cold в наблюдаемую hot. Именно это и делает Publish. - person Dave Sexton; 27.11.2014
comment
Метод publish мне помог, спасибо. Точка соединения — хорошая метафора. Что касается наблюдаемой температуры: чтобы исправить мой пример, мне нужно опубликовать c. Если я сделаю d популярным, опубликовав, то будет транслироваться только d. c не будет транслироваться, но будет пересчитан из d 2 раза для a и b. - person Vasiliy Kevroletin; 27.11.2014
comment
Ближе к концу моего сообщения в блоге (ссылка в моем ответе), перед разделом заключения, я определил 2 термина для обозначения любых побочных эффектов вычислений, которые могут возникнуть после побочных эффектов начальных вычислений: пассивный или < я>активный. Если вы считаете c пассивным, вам не нужно его publish; в противном случае он активен и, возможно, его стоит опубликовать. Технически термины горячий и холодный также могут охватывать эти случаи, но мне показалось, что это слишком запутанно и не очень полезно. Лучше думать о температуре (как и большинство) только о побочных эффектах подписки; т. е. начальное вычисление. - person Dave Sexton; 27.11.2014
comment
[...продолжение] Таким образом, в приведенном выше примере, который публикует c, а не d, вы фактически подразумеваете, что c активен. Но это не меняет того факта, что вы хотите преобразовать d из холодной наблюдаемой в горячую наблюдаемую с помощью publish. Вы просто одновременно конвертируете c из холодного в горячего. Независимо от того, как вы его обрежете, этот вопрос просто о том, как сделать холодное наблюдаемым горячим, даже в общем смысле. Количество наблюдателей и точка, в которой происходит преобразование, являются просто дополнительными факторами, ортогональными основной проблеме. - person Dave Sexton; 27.11.2014
comment
Большинство горячих наблюдаемых во времени описываются в литературе как потоки событий в реальном времени. Я ожидал, что применение map к наблюдаемым в реальном времени снова даст наблюдаемые в реальном времени. Но оказывается, что это полуправда. @DaveSexton, не могли бы вы отредактировать свой ответ и добавить несколько строк о пассивных/активных наблюдаемых. Тогда я отмечу вопрос как решенный. Спасибо за ваши ответы. - person Vasiliy Kevroletin; 28.11.2014
comment
Ну, реальное время подразумевает постоянное выполнение, что является признаком того, что вы горячи. Настоящее определение hot, согласно моей интерпретации, подробно описанной в моем сообщении в блоге, - это когда наблюдаемый объект не вызывает никаких побочных эффектов подписки (хотя вместо этого он может вызывать побочные эффекты многоадресного соединения). Оператор map (Select в .NET) возвращает наблюдаемое значение, которое наследует температуру своего источника, поэтому оно по-прежнему горячее, и вы по-прежнему будете получать уведомления в режиме реального времени, пока нет обратного давления. Я ввел термин активный только для того, чтобы обозначить побочные эффекты дорогостоящих вычислений. - person Dave Sexton; 28.11.2014
comment
Я отредактирую свой пост, чтобы включить эту информацию по вашему запросу. - person Dave Sexton; 28.11.2014

Observable лениво выполняется каждый раз, когда на него подписываются (либо явно, либо неявно через композицию).

Этот код показывает, как источник излучает для a, b и c:

    Observable<Integer> d = Observable.range(1, 10)
            .doOnNext(i -> System.out.println("Emitted from source: " + i));
    Observable<Integer> c = d.map(t -> t + 1);
    Observable<Integer> a = c.map(t -> t + 2);
    Observable<Integer> b = c.map(t -> t + 3);

    a.forEach(i -> System.out.println("a: " + i));
    b.forEach(i -> System.out.println("b: " + i));
    c.forEach(i -> System.out.println("c: " + i));

Если вы в порядке с буферизацией (кешированием) результата, то это так же просто, как использовать оператор .cache() для достижения этого.

    Observable<Integer> d = Observable.range(1, 10)
            .doOnNext(i -> System.out.println("Emitted from source: " + i))
            .cache();

    Observable<Integer> c = d.map(t -> t + 1);
    Observable<Integer> a = c.map(t -> t + 2);
    Observable<Integer> b = c.map(t -> t + 3);

    a.forEach(i -> System.out.println("a: " + i));
    b.forEach(i -> System.out.println("b: " + i));
    c.forEach(i -> System.out.println("c: " + i));

Добавление .cache() к источнику делает его испускаемым только один раз и может быть подписано много раз.

Для больших или бесконечных источников данных кэширование не подходит, поэтому многоадресная рассылка является решением, гарантирующим, что источник передает только один раз.

Операторы publish() и share() — хорошее место для начала, но для простоты и, поскольку это синхронный пример, я покажу перегрузку publish(function), которую часто проще всего использовать.

    Observable<Integer> d = Observable.range(1, 10)
            .doOnNext(i -> System.out.println("Emitted from source: " + i))
            .publish(oi -> {
                Observable<Integer> c = oi.map(t -> t + 1);
                Observable<Integer> a = c.map(t -> t + 2);
                Observable<Integer> b = c.map(t -> t + 3);

                return Observable.merge(a, b, c);
            });

    d.forEach(System.out::println);

Если a, b, c нужны по отдельности, мы можем подключить все и «подключить» источник, когда он будет готов:

private static void publishWithConnect() {
    ConnectableObservable<Integer> d = Observable.range(1, 10)
            .doOnNext(i -> System.out.println("Emitted from source: " + i))
            .publish();

    Observable<Integer> c = d.map(t -> t + 1);
    Observable<Integer> a = c.map(t -> t + 2);
    Observable<Integer> b = c.map(t -> t + 3);

    a.forEach(i -> System.out.println("a: " + i));
    b.forEach(i -> System.out.println("b: " + i));
    c.forEach(i -> System.out.println("c: " + i));

    // now that we've wired up everything we can connect the source
    d.connect();
}

Или, если источник асинхронный, мы можем использовать refCounting:

    Observable<Integer> d = Observable.range(1, 10)
            .doOnNext(i -> System.out.println("Emitted from source: " + i))
            .subscribeOn(Schedulers.computation())
            .share();

Однако refCount (share — это перегрузка для его предоставления) допускает условия гонки, поэтому не гарантирует, что все подписчики получат первые значения. Обычно это требуется только для «горячих» потоков, на которые приходят и уходят подписчики. Для «холодного» источника, который мы хотим обеспечить всем, предпочтительным подходом являются предыдущие решения с cache() или publish()/publish(function).

Вы можете узнать больше здесь: https://github.com/ReactiveX/RxJava/wiki/Connectable-Observable-Operators

person benjchristensen    schedule 29.11.2014