RxJava: метод unsubscribe() подписки не вызывается

В приведенном ниже коде мне нужно освободить некоторые ресурсы при отказе от подписки (где он регистрирует «освобождение»).

   Observable first = Observable.create(new Observable.OnSubscribe<Object>() {
                    @Override
                    public void call(Subscriber<? super Object> subscriber) {
                        subscriber.add(Subscriptions.create(() -> {
                            log(“release”);         
                        }));
                    }
                }).doOnUnsubscribe(() -> log(“first”));
   Observable second = Observable.create(…).doOnUnsubscribe(() -> log(“second”));
   Observable result = first.mergeWith(second).doOnUnsubscribe(() -> log(“result”));
   Subscription subscription = result.subscribe(…);
   //…
   subscription.unsubscribe();

Но он регистрирует только «результат». Похоже, отписка не распространяется на дочерние наблюдаемые объекты слияния. Так как же обрабатывать отмену подписки внутри Observable.OnSubscribe первого наблюдаемого?


person VasyaFromRussia    schedule 02.11.2015    source источник


Ответы (1)


В большинстве случаев вызов unsubscribe влияет только на активную последовательность и может не распространяться, если определенные последовательности завершены: операторы могут не хранить свои источники, чтобы избежать утечек памяти. Основная идея заключается в том, что операторы освобождают любые ресурсы, которыми они управляют, при завершении непосредственно перед или сразу после того, как они вызывают методы onError или onCompleted нижестоящего потока, но это несколько несовместимо с 1.x.

Если вы хотите убедиться, что ресурсы являются релизами, посмотрите на оператор using, который освободит ваш ресурс после прекращения или отказа от подписки:

Observable.using(
    () -> "resource", 
    r -> Observable.just(r), 
    r -> System.out.println("Releasing " + r))
.subscribe(System.out::println);
person akarnokd    schedule 02.11.2015