Прервать одно наблюдаемое в concatMap

Я использую concatMap для обработки потока элементов по одному с длительной операцией. В какой-то момент мне нужно «прервать» эту длительную операцию, но только для текущего элемента:

@Test
public void main() throws InterruptedException {
    TestObserver<String> test = Observable.just(1, 2, 3, 4, 5)
            .concatMap(this::doLongRunningOperation)
            .test();

    Thread.sleep(10000);
    System.out.println("interrupt NOW");
    // Now I need to interrupt whichever longRunningOperation in
    // progress, but I don't want to interrupt the whole stream.
    // In other words, I want to force it to move onto the next
    // integer.
}

Observable<String> doLongRunningOperation(final Integer integer) {
    return Observable
            .just("\tStart working on " + integer,
                    "\tStill working on " + integer,
                    "\tAlmost done working on " + integer)

            // delay each item by 2 seconds
            .concatMap(string -> Observable.just(string).delay(2, TimeUnit.SECONDS))
            .doOnNext(System.out::println)
            .doFinally(() -> System.out.println("\tfinally for " + integer));
}

Я попытался решить эту проблему, сохранив одноразовый «внутренний» поток и избавившись от него в нужное время. Но это не сработало. Внутренний поток удаляется, но concatMap никогда не переходит к элементу обработки 3. Тест просто зависает (поскольку внешний наблюдаемый никогда не завершается/завершается/удаляется)

Disposable disposable = Disposables.empty();

@Test
public void main() throws InterruptedException {
    TestObserver<String> test = Observable.just(1, 2, 3, 4, 5)
            .concatMap(this::doLongRunningOperation)
            .test();

    Thread.sleep(10000);
    System.out.println("interrupt NOW");
    disposable.dispose();

    test.awaitTerminalEvent();
    System.out.println("terminal event");
}

Observable<String> doLongRunningOperation(final Integer integer) {
    return Observable
            .just("\tStart working on " + integer,
                    "\tStill working on " + integer,
                    "\tAlmost done working on " + integer)

            // delay each item by 2 seconds
            .concatMap(string -> Observable.just(string).delay(2, TimeUnit.SECONDS))
            .doOnNext(System.out::println)
            .doFinally(() -> System.out.println("\tfinally for " + integer))
            .doOnSubscribe(disposable -> {
                // save disposable so we can "interrupt" later
                System.out.println("Saving disposable for " + integer);
                Example.this.disposable = disposable;
            });
}

Даже если бы это сработало, это казалось немного хакерским, полагаясь на побочный эффект. Каков наилучший способ добиться этого?


person tir38    schedule 07.04.2019    source источник


Ответы (2)


У меня был почти тот же вопрос, что и Как отменить отдельный сетевой запрос при модернизации с помощью RxJava?. Я могу использовать PublishSubject, чтобы "прервать"

private PublishSubject interrupter;

@Test
public void main() throws InterruptedException {
    TestObserver<String> test = Observable.just(1, 2, 3, 4, 5)
            .concatMap(this::doLongRunningOperation)
            .test();

    Thread.sleep(10000);
    System.out.println("interrupt NOW");
    interrupter.onComplete();

    test.awaitTerminalEvent();
    System.out.println("terminal event");
}

Observable<String> doLongRunningOperation(final Integer integer) {
    interrupter = PublishSubject.create();

    return Observable
            .just("Start working on " + integer,
                    "Still working on " + integer,
                    "Almost done working on " + integer)
            // delay each item by 2 seconds
            .concatMap(string -> Observable.just(string).delay(2, TimeUnit.SECONDS))
            .doOnNext(System.out::println)
            .doFinally(() -> System.out.println("Finally for " + integer))
            .takeUntil(interrupter);
}
person tir38    schedule 13.05.2019

Вам просто нужно реализовать Observable типа семафора, который может запускать стоп-события.

Реализуйте этот семафор, например:

PublishSubject<Boolean> semaphore = PublishSubject.create()

Измените Observable, созданный в .concatMap(), следующим образом:

.concatMap(string ->
                        Observable.just(string)
                                .delay(2, TimeUnit.SECONDS)
                                .takeUntil(semaphore)) //Here is the stop trigger

Когда вам нужно отменить текущий элемент обработки - просто вызовите:

   semaphore.onNext(true);
person Vadim    schedule 27.06.2019
comment
Чем это отличается от моего ответа? Мне искренне любопытно. - person tir38; 13.11.2019