RxJava2: невозможно обработать исключение для асинхронного обратного вызова с помощью retryWhen

Я пытаюсь подключиться к брокеру MQTT. Я хочу повторить попытку, если мне не удастся подключиться. Я получаю обратный вызов при успешном или неудачном подключении.

Прочитав несколько примеров retryWhen и обработки асинхронных обратных вызовов, я собрал этот код. Он отлично работает, если мне удастся подключиться. Кроме того, он повторяет 3 попытки, если я вызываю e.onError(throwable) синхронно из Flowable. Но в моем приложении для Android происходит сбой, если я вызываю e.onError(throwable) из метода обратного вызова onFailure().

Вот код:

Цепочка RxJava

createConnectionFlowable(client, options)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .retryWhen(createRetryFunction())
    .subscribe(createConsumer());

создать Flowable

private Flowable<String> createConnectionFlowable(final MqttAndroidClient client, final MqttConnectOptions options) {
    return Flowable.create(new FlowableOnSubscribe<String>() {

        public void subscribe(final FlowableEmitter<String> e) throws Exception {
                client.connect(options).setActionCallback(new IMqttActionListener() {
                    public void onSuccess(IMqttToken iMqttToken) { e.onComplete(); }
                    public void onFailure(IMqttToken iMqttToken, Throwable throwable) { e.onError(throwable); }
                });
        }
    }, BackpressureStrategy.BUFFER);
}

Создать функцию повтора

private Function<Flowable<Throwable>, Publisher<?>> createRetryFunction() {
    return new Function<Flowable<Throwable>, Publisher<?>>() {

        public Publisher<?> apply(Flowable<Throwable> throwableFlowable) throws Exception {
            return throwableFlowable.zipWith(
                    Flowable.range(1, 3),
                    new BiFunction<Throwable, Integer, Integer>() {
                        public Integer apply(Throwable throwable, Integer integer) throws Exception { return integer; }
                    }
            )
            .flatMap(new Function<Integer, Publisher<?>>() {
                public Publisher<?> apply(Integer integer) throws Exception {
                    return Flowable.timer(integer, TimeUnit.SECONDS);
                }
            });
        }
    };
}

Потребитель: делайте здесь все хорошее

private Consumer<String> createConsumer() {
    return new Consumer<String>() {
        public void accept(String s) throws Exception {
            Log.d(TAG, "accept: do important stuff here" + s);
        }
    };
}

Журналы ошибок

12-20 11:51:08.544 16769-16769/com.work.app D/MqttBridgeService: onFailure: connection unsuccessful
12-20 11:51:08.544 16769-16769/com.work.app D/MqttBridgeService: apply() called with: throwable = [Unable to connect to server (32103) - java.net.ConnectException: failed to connect to /10.31.252.211 (port 1883) after 30000ms: isConnected failed: ECONNREFUSED (Connection refused)], integer = [1]
12-20 11:51:08.544 16769-16769/com.work.app D/MqttBridgeService: apply: delay retry by seconds:1
12-20 11:51:09.589 16769-16830/com.work.app D/AlarmPingSender: Unregister alarmreceiver to MqttServicepaho837944119000
12-20 11:51:09.600 16769-16831/com.work.app D/AlarmPingSender: Unregister alarmreceiver to MqttServicepaho837944119000
12-20 11:51:09.606 16769-16769/com.work.app D/MqttBridgeService: onFailure: connection unsuccessful
12-20 11:51:09.606 16769-16769/com.work.app D/MqttBridgeService: onFailure: connection unsuccessful
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: Unable to connect to server (32103) - java.net.ConnectException: failed to connect to /10.31.252.211 (port 1883) after 30000ms: isConnected failed: ECONNREFUSED (Connection refused)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:     at org.eclipse.paho.client.mqttv3.internal.TCPNetworkModule.start(TCPNetworkModule.java:79)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:     at org.eclipse.paho.client.mqttv3.internal.ClientComms$ConnectBG.run(ClientComms.java:590)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:     at java.lang.Thread.run(Thread.java:818)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: Caused by: java.net.ConnectException: failed to connect to /10.31.252.211 (port 1883) after 30000ms: isConnected failed: ECONNREFUSED (Connection refused)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:     at libcore.io.IoBridge.isConnected(IoBridge.java:234)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:     at libcore.io.IoBridge.connectErrno(IoBridge.java:171)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:     at libcore.io.IoBridge.connect(IoBridge.java:122)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:     at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:183)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:     at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:452)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:     at java.net.Socket.connect(Socket.java:884)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:     at org.eclipse.paho.client.mqttv3.internal.TCPNetworkModule.start(TCPNetworkModule.java:70)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:   ... 2 more
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: Caused by: android.system.ErrnoException: isConnected failed: ECONNREFUSED (Connection refused)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:     at libcore.io.IoBridge.isConnected(IoBridge.java:223)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:   ... 8 more
12-20 11:51:09.606 16769-16769/com.work.app E/AndroidRuntime: FATAL EXCEPTION: main
                                                                     Process: com.work.app, PID: 16769
                                                                     Unable to connect to server (32103) - java.net.ConnectException: failed to connect to /10.31.252.211 (port 1883) after 30000ms: 

Вопросы

  1. Почему этот код выдает исключение, которое приводит к сбою приложения? В идеале он должен обрабатывать исключение? Что мне здесь не хватает?
  2. Почему не повторяется 3 раза?
  3. Почему тот же код повторяется правильно, если я вызываю e.onError(throwable) синхронно из метода Flowable.subscribe()?

Ссылки

  1. RxJava 1.x retryWhen doc
  2. Этот блог

person Pravin Sonawane    schedule 20.12.2016    source источник


Ответы (2)


  1. Поскольку вы subscribe используете Consumer<String>, вы не определяете обработчик ошибок для потока. Это означает, что ошибка будет передана обработчику ошибок по умолчанию через RxJavaPlugins.getErrorHandler().handleError(...). На Android этот обработчик, похоже, вызывает фатальную ошибку. Чтобы исправить это, используйте Observer<String> вместо Consumer<String>
  2. Журнал, кажется, предполагает, что клиент потерпел неудачу 3 раза («onFailure» упоминается три раза), если Rx ничего не делает. Если бы мне пришлось предположить, что клиент может иметь состояние, это означает, что после первоначального подключения последующие вызовы к client.connect(...) демонстрируют некоторую форму странного поведения, вызывающего проблему. Поскольку журнал показывает error - 1 sec wait - error, error, я предполагаю, что обратные вызовы остаются активными, поэтому вторая ошибка дважды отправляется RxJava.
  3. Предполагая, что вы говорите о методе waitForCompletion(), когда говорите о синхронном, это подтвердит мои предположения в 2. Поскольку обратные вызовы не зарегистрированы, каждый throwable будет сообщаться только один раз, что исправляет поведение.

Я не уверен, почему эмиттер останется работоспособным после завершения (onError / onComplete), но поскольку спецификация требует, чтобы эти методы вызывались только после того, как это могло быть неопределенное поведение, вызывающее эту проблему.

person Kiskae    schedule 20.12.2016
comment
Я также пробовал Observer реализовать onError() в качестве подписчика. Кроме того, я считаю, что он не пытается повторить попытку, потому что он не печатает apply: delay retry by seconds:1 три раза, как если бы исключение было выбрано синхронно из Flowable Под синхронностью я имею в виду, что я не использую обратный вызов, чтобы проверить, могу ли я подключиться к брокеру MQTT. - person Pravin Sonawane; 20.12.2016
comment
Я думаю о следующей последовательности событий: onSubscribe - ›onConnect (1) -› onError (1) - ›timer (1) -› onSubscribe - ›onConnect (2) -› onError (1) - ›onError (2) ). По сути, исходный обратный вызов остается активным и вызывается при сбое второго соединения, в результате чего обработчику ошибок отправляются 3 объекта вызова. Если не использовать обратный вызов, повторение событий не может произойти. - person Kiskae; 20.12.2016
comment
Интересно .. Я изменил задержку каждого повтора с 1 секунды на 15 секунд, чтобы попытаться избежать перекрытия и имитировать что-то более близкое к синхронному вызову. У меня все еще та же ошибка. Есть идеи, как мы можем справиться с такими недетерминированными сценариями, когда мы не знаем, сколько времени займет обратный вызов? - person Pravin Sonawane; 20.12.2016
comment
Я не думаю, что в RxJava есть элегантное решение, поскольку похоже, что он касается мониторинга ресурса за пределами его области. Истинным решением было бы создать клиента в потоке и очистить его, когда он выходит из строя / отменяется. Однако, поскольку похоже, что вы используете клиент после завершения наблюдаемого, это не сработает. Вероятно, это тот случай, когда другие инструменты, такие как класс-оболочка и правильное использование Handler, создадут лучшее решение, чем может предоставить RxJava. - person Kiskae; 20.12.2016

Наконец-то это заработало!

Оказывается, проблема не в RxJava2, а в том, как Mqtt (Eclipse Paho) ​​запускает обратный вызов IMqttActionListener в основном потоке, даже если клиент был создан в другом потоке !!!.

Простое решение - дождаться подключения клиента к потоку, в котором он был создан. Код, указанный в вопросе, правильный, за исключением этого метода

@NonNull
public Flowable<Boolean> createConnectionFlowable(final MqttAndroidClient client, final MqttConnectOptions options) {
    return Flowable.create(new FlowableOnSubscribe<Boolean>() {
        @Override
        public void subscribe(final FlowableEmitter<Boolean> e) throws Exception {
            IMqttToken connect = client.connect(options);
            connect.waitForCompletion(); //this is blocking and is what was required!!
            if (client.isConnected()) {
                e.onNext(true);
                e.onComplete();
            } else {
                e.onError(connect.getException());
            }

        }
    }, BackpressureStrategy.BUFFER);
}

Надеюсь, это поможет кому-то работать с этими библиотеками :)

person Pravin Sonawane    schedule 27.12.2016