Как избежать подписки на одну и ту же тему и многократного запуска обратного вызова в клиенте HiveMQ для Android?

Ожидаемое поведение

Я хочу иметь обратный вызов для прослушивания каждой темы, на которую я подписан, только один раз для каждого отправленного сообщения. Я имею в виду, что я хочу подписаться на тему 1000 раз, но когда сообщение получено, я хочу прослушать его только один раз.

IDK, если я что-то делаю не так (наверное).

Фактическое поведение

  • Я разрабатываю приложение для домашней камеры безопасности.
  • У меня есть список камер, которыми я владею.
  • Для каждой камеры в списке я подписываюсь на тему.
  • Каждые 30 с обновляю скрин, и снова подписываюсь на тему по каждой камере. Это означает, что на ТЕМУ можно подписаться много раз.
  • Каждый раз, когда я получаю сообщение по теме, обратный вызов запускает сообщения о том, сколько раз была подписана одна и та же тема.

Воспроизвести

Шаги

  1. есть тема camera/123
  2. подпишитесь на тему N раз с помощью приведенного ниже метода, который называется subscribeWith.
  3. Отправить сообщение через camera/123
  4. Вы получите сообщение N раз, потому что N раз вы подписались на тему

Код репродуктора

Только переменные

private var mqtt: Mqtt5AsyncClient? = null
private var username: String? = null
private var password: String? = null
private val serverHost: String,
private val serverPort: Int = 1883

Создайте MQTT

private fun build() {
        if (mqtt != null) return

        mqtt = Mqtt5Client.builder()
                .identifier(identifier())
                .serverHost(serverHost)
                .serverPort(serverPort)
                .automaticReconnect()
                .applyAutomaticReconnect()
                .addConnectedListener { Timber.d("On Connected") }
                .addDisconnectedListener { onMQTTDisconnected(it) }
                .buildAsync()
    }

Подключение MQTT

fun connect(username: String, password: String) {
        build()

        this.username = username
        this.password = password

        mqtt?.connectWith()
                ?.keepAlive(30)
                ?.sessionExpiryInterval(7200)
                ?.cleanStart(false)
                ?.simpleAuth()
                ?.username("abc")
                ?.password("123".toByteArray())
                ?.applySimpleAuth()
                ?.send()
    }

А затем подписываюсь на тему Каждый раз, когда я подписываюсь на тему, я использую эти забавные

fun subscribeWith(topic: String) {
        mqtt?.subscribeWith()
                ?.topicFilter(topic)
                ?.qos(MqttQos.AT_MOST_ONCE)
               ?.callback { t -> onConsumingTopic(t) }  <- I THINK THIS IS THE IMPORTANT THING
                ?.send()
                ?.whenComplete { ack, error -> onTopicConnected(ack, error, topic) }

    }

person Deneb Chorny    schedule 18.12.2020    source источник
comment
В чем смысл многократной подписки на тему?   -  person Odysseus    schedule 18.12.2020
comment
Вам нужно будет вести список тем, на которые вы подписаны. Перед добавлением новой темы проверьте список.   -  person hardillb    schedule 18.12.2020
comment
именно так, как я сказал. Многократная подписка. изображая это. Если у вас есть тема A и MQTT, и вы делаете MQTT.subscribe(topic = A) MQTT.subscribe(topic = A) MQTT.subscribe(topic = A) тема срабатывает 3 раза при получении сообщения @Odysseus   -  person Deneb Chorny    schedule 18.12.2020
comment
@hardillb Я думал об этом, но думаю, что это просто обходной путь, а не реальное поведение. MQTT говорит, что подписка на одну и ту же тему означает замену, а не дублирование. Спасибо   -  person Deneb Chorny    schedule 18.12.2020
comment
@DenebChorny А в чем смысл многократной подписки на одну и ту же тему от одного клиента?   -  person Odysseus    schedule 18.12.2020
comment
Это не специально. По характеру приложения, когда я подписываюсь на тему, у меня нет возможности узнать, подписывался ли я на нее ранее (без сохранения кеша). Но все же MQTT указывает, что это не приводит к дублированию. @Одиссей   -  person Deneb Chorny    schedule 18.12.2020
comment
Ясно - тогда я согласен с @hardillb, что вы должны отслеживать темы, на которые уже подписаны, чтобы предотвратить повторную подписку на темы. Но на самом деле брокер должен перезаписывать уже существующие подписки, если вы повторно подписываетесь на них, или у вас случайно есть несколько экземпляров клиента?   -  person Odysseus    schedule 18.12.2020
comment
Я не думаю, что это происходит на брокере, он добавляет дополнительные соответствующие обратные вызовы в клиенте.   -  person hardillb    schedule 18.12.2020
comment
Спасибо. Вы правы @hardillb и Одиссей. Единственный способ получить обратный вызов — это подписаться на тему, и нет возможности заранее проверить темы, на которые вы подписались. Я ненавижу это решение, автобус здесь единственный   -  person Deneb Chorny    schedule 18.12.2020
comment
@hardillb Хороший совет - тогда он может и не использовать лямбду для обратного вызова.   -  person Odysseus    schedule 18.12.2020


Ответы (2)


Как упоминалось в комментариях, единственное решение на данный момент — хранить список тем, на которые подписаны, вне клиентской библиотеки MQTT, и проверять его, прежде чем подписываться на новые темы.

person hardillb    schedule 18.12.2020

Я нашел правильный ответ.

Нет необходимости регистрировать обратный вызов для каждого вызова подписки или использовать глобальный массив, обрабатывающий зарегистрированные темы, как это:

mqtt?.subscribeWith()
    ?.callback { t -> onConsumingTopic(t) }  <- This is not needed

Вместо этого вы можете зарегистрировать один глобальный обратный вызов для всех сообщений, например:

client.publishes(MqttGlobalPublishFilter.SUBSCRIBED, publish -> { ... } );

а затем вы можете подписаться без обратного вызова.

client.subscribeWith().topicFilter("test").qos(MqttQos.AT_LEAST_ONCE).send();

Полный пример:

Создание MQTT

 mqtt = Mqtt5Client.builder()
                .identifier(identifier())
                .serverHost(serverHost)
                .serverPort(serverPort)
                .automaticReconnect()
                .applyAutomaticReconnect()
                .addConnectedListener { onMQTTConnected(it) }
                .addDisconnectedListener { onMQTTDisconnected(it) }
                .buildAsync()

        mqtt?.publishes(MqttGlobalPublishFilter.SUBSCRIBED) { onConsumingTopic(it) }

Подключение MQTT

mqtt?.connectWith()
                ?.keepAlive(30)
                ?.sessionExpiryInterval(7200)
                ?.cleanStart(false)
                ?.simpleAuth()
                ?.username(context.getString(R.string.mqtt_user))
                ?.password(context.getString(R.string.mqtt_pw).toByteArray())
                ?.applySimpleAuth()
                ?.send()

Подписка на тему

mqtt?.subscribeWith()
                ?.topicFilter(topic)
                ?.qos(MqttQos.AT_LEAST_ONCE)
                ?.send()
                ?.whenComplete { ack, error -> onTopicSubscribed(ack, error, topic) }
person Deneb Chorny    schedule 10.02.2021