Создание пользовательских операторов в RxJava2?

У меня проблемы с поиском примера того, как создать собственный оператор с помощью RxJava 2. Я рассмотрел несколько подходов:

  1. Использование Observable.create, а затем flatMaping на нем из наблюдаемого источника. Я могу заставить это работать, но это не совсем правильно. В итоге я создаю статическую функцию, которую предоставляю источником Observable, а затем - flatMap для источника. Затем в OnSubscribe я создаю экземпляр объекта, которому передаю эмиттер, который обрабатывает и управляет Observable / Emitter (поскольку это нетривиально, и я хочу, чтобы все было как можно более инкапсулировано).
  2. Создание ObservableOperator и передача его Observable.lift. Я не могу найти никаких примеров этого для RxJava 2. Мне пришлось отлаживать свой собственный пример, чтобы убедиться, что мое понимание восходящего и нисходящего потоков было правильным. Поскольку я не могу найти никаких примеров или документации по этому поводу для RxJava 2, я немного беспокоюсь, что могу случайно сделать что-то, чего я не должен.
  3. Создайте свой собственный Observable тип. Похоже, именно так работают базовые операторы, многие из которых расширяют AbstractObservableWithUpstream. Однако здесь много всего происходит, и кажется, что легко что-то упустить или сделать то, чего я не должен. Я не уверен, должен ли я придерживаться такого подхода или нет. Я прошел через мысленный процесс, и кажется, что он может довольно быстро стать волосатым.

Я собираюсь перейти к варианту №2, но подумал, что имеет смысл спросить, какой поддерживаемый метод для этого был в RxJava2, а также выяснить, есть ли для этого документация или примеры.


person spierce7    schedule 24.01.2017    source источник
comment
Для # 2 я подозреваю, что это тот же механизм, который используется для создания всех собственных операторов rx, например. _1 _, _ 2_ и т. Д. Итак, вы можете зайти на github и поискать исходный код для них, чтобы увидеть, как они реализованы.   -  person Luciano    schedule 25.01.2017
comment
@Luciano # 3 - вот как создаются все собственные операторы rx. Как я уже упоминал выше, глядя на исходный код, он очень быстро становится волосатым. Повсюду вызывается множество вспомогательных методов. RxAssembly, DisposableHelper, и они используются точно. Это определенно можно было бы сделать, но я бы просто скопировал формат. Я хотел бы убедиться, что понимаю, что происходит в моем коде.   -  person spierce7    schedule 25.01.2017


Ответы (2)


Написание операторов не рекомендуется для новичков, и многие желаемые шаблоны потоков могут быть достигнуты с помощью существующих операторов.

Вы смотрели вики RxJava о написании операторов для 2.x ? Предлагаю прочитать его сверху вниз.

  1. использование create() возможно, но большинство людей используют его для создания элементов List с циклом for-each, не осознавая, что Flowable.fromIterable это делает.
  2. Мы сохранили эту точку расширения, хотя операторы RxJava 2 сами не используют lift(). Если вы хотите избежать использования шаблона с вариантом 3, вы можете попробовать этот маршрут.
  3. Так реализованы операторы RxJava 2. AbstractObservableWithUpstream - это небольшое удобство и не обязательно для внешних разработчиков..
person akarnokd    schedule 01.02.2017

Это может вам помочь. Я реализую оператор RxJava2 для обработки ошибки APiError. Я пользовался лифтом.

См. Пример.

  public final class ApiClient implements ApiClientInterface {
    ...
      @NonNull
      @Override
      public Observable<ActivateResponse> activate(String email, EmailData emailLinkData) {
          return myApiService.activate(email, emailData)
                  .lift(getApiErrorTransformer())
                  .subscribeOn(Schedulers.io());
      }

      private <T>ApiErrorOperator<T> getApiErrorTransformer() {
          return new ApiErrorOperator<>(gson, networkService);
      }

  }

И тогда вы можете найти собственного оператора

    public final class ApiErrorOperator<T> implements ObservableOperator<T, T> {
        private static final String TAG = "ApiErrorOperator";
        private final Gson gson;
        private final NetworkService networkService;

        public ApiErrorOperator(@NonNull Gson gson, @NonNull NetworkService networkService) {
            this.gson = gson;
            this.networkService = networkService;
        }

        @Override
        public Observer<? super T> apply(Observer<? super T> observer) throws Exception {
            return new Observer<T>() {
                @Override
                public void onSubscribe(Disposable d) {
                    observer.onSubscribe(d);
                }

                @Override
                public void onNext(T value) {
                    observer.onNext(value);
                }

                @Override
                public void onError(Throwable e) {
                    Log.e(TAG, "onError", e);

                if (e instanceof HttpException) {
                        try {
                            HttpException error = (HttpException) e;
                            Response response = error.response();
                            String errorBody = response.errorBody().string();

                            ErrorResponse errorResponse = gson.fromJson(errorBody.trim(), ErrorResponse.class);
                            ApiException exception = new ApiException(errorResponse, response);

                            observer.onError(exception);
                        } catch (IOException exception) {
                            observer.onError(exception);
                        }

                    } else if (!networkService.isNetworkAvailable()) {
                        observer.onError(new NetworkException(ErrorResponse.builder()
                                .setErrorCode("")
                                .setDescription("No Network Connection Error")
                                .build()));
                    } else {
                        observer.onError(e);
                    }
                }

                @Override
                public void onComplete() {
                    observer.onComplete();
                }
            };
        }
    }
person zkglr    schedule 08.06.2017