ActiveMQ Обнаружение новой потребительской темы

У меня есть стандартный брокер ActiveMQ

    private static String localVMurl = "vm://localhost";

    broker = new BrokerService();
    broker.addConnector(localVMurl); 
    broker.start();

и все хорошо. моя цель состоит в том, чтобы потребитель связался с брокером по определенной теме. Как только это соединение будет обнаружено, брокер либо передаст сообщения, если источник активно работает с темой, либо брокер запустит новый источник для этой конкретной темы. Однако для этого мне нужно, чтобы кто-то обнаружил, когда новый потребитель подключается и запрашивает определенную тему.

мой основной потребительский код:

    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(remoterURL);
    Connection connection = connectionFactory.createConnection();
    connection.setClientID("clinet1");
    connection.start();

    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Topic topic = session.createTopic("some_topic");
    MessageConsumer consumer = session.createConsumer(topic);
    consumer.setMessageListener(new MyMessageListener());

и я вижу в журналах брокера:

  <161005 11:03:41> [.0.1:64433@5001] DEBUG tRegion - localhost adding destination: topic://ActiveMQ.Advisory.Consumer.Topic.some_topic

Итак, я знаю, что потребитель подключается и подписывается на эту тему, мне просто нужно как-то поймать это событие.

Любые мысли о том, как это сделать?


person user1772250    schedule 05.10.2016    source источник


Ответы (1)


Консультативное сообщение - это то, что вам нужно. каждый раз, когда вы получаете сообщение с этим кодом, это означает, что у вас запускается или останавливается новый потребитель.

документ http://activemq.apache.org/advisory-message.html

пример:

    //org.apache.activemq.advisory.AdvisorySupport.getDestinationAdvisoryTopic(Destination)
Destination advisoryDestination = AdvisorySupport.getConsumerAdvisoryTopic(topic )
MessageConsumer consumer = session.createConsumer(advisoryDestination);
consumer.setMessageListener(this);

public void onMessage(Message msg){
    if (msg instanceof ActiveMQMessage){
        try {
            ActiveMQMessage aMsg =  (ActiveMQMessage)msg;
            ConsumerInfo consumer = (ConsumerInfo) aMsg.getDataStructure();
        } catch (JMSException e) {
            log.error("Failed to process message: " + msg);
        }
    }
}
person Hassen Bennour    schedule 05.10.2016
comment
мне нужно слушать брокера, а не потребителя. брокер заранее не знает, на какую тему будет подписываться потребитель. это не похоже на то, что я хочу? - person user1772250; 05.10.2016
comment
можете ли вы объяснить, пожалуйста, что я понял, Итак, я знаю, что потребитель подключается и подписывается на эту тему, мне просто нужно как-то поймать это событие. и мой код подключается к брокеру и слушает его, что вы подразумеваете под мне нужно слушать брокера, а не потребителя. - person Hassen Bennour; 05.10.2016
comment
если вам нужно это событие, не зная заранее места назначения, вы можете заменить Destination advisoryDestination = AdvisorySupport.getConsumerAdvisoryTopic(topic ); на Topic advisoryDestination = session.createTopic("ActiveMQ.Advisory.Consumer.Topic.*");, чтобы получать уведомления о любой подписке на любую тему в брокере, а с помощью ConsumerInfo вы можете получить другую информацию, например место назначения потребителя consumer.getDestination(); - person Hassen Bennour; 05.10.2016
comment
у меня есть брокер в коде, без продюсеров и тем. как только потребитель подключается к брокеру, желающему подписаться на определенную тему, я хочу, чтобы брокер породил производителя, связанного с этой темой. для этого мне нужно определить, когда потребитель подключается к брокеру, как только это произойдет, брокер может создать соответствующего производителя. Имеет ли это смысл ? - person user1772250; 05.10.2016
comment
извините, но я не понимаю определения спавна и спауна соответствующего производителя :) вы можете использовать другой термин - person Hassen Bennour; 05.10.2016
comment
неважно я разобрался. в самом брокере мне нужен потребитель сообщений AdvisoryTopic, оттуда я могу с этим справиться. Спасибо! - person user1772250; 05.10.2016
comment
Да, это мое последнее предложение - person Hassen Bennour; 05.10.2016
comment
согласен, было не понятно куда ставить этот потребитель, но теперь он у меня есть и все работает спасибо за помощь! - person user1772250; 05.10.2016
comment
Кто-нибудь знает, как узнать, указывает ли консультативное сообщение, что потребитель запущен или остановлен? Спасибо - person beta-brad; 18.04.2019
comment
@beta-brad конечно, здесь activemq.apache.org/advisory-message.html - person Hassen Bennour; 23.04.2019