Ожидаемое поведение
Я хочу иметь обратный вызов для прослушивания каждой темы, на которую я подписан, только один раз для каждого отправленного сообщения. Я имею в виду, что я хочу подписаться на тему 1000 раз, но когда сообщение получено, я хочу прослушать его только один раз.
IDK, если я что-то делаю не так (наверное).
Фактическое поведение
- Я разрабатываю приложение для домашней камеры безопасности.
- У меня есть список камер, которыми я владею.
- Для каждой камеры в списке я подписываюсь на тему.
- Каждые 30 с обновляю скрин, и снова подписываюсь на тему по каждой камере. Это означает, что на ТЕМУ можно подписаться много раз.
- Каждый раз, когда я получаю сообщение по теме, обратный вызов запускает сообщения о том, сколько раз была подписана одна и та же тема.
Воспроизвести
Шаги
- есть тема camera/123
- подпишитесь на тему N раз с помощью приведенного ниже метода, который называется subscribeWith.
- Отправить сообщение через camera/123
- Вы получите сообщение N раз, потому что N раз вы подписались на тему
Код репродуктора
Только переменные
private var mqtt: Mqtt5AsyncClient? = null
private var username: String? = null
private var password: String? = null
private val serverHost: String,
private val serverPort: Int = 1883
Создайте MQTT
private fun build() {
if (mqtt != null) return
mqtt = Mqtt5Client.builder()
.identifier(identifier())
.serverHost(serverHost)
.serverPort(serverPort)
.automaticReconnect()
.applyAutomaticReconnect()
.addConnectedListener { Timber.d("On Connected") }
.addDisconnectedListener { onMQTTDisconnected(it) }
.buildAsync()
}
Подключение MQTT
fun connect(username: String, password: String) {
build()
this.username = username
this.password = password
mqtt?.connectWith()
?.keepAlive(30)
?.sessionExpiryInterval(7200)
?.cleanStart(false)
?.simpleAuth()
?.username("abc")
?.password("123".toByteArray())
?.applySimpleAuth()
?.send()
}
А затем подписываюсь на тему Каждый раз, когда я подписываюсь на тему, я использую эти забавные
fun subscribeWith(topic: String) {
mqtt?.subscribeWith()
?.topicFilter(topic)
?.qos(MqttQos.AT_MOST_ONCE)
?.callback { t -> onConsumingTopic(t) } <- I THINK THIS IS THE IMPORTANT THING
?.send()
?.whenComplete { ack, error -> onTopicConnected(ack, error, topic) }
}