Как решить проблему асинхронного подключения клиента paho mqtt?

Фон

Я играл с MQTT для проекта и столкнулся со странной проблемой. Я использую paho в качестве клиента MQTT и VerneMQ в качестве брокера.

Служба брокера VerneMQ запущена и работает, я могу подтвердить это, запустив netstat, и я вижу, что запись 127.0.0.1:1883 находится в режиме LISTENING.

Это мой код для клиента:

public class Producer implements MqttCallback {

    private String brokerUri;
    private String clientId;

    public Producer(String brokerUri, String clientId){
        this.brokerUri = brokerUri;
        this.clientId = clientId;
    }

    public void doProduce(String topic, String payload){
        MemoryPersistence memoryPersistence = new MemoryPersistence();

        try {
            MqttAsyncClient mqttAsyncClient = new MqttAsyncClient(brokerUri, clientId, memoryPersistence);
            MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
            mqttConnectOptions.setCleanSession(true);
            mqttAsyncClient.setCallback(this);
            mqttAsyncClient.connect(mqttConnectOptions);
            MqttMessage mqttMessage = new MqttMessage();
            mqttMessage.setPayload(payload.getBytes());
            mqttAsyncClient.publish(topic, mqttMessage);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    public void connectionLost(Throwable throwable) {

    }


    public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {

    }


    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        System.out.println("Message delivered!");
    }
}

Ниже приведен мой основной класс

public class Main {
    public static void main(String[] args) {
        Producer producer = new Producer("tcp://127.0.0.1:1883", "producer1");
        producer.doProduce("dummyTopic", "dummyMessage");
    }
}

Проблема

Когда я запускаю свое приложение, я вижу исключение Client is not connected (32104) в выводе.

Если я изменю строку mqttAsyncClient.connect(mqttConnectOptions); на mqttAsyncClient.connect(mqttConnectOptions).waitForCompletion(); в классе Producer, я смогу успешно подключиться к брокеру и увидеть Message delivered! в выводе.

Если не ошибаюсь, waitForCompletion() заблокирует вызов до получения ответа. И, добавив эту строку, я фактически изменил свое соединение AsyncClient на блокирующее соединение, что для меня нежелательно.

Вопрос

Как я могу решить эту проблему, чтобы клиент paho MQTT подключался к брокеру неблокирующим образом? Я что-то пропустил по пути?


person raidensan    schedule 05.10.2017    source источник


Ответы (1)


Это описано в документации для IMqttAsyncClient.

 IMqttToken token method(parms, Object userContext, IMqttActionListener callback)

В этой форме обратный вызов регистрируется вместе с методом. Обратный вызов будет уведомлен об успешном или неудачном выполнении действия. Обратный вызов вызывается в потоке, управляемом клиентом MQTT, поэтому важно, чтобы обработка в обратном вызове была сведена к минимуму. В противном случае работа клиента MQTT будет запрещена. Например, чтобы получить уведомление (обратный вызов) при завершении соединения:

IMqttToken conToken;
  conToken = asyncClient.connect("some context", new MqttAsyncActionListener() {
      public void onSuccess(IMqttToken asyncActionToken) {
        log("Connected");
      }

      public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
        log ("connect failed" +exception);
      }
});

Необязательный объект контекста может быть передан в метод, который затем будет доступен в обратном вызове. Контекст сохраняется клиентом MQTT) в токене, который затем возвращается инициатору. Маркер предоставляется методам обратного вызова, где затем можно получить доступ к контексту.

Таким образом, ваш блок try/catch должен выглядеть так:

try {
  MqttAsyncClient mqttAsyncClient = new MqttAsyncClient(brokerUri, clientId, memoryPersistence);
  MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
  mqttConnectOptions.setCleanSession(true);
  mqttAsyncClient.setCallback(this);
  mqttAsyncClient.connect(mqttConnectOptions, null, new MqttAsyncActionListener() {
    public void onSuccess(IMqttToken asyncActionToken) {
      MqttMessage mqttMessage = new MqttMessage();
      mqttMessage.setPayload(payload.getBytes());
      mqttAsyncClient.publish(topic, mqttMessage);
    }

    public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
      exception.printStackTrace();
    }
});

} catch (MqttException e) {
  e.printStackTrace();
}
person hardillb    schedule 05.10.2017