Я пытаюсь подключиться к брокеру MQTT. Я хочу повторить попытку, если мне не удастся подключиться. Я получаю обратный вызов при успешном или неудачном подключении.
Прочитав несколько примеров retryWhen и обработки асинхронных обратных вызовов, я собрал этот код. Он отлично работает, если мне удастся подключиться. Кроме того, он повторяет 3 попытки, если я вызываю e.onError(throwable)
синхронно из Flowable
. Но в моем приложении для Android происходит сбой, если я вызываю e.onError(throwable)
из метода обратного вызова onFailure()
.
Вот код:
Цепочка RxJava
createConnectionFlowable(client, options)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.retryWhen(createRetryFunction())
.subscribe(createConsumer());
создать Flowable
private Flowable<String> createConnectionFlowable(final MqttAndroidClient client, final MqttConnectOptions options) {
return Flowable.create(new FlowableOnSubscribe<String>() {
public void subscribe(final FlowableEmitter<String> e) throws Exception {
client.connect(options).setActionCallback(new IMqttActionListener() {
public void onSuccess(IMqttToken iMqttToken) { e.onComplete(); }
public void onFailure(IMqttToken iMqttToken, Throwable throwable) { e.onError(throwable); }
});
}
}, BackpressureStrategy.BUFFER);
}
Создать функцию повтора
private Function<Flowable<Throwable>, Publisher<?>> createRetryFunction() {
return new Function<Flowable<Throwable>, Publisher<?>>() {
public Publisher<?> apply(Flowable<Throwable> throwableFlowable) throws Exception {
return throwableFlowable.zipWith(
Flowable.range(1, 3),
new BiFunction<Throwable, Integer, Integer>() {
public Integer apply(Throwable throwable, Integer integer) throws Exception { return integer; }
}
)
.flatMap(new Function<Integer, Publisher<?>>() {
public Publisher<?> apply(Integer integer) throws Exception {
return Flowable.timer(integer, TimeUnit.SECONDS);
}
});
}
};
}
Потребитель: делайте здесь все хорошее
private Consumer<String> createConsumer() {
return new Consumer<String>() {
public void accept(String s) throws Exception {
Log.d(TAG, "accept: do important stuff here" + s);
}
};
}
Журналы ошибок
12-20 11:51:08.544 16769-16769/com.work.app D/MqttBridgeService: onFailure: connection unsuccessful
12-20 11:51:08.544 16769-16769/com.work.app D/MqttBridgeService: apply() called with: throwable = [Unable to connect to server (32103) - java.net.ConnectException: failed to connect to /10.31.252.211 (port 1883) after 30000ms: isConnected failed: ECONNREFUSED (Connection refused)], integer = [1]
12-20 11:51:08.544 16769-16769/com.work.app D/MqttBridgeService: apply: delay retry by seconds:1
12-20 11:51:09.589 16769-16830/com.work.app D/AlarmPingSender: Unregister alarmreceiver to MqttServicepaho837944119000
12-20 11:51:09.600 16769-16831/com.work.app D/AlarmPingSender: Unregister alarmreceiver to MqttServicepaho837944119000
12-20 11:51:09.606 16769-16769/com.work.app D/MqttBridgeService: onFailure: connection unsuccessful
12-20 11:51:09.606 16769-16769/com.work.app D/MqttBridgeService: onFailure: connection unsuccessful
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: Unable to connect to server (32103) - java.net.ConnectException: failed to connect to /10.31.252.211 (port 1883) after 30000ms: isConnected failed: ECONNREFUSED (Connection refused)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: at org.eclipse.paho.client.mqttv3.internal.TCPNetworkModule.start(TCPNetworkModule.java:79)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: at org.eclipse.paho.client.mqttv3.internal.ClientComms$ConnectBG.run(ClientComms.java:590)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: at java.lang.Thread.run(Thread.java:818)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: Caused by: java.net.ConnectException: failed to connect to /10.31.252.211 (port 1883) after 30000ms: isConnected failed: ECONNREFUSED (Connection refused)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: at libcore.io.IoBridge.isConnected(IoBridge.java:234)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: at libcore.io.IoBridge.connectErrno(IoBridge.java:171)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: at libcore.io.IoBridge.connect(IoBridge.java:122)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:183)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:452)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: at java.net.Socket.connect(Socket.java:884)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: at org.eclipse.paho.client.mqttv3.internal.TCPNetworkModule.start(TCPNetworkModule.java:70)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: ... 2 more
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: Caused by: android.system.ErrnoException: isConnected failed: ECONNREFUSED (Connection refused)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: at libcore.io.IoBridge.isConnected(IoBridge.java:223)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: ... 8 more
12-20 11:51:09.606 16769-16769/com.work.app E/AndroidRuntime: FATAL EXCEPTION: main
Process: com.work.app, PID: 16769
Unable to connect to server (32103) - java.net.ConnectException: failed to connect to /10.31.252.211 (port 1883) after 30000ms:
Вопросы
- Почему этот код выдает исключение, которое приводит к сбою приложения? В идеале он должен обрабатывать исключение? Что мне здесь не хватает?
- Почему не повторяется 3 раза?
- Почему тот же код повторяется правильно, если я вызываю
e.onError(throwable)
синхронно из методаFlowable.subscribe()
?
Ссылки