Ограничение действий с помощью оператора rxJava и retryWhen

В общем, мое приложение должно выполнять две задачи:

  • Одновременно принимать только один сетевой запрос
  • Повторить, если запрос не удался

Вот как я это реализую:

public class RequestsLocker {

    private volatile boolean isLocked;

    public <T> Observable.Transformer<T, T> applyLocker() {
        if(!isLocked()) {
            return observable -> observable
                    .doOnSubscribe(() -> {
                        lockChannel();
                    })
                    .doOnUnsubscribe(() -> {
                        freeChannel();
                    });
        } else {
            return observable -> Observable.error(new ChannelBusyException("Channel is busy now."));
        }
    }

    private void lockChannel() {
        isLocked = true;
    }

    private void freeChannel() {
        isLocked = false;
    }

    public boolean isLocked() {
        return isLocked;
    }

}

Выглядит хорошо.

Теперь моя retryWhen реализация:

public static Observable<?> retryWhenAnyIoExceptionWithDelay(Observable<? extends Throwable> observable) {
    return observable.flatMap(error -> {
        // For IOExceptions, we  retry
        if (error instanceof IOException) {
            return Observable.timer(2, TimeUnit.SECONDS);
        }

        // For anything else, don't retry
        return Observable.error(error);
    });
}

Вот как я его использую:

public Observable<List<QueueCarItem>> finishService(int id, PaymentType paymentType, String notes) {
    return carsQueueApi.finishService(id, new FinishCarServiceRequest(paymentType.getName(), notes))
            .compose(requestsLocker.applyLocker(RequestsLocker.RequestChannel.CHANGE));
}

...

public void finishCarService(QueueCarItem carItem, PaymentType paymentType,
                             String notes, Subscriber<List<QueueCarItem>> subscriber) {
    queueApiMediator.finishService(carItem.getId(), paymentType, notes)
            .subscribeOn(ioScheduler)
            .observeOn(uiScheduler)
            .doOnError(this::handleError)
            .retryWhen(RxOperatorsHelpers::retryWhenAnyIoExceptionWithDelay)
            .subscribe(subscriber);
}

Основная проблема в том, что doOnUnsubscribe() вызвал любую ошибку, а затем шкафчик открыт для любого нового запроса, пока не истечет время таймера и не произойдет повторная подписка. Это проблема. Пока таймер тикает, пользователь может сделать еще один запрос.

Как я могу это исправить?


person Alexandr    schedule 10.07.2016    source источник
comment
Не могли бы вы опубликовать код, показывающий, как вы на самом деле используете трансформатор applyLocker и retryWhenAnyIoExceptionWithDelay?   -  person JohnWowUs    schedule 11.07.2016


Ответы (3)


Проблема в том, что вы применяете свой трансформатор к наблюдаемому источнику, то есть до вашего retrywhen. Когда возникает ошибка, вы всегда собираетесь отказаться от подписки, а затем повторно подписаться на наблюдаемый источник, что приведет к вызову вашего doOnUnsubscribe.

Предлагаю вам попробовать

public Observable<List<QueueCarItem>> finishService(int id, PaymentType paymentType, String notes) {
    return carsQueueApi.finishService(id, new FinishCarServiceRequest(paymentType.getName(), notes));            
}


public void finishCarService(QueueCarItem carItem, PaymentType paymentType,
                             String notes, Subscriber<List<QueueCarItem>> subscriber) {
    queueApiMediator.finishService(carItem.getId(), paymentType, notes)
            .subscribeOn(ioScheduler)
            .observeOn(uiScheduler)
            .doOnError(this::handleError)
            .retryWhen(RxOperatorsHelpers::retryWhenAnyIoExceptionWithDelay)
            .compose(requestsLocker.applyLocker(RequestsLocker.RequestChannel.CHANGE));
            .subscribe(subscriber);
}

PS: Трансформатор шкафчика применения выглядит немного иначе, то есть он не принимает аргументов в коде, который вы связали.

person JohnWowUs    schedule 11.07.2016
comment
Да, я понял. Вот в чем проблема: решил попробовать CleanAcritecture на android10. Итак, метод RequetsLocker и finishService находится на data layer. Когда finishCarService метод в домене. Теперь я думаю о переносе оператора retryWhen в data layer. Но это нарушает философию CA, поскольку data layer должен решать, ЧТО делать, а не КАК. Другой способ - пусть RequestLocker на domain layer. В этом случае я должен уведомить domain layer об источнике данных, поскольку RequestLocker должен применяться только к сетевому запросу, а не к getFromCache. Это тоже плохая практика. Мех .. - person Alexandr; 11.07.2016

Использование retryWhen, чтобы избежать отказа от подписки onError, вы должны использовать onErrorResumeNext, который не отменит вашу подписку.

Взгляните на этот пример

/**
 * Here we can see how onErrorResumeNext works and emit an item in case that an error occur in the pipeline and an exception is propagated
 */
@Test
public void observableOnErrorResumeNext() {
    Subscription subscription = Observable.just(null)
                                          .map(Object::toString)
                                          .doOnError(failure -> System.out.println("Error:" + failure.getCause()))
                                          .retryWhen(errors -> errors.doOnNext(o -> count++)
                                                                     .flatMap(t -> count > 3 ? Observable.error(t) : Observable.just(null)),
                                                     Schedulers.newThread())
                                          .onErrorResumeNext(t -> {
                                              System.out.println("Error after all retries:" + t.getCause());
                                              return Observable.just("I save the world for extinction!");
                                          })
                                          .subscribe(s -> System.out.println(s));
    new TestSubscriber((Observer) subscription).awaitTerminalEvent(500, TimeUnit.MILLISECONDS);
}

Что касается параллелизма, если вы выполняете операцию в операторе flatMap, вы можете указать Max concurrent.

public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func, int maxConcurrent) {
    if (getClass() == ScalarSynchronousObservable.class) {
        return ((ScalarSynchronousObservable<T>)this).scalarFlatMap(func);
    }
    return merge(map(func), maxConcurrent);
}

Вы можете увидеть больше примеров здесь https://github.com/politrons/reactive

person paul    schedule 11.07.2016
comment
благодаря. Я не могу окончательно понять ваш код прямо сейчас, но я протестирую его через несколько дней. - person Alexandr; 11.07.2016

Мое текущее решение - не разблокировать RequestLocker на IoException, так как в этом случае запрос будет повторяться после задержки.

public <T> Observable.Transformer<T, T> applyLocker() {
    if(!isLocked()) {
        return observable -> observable.doOnSubscribe(() -> {
            lockChannel();
        }).doOnNext(obj -> {
            freeChannel();
        }).doOnError(throwable -> {
            if(throwable instanceof IOException) {
                return; // as any request will be repeated in case of IOException
            }
            freeChannel(channel);
        });
    } else {
        return observable -> Observable.error(new ChannelBusyException("Channel is busy now"));
    }
}
person Alexandr    schedule 11.07.2016