MQTT: отключение от комаров

Я использую брокера mosquitto, размещенного на AWS. Мой клиент подключился к брокеру, но через некоторое время отключился. Такое бывает много раз.

Я передаю QOS = 2, keep = true, clean session = false, no username and password, lwt = none, port = 1883, ip = server hosted on aws, client id = timeStamp (всегда уникально)

вот мой код -

import time
import paho.mqtt.client as mqtt

class MqttCommunication(object):

    def __init__(self):

        self.current_module = "Mqtt Communication class"
        self.clientID = clientId
        self.name = thread_name

        self.DEBUG = True
        self.MQTT_HOST = ""
        self.MQTT_PORT = 1883
        self.MQTT_USERNAME = ""
        self.MQTT_PASSWORD = ""
        self.MQTT_CLIENT_ID = self.clientID
        self.MQTT_TOPIC = ""
        self.MQTT_QOS = 0
        self.MQTT_RETAIN = None
        self.MQTT_CLEAN_SESSION = None
        self.MQTT_LWT = ""

        self.client = mqtt.Client(self.MQTT_CLIENT_ID,clean_session=self.MQTT_CLEAN_SESSION)
        self.on_connect = None
        self.on_disconnect = None
        self.on_message = None
        self.on_subscribe = None
        self.on_unsubscribe = None
        self.on_publish = None
        self.client.on_connect = self.mqtt_on_connect
        #self.client.on_message = self.mqtt_on_message
        self.client.on_disconnect = self.mqtt_on_disconnect
        self.client.on_subscribe = self.mqtt_on_subscribe
        self.client.on_unsubscribe = self.mqtt_on_unsubscribe
        self.client.on_publish = self.mqtt_on_publish



    def connectHost(self,mqtt_host,mqtt_port,mqtt_username,mqtt_password):
        self.MQTT_USERNAME = mqtt_username
        self.MQTT_PASSWORD = mqtt_password
        self.MQTT_HOST = mqtt_host
        self.MQTT_PORT = mqtt_port

        try:
            self.client.username_pw_set(self.MQTT_USERNAME, self.MQTT_PASSWORD)
            self.client.connect(self.MQTT_HOST,self.MQTT_PORT,60)
            print self.MQTT_HOST
            self.client.loop_start()

        except Exception, e:
            print "Error connecting to %s:%d: %s" % (mqtt_host, mqtt_port, str(e))

        return True

    def disconnectHost(self):
        self.client.disconnect()
        return True

    def mqttSettings(self,qos,mqtt_retain,mqtt_clean_session,mqtt_lwt):
        self.MQTT_QOS = qos
        self.MQTT_RETAIN = mqtt_retain
        self.MQTT_CLEAN_SESSION = mqtt_clean_session
        self.MQTT_LWT = mqtt_lwt
        return True

    def subscribeTopic(self,topic):
        self.MQTT_TOPIC = topic
        self.client.subscribe(self.MQTT_TOPIC, qos=self.MQTT_QOS)
        return True

    def unsubscribeTopic(self,topic):
        self.client.unsubscribe(self.MQTT_TOPIC)
        return True

    def setClientId(self,clientID):
        self.MQTT_CLIENT_ID= clientID
        return True

    def getClientId(self):
        return self.MQTT_CLIENT_ID

    def publishData(self,topic,message,qos):
        self.client.publish(topic,message,qos)
        return True


    # The callback for when the client receives a CONNACK response from the server.
    def mqtt_on_connect(self,client, userdata, flags, rc):
        if rc == 0:
            print "Connected to %s:%s" % (self.MQTT_HOST, self.MQTT_PORT)
            time.sleep(3)

        elif rc == 1:
            print "Connection refused - unacceptable protocol version"
        elif rc == 2:
            print "Connection refused - identifier rejected"
        elif rc == 3:
            print "Connection refused - server unavailable"
        elif rc == 4:
            print "Connection refused - bad user name or password"
        elif rc == 5:
            print "Connection refused - not authorised"
        else:
            print "Connection failed - result code %d" % (rc)



    # The callback for when a PUBLISH message is received from the server.
    def mqtt_on_message(self , client, userdata, msg):
        #print msg
        print(msg.topic+" : "+str(msg.payload))

    def mqtt_on_disconnect(self, client, userdata, rc):
        if rc != 0:
            print("Unexpected disconnection.")
        else:
            print('hello from disconnect')

    def mqtt_on_publish(self, client, userdata, mid):
        """
        What to do when a message is published
        """
        #print "publish"


    def mqtt_on_subscribe(self,client, userdata, mid, granted_qos):
        """
        What to do in the event of subscribing to a topic"
        """
        #logging.debug("Subscribe with mid " + str(mid) + " received.")


    def mqtt_on_unsubscribe(self, client, userdata, mid):
        """
        What to do in the event of unsubscribing from a topic
        """
        #logging.debug("Unsubscribe with mid " + str(mid) + " received.")

совместное использование журналов ---

1483617475: New connection from xx.xx.xx.xx on port 1883.
1483617475: New client connected from xx.xx.xx.xx as mqttjs_bd875699 (c1, k10$
1483617718: Saving in-memory database to /var/lib/mosquitto/mosquitto.db.
1483618131: Client 2017-01-05 17:27:18.963994 has exceeded timeout, disconnecti$
1483618131: Socket error on client 2017-01-05 17:27:18.963994, disconnecting.
1483618810: Socket error on client mqttjs_bd875699, disconnecting.
1483618854: New connection from xx.xx.xx.xx on port 1883.
1483618854: New client connected from xx.xx.xx.xx as mqttjs_7aa97fd9 (c1, k10$
1483618865: Socket error on client mqttjs_7aa97fd9, disconnecting.
1483618866: New connection from xx.xx.xx.xx on port 1883.
1483618866: New client connected from xx.xx.xx.xx as mqttjs_25e2f297 (c1, k10$
1483618886: New connection from xx.xx.xx.xx on port 1883.
1483618886: New client connected from xx.xx.xx.xx as 2017-01-05 17:51:23.51980$
1483619018: Socket error on client mqttjs_25e2f297, disconnecting.
1483619019: New connection from xx.xx.xx.xx on port 1883.
1483619019: New client connected from xx.xx.xx.xx as mqttjs_1c8ec6dd (c1, k10$
1483619023: Socket error on client mqttjs_1c8ec6dd, disconnecting.
1483619024: New connection from xx.xx.xx.xx on port 1883.

person Himanshu mittal    schedule 05.01.2017    source источник
comment
Эти журналы показывают только одно отключение от кода Python, все остальные - от того, что похоже на процесс NodeJS.   -  person hardillb    schedule 05.01.2017


Ответы (2)


Вам необходимо запустить сетевой цикл клиента mqtt, чтобы он мог обрабатывать пакеты ping для проверки активности.

Вы запускаете цикл в фоновом режиме, добавляя следующую строку:

...
def connectHost(self,mqtt_host,mqtt_port,mqtt_username,mqtt_password):
    self.MQTT_USERNAME = mqtt_username
    self.MQTT_PASSWORD = mqtt_password
    self.MQTT_HOST = mqtt_host
    self.MQTT_PORT = mqtt_port

    try:
        self.client.username_pw_set(self.MQTT_USERNAME, self.MQTT_PASSWORD)
        print self.client.connect(self.MQTT_HOST,self.MQTT_PORT,60)
        self.client.loop_start() # This line
        print self.MQTT_HOST

    except Exception, e:
        print "Error connecting to %s:%d: %s" % (mqtt_host, mqtt_port, str(e))

    return True

Вы можете остановить цикл следующим образом:

def disconnectHost(self):
    self.client.disconnect()
    self.client.loop_stop() # This line
    return True
person hardillb    schedule 05.01.2017
comment
Извините, я забыл добавить код. Я пытаюсь использовать loop_start (), но у меня снова возникает та же проблема ... если вы мне поможете, что говорят журналы, почему он отключается - person Himanshu mittal; 05.01.2017
comment
Он отключается, потому что клиент не отправил пакет в течение периода поддержки активности. Как я уже сказал, это связано с тем, что сетевой цикл не работает. - person hardillb; 05.01.2017

Вы не показываете весь свой код. В частности, вы не показываете, где вы входите в loop_forever или какой-либо другой способ блокировки вашего кода. Когда я это добавил, я получаю вывод mosquitto -v (мой clientId - PAHO):

1483629832: New connection from 192.9.200.14 on port 1883.
1483629832: New client connected from 192.9.200.14 as PAHO (c0, k60).
1483629832: Sending CONNACK to PAHO (0, 0)
1483629895: Received PINGREQ from PAHO
1483629895: Sending PINGRESP to PAHO
person Gambit Support    schedule 05.01.2017
comment
Вам не нужно блокировать, client.start_loop() запустит отдельный поток для запуска цикла - person hardillb; 05.01.2017