Команда содержала недопустимый идентификатор сообщения: 2 после получения первого сообщения в Android-клиенте

Я реализую что-то вроде функций чата на Android с помощью MQTT, взял доступный пример из MQTT и немного изменил его, чтобы иметь Listener. Устанавливается соединение от Android-клиента к ActiveMQ-Brocker. Также у меня есть JbossServer с @ResourceAdapter ("activemq.rar") и MDB для реализации администрирования (создание новой темы во время выполнения и т. Д.) На стороне сервера.

У меня вопрос относительно Слушателя и Издателя в Android-Clien. Слушатель и подписка настроены. Он работает хорошо, но только для первого сообщения (в обоих направлениях: отправка из клиента в тему и отправка из веб-консоли в тему). При получении второго сообщения возникает ошибка на Android-клиенте в Listener onFailure: java.net.ProtocolException: Команда с сервера содержала недопустимый идентификатор сообщения: 2. Та же ошибка возникает, когда я пытаюсь отправить второе сообщение от клиента. Я прикрепил свой код. Буду признателен, если вы поможете мне с этой ошибкой.

private void callBackConnect() {
    mqtt = new MQTT();
    mqtt.setClientId("android-mqtt-example");

    try {
        // mqtt.setHost("localhost", 1883);
        // mqtt.setHost("tcp://10.0.0.62", 5445);
        mqtt.setHost(sAddress);
        Log.d(TAG, "Address set: " + sAddress);
    } catch (URISyntaxException urise) {
        Log.e(TAG, "URISyntaxException connecting to " + sAddress + " - "
                + urise);
    }

    if (sUserName != null && !sUserName.equals("")) {
        mqtt.setUserName(sUserName);
        Log.d(TAG, "UserName set: [" + sUserName + "]");
    }

    if (sPassword != null && !sPassword.equals("")) {
        mqtt.setPassword(sPassword);
        Log.d(TAG, "Password set: [" + sPassword + "]");
    }

    // futureConnection = mqtt.futureConnection();
    progressDialogListener = ProgressDialog.show(this, "",
            "setListener...", true);
    progressDialogListener.setCanceledOnTouchOutside(true);

    callBackConnection = mqtt.callbackConnection();

    callBackConnection.listener(new Listener() {
        public void onConnected() {
            Log.i(TAG, "callBackconnect()->setListener:onConnected");
            // new AlertDialog.Builder(MQTTActivity.this)
            // .setMessage("Connected (listener)")
            // .setNeutralButton("OK", null).show();
            progressDialogListener.dismiss();
        }

        public void onDisconnected() {
            Log.i(TAG, "callBackconnect()->setListener:onDisconnected");
            // new AlertDialog.Builder(MQTTActivity.this)
            // .setMessage("Disconnected (listener)")
            // .setNeutralButton("OK", null).show();
            progressDialogListener.dismiss();
        }

        public void onFailure(Throwable arg0) {
            Log.i(TAG,
                    "callBackconnect()->setListener:onFailure:"
                            + arg0.toString());
            Log.e(TAG, arg0.toString());
            // new AlertDialog.Builder(MQTTActivity.this)
            // .setMessage("Failure (listener)")
            // .setNeutralButton("OK", null).show();
            progressDialogListener.dismiss();
        }

        public void onPublish(UTF8Buffer topic, Buffer msg, Runnable ack) {
            Log.i(TAG,
                    "callBackconnect()->setListener:onPublish(UTF8Buffer arg0, Buffer arg1, Runnable arg2)");
            // new AlertDialog.Builder(MQTTActivity.this)
            // .setMessage("Publish (listener)")
            // .setNeutralButton("OK", null).show();

            final String msgBody = msg.utf8().toString();
            final String topicBody = topic.utf8().toString();
            // Create runnable for posting
            mHandler.post(new Runnable() {
                public void run() {
                    updateReceiveETInUi(topicBody, msgBody);
                }
            });

            Log.i(TAG,
                    "callBackconnect()->setListener:onPublish(UTF8Buffer arg0, Buffer arg1, Runnable arg2):"
                            + topicBody + " : " + msgBody);

            if (msgBody.startsWith("REPLY: ")) {
                // Don't reply to your own reply
                Log.i(TAG,
                        "callBackconnect()->setListener:onPublish-> msgBody.startsWith REPLY");
            } else {
                try {
                    byte[] reply = "REPLY: Hello Back".getBytes();
                    callBackConnection.publish(sDestination, reply,
                            QoS.AT_MOST_ONCE, true, null);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    });

    // callBackConnection.resume();
    progressDialogConn = ProgressDialog.show(this, "", "Connecting...",
            true, true);
    callBackConnection.connect(new Callback<Void>() {
        public void onFailure(Throwable value) {
            Log.i(TAG,
                    "callBackconnect()->connect->.onFailure : "
                            + value.toString());
            Log.e(TAG,
                    "callBackconnect()->connect->.onFailure : "
                            + value.toString());
            // new AlertDialog.Builder(MQTTActivity.this)
            // .setMessage(value.toString())
            // .setNeutralButton("OK", null).show();
            progressDialogConn.dismiss();
        }
        public void onSuccess(Void v) {
            Log.i(TAG, "callBackconnect()->connect->onSuccess(Void v): "
                    + v.TYPE.toString());
            // new
            // AlertDialog.Builder(MQTTActivity.this).setMessage("Connected (callback)")
            // .setNeutralButton("OK", null).show();
            progressDialogConn.dismiss();
        }
        public void onSuccess(UTF8Buffer topic, Buffer payload, Runnable ack) {
            // You can now process a received message from a topic.
            // Once process execute the ack runnable.
            Log.i(TAG,
                    topic.toString()
                            + "callBackconnect()->connect->.onSuccess(UTF8Buffer topic, Buffer payload, Runnable ack)");
            ack.run();
        }

    });


    if (sDestination.equals("")) {
        Log.i(TAG, "Destination must be provided");
    } else {
        callBackubSubcribe();

    }
}

private void callBackubSubcribe() {

    topics[0] = new Topic(sDestination, QoS.AT_LEAST_ONCE);
    // Topic[] topics = { new Topic(sDestination, QoS.AT_LEAST_ONCE) };
    Log.i(TAG, "topics[0]");

    for (int i = 0; i < topics.length; i++) {
        Log.i(TAG, "callBackubSubcribe topics: " + topics[i].toString());
    }

    progressDialogSub = ProgressDialog.show(this, "", "Subscribing...",
            true);
    progressDialogSub.setCanceledOnTouchOutside(true);

    Log.i(TAG, "callBackubSubcribe()-> TRY to subscribe(topics)");
    callBackConnection.subscribe(topics, new Callback<byte[]>() {
        public void onFailure(Throwable value) {
            Log.i(TAG,
                    "callBackubSubcribe->subscribe:onFailure:"
                            + value.toString());
            // new AlertDialog.Builder(MQTTActivity.this)
            // .setMessage(value.toString())
            // .setNeutralButton("OK", null).show();
            progressDialogSub.dismiss();
        }

        public void onSuccess(Void v) {
            Log.i(TAG, "callBackubSubcribe()->connect->.onSuccess");
            // new AlertDialog.Builder(MQTTActivity.this)
            // .setMessage("Connected (callback)")
            // .setNeutralButton("OK", null).show();
            progressDialogSub.dismiss();
        }

        @Override
        public void onSuccess(byte[] arg0) {
            Log.i(TAG,
                    "callBackubSubcribe()->connect->.onSuccess(byte[] arg0): "
                            + arg0.getClass().toString());
            progressDialogSub.dismiss();
        }
    });
}

Ура, Алекс


person Alexito    schedule 28.05.2014    source источник


Ответы (1)


Некоторое время я работал над сервисом mqtt, но сравнивая ваш код с mine (и при условии, что вы используете клиентская библиотека mqtt от fusesource), я заметил несколько странных вещей (возможно, из-за того, что я потерялся с идентификацией).

1) В вашем обработчике подключения есть onSuccess, который должен получать сообщения из тем.

   public void onSuccess(UTF8Buffer topic, Buffer payload, Runnable ack) {

Я думаю, что у вас должен быть обычный void onSuccess, который вы реализовали выше.

2) то я вижу, что ваш public void onPublish(UTF8Buffer topic, Buffer msg, Runnable ack) в Listener не вызывает ack ack.run();, на самом деле вы вызываете этот ack.run () в onSuccess, упомянутом выше, и я подозреваю, что это может быть источником проблемы.

Кстати, я думаю, что может быть лучше запустить mqtt как службу, обмениваться сообщениями взад и вперед с действием и выполнять действия, связанные с графическим интерфейсом, в коде действия.

person Thomas    schedule 28.05.2014