Сообщения HornetQ все еще остаются в очереди после использования с использованием основного API

Я новичок в HornetQ, поэтому, пожалуйста, потерпите меня. Позвольте мне сначала сказать вам мои требования:

Мне нужно промежуточное программное обеспечение для очереди сообщений, которое может передавать сообщения размером около 1 КБ между различными процессами с малой задержкой и устойчивостью (т. Е. Оно должно выдерживать сбои системы). У меня было бы несколько процессов, записывающих в одни и те же очереди, и аналогичное чтение нескольких процессов из одной и той же очереди.

Для этого я выбрал HornetQ, так как он имеет лучший рейтинг за постоянную передачу сообщений.

В настоящее время я использую Hornetq v2.2.2Final в качестве автономного сервера.
Я могу успешно создавать постоянные/временные очереди используя core api (ClientSession), и успешно отправлять сообщения в очередь (ClientProducer).
Точно так же я могу читать сообщения из очередь с помощью основного API (ClientConsumer).

Проблема возникает после этого, когда клиент прочитал сообщение, сообщение все еще остается в очереди, т.е. количество сообщений в очереди остается постоянным. Возможно, я ошибаюсь, но у меня сложилось впечатление, что как только сообщение используется (read + ack), оно удаляется из очереди. Но этого не происходит. в моем случае одни и те же сообщения читаются снова и снова.

Кроме того, я хотел бы сказать, что пытался использовать неустойчивые очереди с неустойчивыми сообщениями. но проблема остается.

Код производителя, который я использую:

public class HQProducer implements Runnable {

    private ClientProducer producer;
    private boolean killme;
    private ClientSession session;
    private boolean durableMsg;

    public HQProducer(String host, int port, String address, String queueName,
            boolean deleteQ, boolean durable, boolean durableMsg, int pRate) {
        this.durableMsg = durableMsg;
        try {
            HashMap map = new HashMap();
            map.put("host", host);
            map.put("port", port);

            TransportConfiguration config = new TransportConfiguration(NettyConnectorFactory.class.getName(), map);

            ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(config);

            ClientSessionFactory factory = locator.createSessionFactory();

            session = factory.createSession();

            if (queueExists(queueName)) {
                if (deleteQ) {
                    System.out.println("Deleting existing queue :: " + queueName);
                    session.deleteQueue(queueName);
                    System.out.println("Creating queue :: " + queueName);
                    session.createQueue(address, queueName, true);
                }
            } else {
                System.out.println("Creating new  queue :: " + queueName);
                session.createQueue(address, queueName, durable);
            }
            producer = session.createProducer(SimpleString.toSimpleString(address), pRate);

            killme = false;
        } catch (Exception ex) {
            Logger.getLogger(HQTestProducer.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

    @Override
    public void run() {
        long time = System.currentTimeMillis();
        int cnt = 0;
        long timediff;
        while (!killme) {
            try {
                ClientMessage message = session.createMessage(durableMsg);

                message.getBodyBuffer().writeString("Hello world");

                producer.send(message);
                cnt++;
                timediff = ((System.currentTimeMillis() - time) / 1000);
                if (timediff >= 1) {
                    System.out.println("Producer tps :: " + cnt);
                    cnt = 0;
                    time = System.currentTimeMillis();
                }
            } catch (HornetQException ex) {
                Logger.getLogger(HQProducer.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
        try {
            session.close();
        } catch (HornetQException ex) {
            Logger.getLogger(HQProducer.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

    public void setKillMe(boolean killme) {
        this.killme = killme;
    }

    private boolean queueExists(String qname) {
        boolean res = false;
        try {
            //ClientSession.BindingQuery bq = session.bindingQuery(SimpleString.toSimpleString(qname));
            QueueQuery queueQuery = session.queueQuery(SimpleString.toSimpleString(qname));
            if (queueQuery.isExists()) {
                res = true;
            }
        } catch (HornetQException ex) {
            res = false;
        }
        return res;
    }
}

Также код для потребителя:

public class HQConsumer implements Runnable {

    private ClientSession session;
    private ClientConsumer consumer;
    private boolean killMe;

    public HQConsumer(String host, int port, String queueName, boolean browseOnly) {
        try {
            HashMap map = new HashMap();
            map.put("host", host);
            map.put("port", port);

            TransportConfiguration config = new TransportConfiguration(NettyConnectorFactory.class.getName(), map);

            ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(config);

            ClientSessionFactory factory = locator.createSessionFactory();

            session = factory.createSession();

            session.start();

            consumer = session.createConsumer(queueName, "",0,-1,browseOnly);

            killMe = false;
        } catch (Exception ex) {
            Logger.getLogger(HQTestProducer.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

    @Override
    public void run() {
        long time = System.currentTimeMillis();
        int cnt = 0;
        long timediff;
        while (!killMe) {
            try {
                ClientMessage msgReceived = consumer.receive();
                msgReceived.acknowledge();
                //System.out.println("message = " + msgReceived.getBodyBuffer().readString());
                cnt++;
                timediff = ((System.currentTimeMillis() - time) / 1000);
                if (timediff >= 1) {
                    System.out.println("ConSumer tps :: " + cnt);
                    cnt = 0;
                    time = System.currentTimeMillis();
                }
            } catch (HornetQException ex) {
                Logger.getLogger(HQConsumer.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
        try {
            session.close();
        } catch (HornetQException ex) {
            Logger.getLogger(HQConsumer.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

    public void setKillMe(boolean killMe) {
        this.killMe = killMe;
    }
}

Конфигурация сервера HornetQ ::

<configuration xmlns="urn:hornetq"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">

   <paging-directory>${data.dir:../data}/paging</paging-directory>

   <bindings-directory>${data.dir:../data}/bindings</bindings-directory>

   <journal-directory>${data.dir:../data}/journal</journal-directory>

   <journal-min-files>10</journal-min-files>

   <large-messages-directory>${data.dir:../data}/large-messages</large-messages-directory>

   <connectors>
      <connector name="netty">
         <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
         <param key="host"  value="${hornetq.remoting.netty.host:localhost}"/>
         <param key="port"  value="${hornetq.remoting.netty.port:5445}"/>
      </connector>

      <connector name="netty-throughput">
         <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
         <param key="host"  value="${hornetq.remoting.netty.host:localhost}"/>
         <param key="port"  value="${hornetq.remoting.netty.batch.port:5455}"/>
         <param key="batch-delay" value="50"/>
      </connector>
   </connectors>

   <acceptors>
      <acceptor name="netty">
         <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
         <param key="host"  value="${hornetq.remoting.netty.host:localhost}"/>
         <param key="port"  value="${hornetq.remoting.netty.port:5445}"/>
      </acceptor>

      <acceptor name="netty-throughput">
         <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
         <param key="host"  value="${hornetq.remoting.netty.host:localhost}"/>
         <param key="port"  value="${hornetq.remoting.netty.batch.port:5455}"/>
         <param key="batch-delay" value="50"/>
         <param key="direct-deliver" value="false"/>
      </acceptor>
   </acceptors>

   <security-settings>
      <security-setting match="#">
         <permission type="createNonDurableQueue" roles="guest"/>
         <permission type="deleteNonDurableQueue" roles="guest"/>
         <permission type="createDurableQueue" roles="guest"/>
         <permission type="deleteDurableQueue" roles="guest"/>
         <permission type="consume" roles="guest"/>
         <permission type="send" roles="guest"/>
      </security-setting>
   </security-settings>

   <address-settings>
      <!--default for catch all-->
      <address-setting match="#">
         <dead-letter-address>jms.queue.DLQ</dead-letter-address>
         <expiry-address>jms.queue.ExpiryQueue</expiry-address>
         <redelivery-delay>0</redelivery-delay>
         <max-size-bytes>10485760</max-size-bytes>       
         <message-counter-history-day-limit>10</message-counter-history-day-limit>
         <address-full-policy>BLOCK</address-full-policy>
      </address-setting>
   </address-settings>

</configuration>

person Vivek Mehra    schedule 23.06.2011    source источник
comment
a/c на это вам нужно подтвердить сообщение после обработки, вы делаете то же самое?   -  person Asad Rasheed    schedule 23.06.2011


Ответы (2)


С API-интерфейсом ядра hornetq вы должны явно подтвердить сообщение. Я не вижу, где это происходит в вашем тесте.

Если вы не отвечаете, это причина, по которой ваши сообщения блокируются. Мне нужно увидеть ваш полный пример, чтобы дать вам полный ответ.

Также: вы должны определить свой createSession с помощью: createSession (true, true, 0)

Основной API имеет возможность группировать ACK. Вы не используете транзакционный сеанс, поэтому вы не будете отправлять подтверждения на сервер, пока не достигнете ackBatchSize, настроенного в вашем serverLocator. При этом любое подтверждение будет отправлено на сервер, как только вы вызовете accept() в своем сообщении.

Вариант, который вы сейчас используете, эквивалентен JMS DUPS_OK ​​с определенным DUPS_SIZE.

(Пост отредактировал мой первоначальный ответ после некоторой итерации с вами)

person Clebert Suconic    schedule 25.06.2011
comment
ClientMessage msgReceived = consumer.receive(); msgReceived.acknowledge(); .. Я подтверждаю код - person Vivek Mehra; 26.06.2011
comment
Основной API имеет возможность группировать ACK. Вы не используете транзакционный сеанс, поэтому вы не будете отправлять подтверждения на сервер, пока не достигнете ackBatchSize, настроенного в вашем serverLocator. Вы должны определить свой createSession с помощью: createSession(true, true, 0); При этом любое подтверждение будет отправлено на сервер, как только вы вызовете accept() в своем сообщении. - person Clebert Suconic; 28.06.2011
comment
Вы не вернулись в эту тему. Я так понимаю, вы решили свою проблему? - person Clebert Suconic; 29.06.2011
comment
Была такая же проблема, и это, кажется, исправило ее. Очевидно, я не понял java-документ для createSession (логическое значение autoCommitSends, логическое значение autoCommitAcks, int ackBatchSize). autoCommitSends и autoCommitAck не имеют ничего общего с частью подтверждения, только с фиксацией, верно? - person Alper Akture; 29.01.2014
comment
@AlperAkture Не уверен, что на ваш вопрос когда-либо был дан ответ, но, учитывая, что в названии есть Acks, по крайней мере один из них связан с подтверждениями. - person dimwittedanimal; 13.12.2017
comment
@dimwittedanimal не уверен, я не могу вспомнить, что было раньше :-/ - person Alper Akture; 14.12.2017

Установка ackbatchsize помогла мне решить проблему.. Спасибо за помощь

person Vivek Mehra    schedule 01.07.2011