Фон
Я играл с MQTT для проекта и столкнулся со странной проблемой. Я использую paho
в качестве клиента MQTT и VerneMQ
в качестве брокера.
Служба брокера VerneMQ запущена и работает, я могу подтвердить это, запустив netstat
, и я вижу, что запись 127.0.0.1:1883
находится в режиме LISTENING
.
Это мой код для клиента:
public class Producer implements MqttCallback {
private String brokerUri;
private String clientId;
public Producer(String brokerUri, String clientId){
this.brokerUri = brokerUri;
this.clientId = clientId;
}
public void doProduce(String topic, String payload){
MemoryPersistence memoryPersistence = new MemoryPersistence();
try {
MqttAsyncClient mqttAsyncClient = new MqttAsyncClient(brokerUri, clientId, memoryPersistence);
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setCleanSession(true);
mqttAsyncClient.setCallback(this);
mqttAsyncClient.connect(mqttConnectOptions);
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setPayload(payload.getBytes());
mqttAsyncClient.publish(topic, mqttMessage);
} catch (MqttException e) {
e.printStackTrace();
}
}
public void connectionLost(Throwable throwable) {
}
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
}
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
System.out.println("Message delivered!");
}
}
Ниже приведен мой основной класс
public class Main {
public static void main(String[] args) {
Producer producer = new Producer("tcp://127.0.0.1:1883", "producer1");
producer.doProduce("dummyTopic", "dummyMessage");
}
}
Проблема
Когда я запускаю свое приложение, я вижу исключение Client is not connected (32104)
в выводе.
Если я изменю строку mqttAsyncClient.connect(mqttConnectOptions);
на mqttAsyncClient.connect(mqttConnectOptions).waitForCompletion();
в классе Producer
, я смогу успешно подключиться к брокеру и увидеть Message delivered!
в выводе.
Если не ошибаюсь, waitForCompletion()
заблокирует вызов до получения ответа. И, добавив эту строку, я фактически изменил свое соединение AsyncClient на блокирующее соединение, что для меня нежелательно.
Вопрос
Как я могу решить эту проблему, чтобы клиент paho MQTT подключался к брокеру неблокирующим образом? Я что-то пропустил по пути?