Как настроить шину сообщений в Liferay 7?

Я хочу использовать шину Liferay Message в DXP. Я написал следующий код.

DemoSender.java

package demo.sender.portlet;

import demo.sender.constants.DemoSenderPortletKeys;

import com.liferay.portal.kernel.log.Log;
import com.liferay.portal.kernel.log.LogFactoryUtil;
import com.liferay.portal.kernel.messaging.Message;
import com.liferay.portal.kernel.messaging.MessageBus;
import com.liferay.portal.kernel.messaging.MessageBusUtil;
import com.liferay.portal.kernel.portlet.bridges.mvc.MVCPortlet;

import javax.portlet.ActionRequest;
import javax.portlet.ActionResponse;
import javax.portlet.Portlet;

import org.osgi.framework.BundleContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Reference;

/**
 * @author parth.ghiya
 */
@Component(
    immediate = true,
    property = {
        "com.liferay.portlet.display-category=category.sample",
        "com.liferay.portlet.instanceable=true",
        "javax.portlet.display-name=demo-sender Portlet",
        "javax.portlet.init-param.template-path=/",
        "javax.portlet.init-param.view-template=/view.jsp",
        "javax.portlet.name=" + DemoSenderPortletKeys.DemoSender,
        "javax.portlet.resource-bundle=content.Language",
        "javax.portlet.security-role-ref=power-user,user"
    },
    service = Portlet.class
)
public class DemoSenderPortlet extends MVCPortlet {

    @Activate
    protected void activate(BundleContext bundleContext) {
        _bundleContext = bundleContext;

    }



    public void sendMessage(
            ActionRequest actionRequest, ActionResponse actionResponse) {
            if (_log.isInfoEnabled()) {
                _log.info("Sending message to DE Echo service");
            }
            Message message = new Message();
            message.setDestinationName("MyEchoDestination");
            message.setPayload("Hello World!");
            message.setResponseDestinationName("MyEchoResponse");

            _messageBus.sendMessage(message.getDestinationName(), message);

        }

        private static final Log _log = LogFactoryUtil.getLog(DemoSenderPortlet.class);

        private BundleContext _bundleContext;

        @Reference
        private MessageBus _messageBus;
}

DemoReceiver.java

package demo.receiver.portlet;

import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Reference;

import com.liferay.portal.kernel.log.Log;
import com.liferay.portal.kernel.log.LogFactoryUtil;
import com.liferay.portal.kernel.messaging.BaseMessageListener;
import com.liferay.portal.kernel.messaging.Message;
import com.liferay.portal.kernel.messaging.MessageBus;
import com.liferay.portal.kernel.messaging.MessageListener;

@Component(
    immediate = true, property = {"destination.name=MyEchoDestination"},
    service = MessageListener.class
)
public class DemoReceiverPortlet extends BaseMessageListener {

    @Override
    protected void doReceive(Message message) throws Exception {
        if (_log.isInfoEnabled()) {
            _log.info("Received: " + message);
        }

        String payload = (String)message.getPayload();

        if (_log.isInfoEnabled()) {
            _log.info("Message payload: " + payload);
        }
/*
        String responseDestinationName = message.getResponseDestinationName();

        if ((responseDestinationName != null) &&
            (responseDestinationName.length() > 0)) {

            Message responseMessage = new Message();

            responseMessage.setDestinationName(responseDestinationName);
            responseMessage.setResponseId(message.getResponseId());

            //This is just for demo purposes

            responseMessage.setPayload(payload);

            _messageBus.sendMessage(
                message.getResponseDestinationName(), responseMessage);
        }
  */
    }

    private static final Log _log = LogFactoryUtil.getLog(DemoReceiverPortlet.class);

    @Reference
    private volatile MessageBus _messageBus;
}

Проблема в том, что мой метод doReceive никогда не вызывается. Какую конфигурацию необходимо дополнительно добавить?

С Уважением

P.S: в DemoSender я отправляю сообщение по нажатию кнопки

Изменить № 1

Я добавил код конфигуратора следующим образом.

package demo.receiver.portlet;

import java.util.Dictionary;

import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;

import com.liferay.portal.kernel.concurrent.DiscardOldestPolicy;
import com.liferay.portal.kernel.concurrent.RejectedExecutionHandler;
import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
import com.liferay.portal.kernel.log.Log;
import com.liferay.portal.kernel.log.LogFactoryUtil;
import com.liferay.portal.kernel.messaging.Destination;
import com.liferay.portal.kernel.messaging.DestinationConfiguration;
import com.liferay.portal.kernel.messaging.DestinationFactory;
import com.liferay.portal.kernel.messaging.MessageBus;
import com.liferay.portal.kernel.util.HashMapDictionary;

@Component(
        enabled = false, immediate = true,
        service = DemoReceiverConfigurator.class
    )

public class DemoReceiverConfigurator {

    @Activate
    protected void activate(ComponentContext componentContext) {
        _bundleContext = componentContext.getBundleContext();
        System.out.println("===demo===");
        Dictionary<String, Object> properties =
            componentContext.getProperties();



        DestinationConfiguration destinationConfiguration =
            new DestinationConfiguration(DestinationConfiguration.DESTINATION_TYPE_PARALLEL,"MyEchoDestination");

        destinationConfiguration.setMaximumQueueSize(200);

        RejectedExecutionHandler rejectedExecutionHandler =
            new DiscardOldestPolicy() {

                @Override
                public void rejectedExecution(
                    Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {

                    if (_log.isWarnEnabled()) {
                        _log.warn(
                            "The current thread will handle the request " +
                                "because the audit router's task queue is at " +
                                    "its maximum capacity");
                    }

                    super.rejectedExecution(runnable, threadPoolExecutor);
                }

            };

        destinationConfiguration.setRejectedExecutionHandler(
            rejectedExecutionHandler);

        Destination destination = _destinationFactory.createDestination(
            destinationConfiguration);

        Dictionary<String, Object> destinationProperties =
            new HashMapDictionary<>();

        destinationProperties.put("destination.name", destination.getName());

        _destinationServiceRegistration = _bundleContext.registerService(
            Destination.class, destination, destinationProperties);
    }

    @Deactivate
    protected void deactivate() {
        if (_destinationServiceRegistration != null) {
            Destination destination = _bundleContext.getService(
                _destinationServiceRegistration.getReference());

            _destinationServiceRegistration.unregister();

            destination.destroy();
        }

        _bundleContext = null;
    }

    @Reference(unbind = "-")
    protected void setMessageBus(MessageBus messageBus) {
    }

    private static final Log _log = LogFactoryUtil.getLog(
        DemoReceiverConfigurator.class);

    private volatile BundleContext _bundleContext;

    @Reference
    private DestinationFactory _destinationFactory;

    private volatile ServiceRegistration<Destination>
        _destinationServiceRegistration;

}

Но мой метод Activate не вызывается, я включил false в своем классе прослушивателя сообщений и enable = false, немедленно = true в моем классе конфигуратора.

Не знаю, чего мне не хватает.


person Parth Ghiya    schedule 07.03.2018    source источник


Ответы (2)


Часто в OSGi достаточно этой, казалось бы, очевидной конфигурации. В данном случае, очевидно, это не так, потому что Liferay теперь знает о сообщении, которое вы отправляете и которое хотите получить, но Messagebus не знает о том, что этот пункт назначения должен быть создан.

Это кажется очевидным — если есть слушатель определенного сообщения, вероятно, должен быть пункт назначения. Но какой это будет тип? Параллельная обработка? Сколько параллельных обработчиков? Синхронный? В очереди? Это то, что вам нужно сделать.

Хотя быстрый поиск не нашел документации о том, как это сделать, вы можете использовать этот конфигуратор в качестве примера создания отсутствующей ссылки.

person Olaf Kock    schedule 07.03.2018
comment
я столкнулся с этим, просматривая исходный код liferay, я создал класс конфигуратора, как в рассматриваемом редактировании, но все равно не добился успеха. - person Parth Ghiya; 07.03.2018
comment
да, я взял ссылку из MonitoringMessagingConfigurator.java - person Parth Ghiya; 07.03.2018
comment
Вы пробовали удалить его? Или проверьте источник и посмотрите, активирован ли этот сервис вручную. - person Olaf Kock; 07.03.2018
comment
Все ли ваши пакеты запущены (проверьте с помощью Gogo Shell)? И проверьте, запущены ли службы (см. документацию Gogo Shell для команд отладки, для открытия Gogo Shell в работающем Liferay: telnet localhost 11311) - person Olaf Kock; 07.03.2018

Документация по MessageBus была улучшена несколько дней назад, взгляните на следующую страницу https://dev.liferay.com/develop/tutorials/-/knowledge_base/7-0/message-bus

person jorgediaz-lr    schedule 23.03.2018
comment
спасибо @jorgediaz-lr... я нашел похожие шаги в исходном коде LR, сделал то же самое и смог добиться - person Parth Ghiya; 26.03.2018