Таблицы событий Siddhi не работают должным образом с Apache Storm

Я попытался использовать таблицы событий в своем плане выполнения и попытаться присоединиться к таблице с входным потоком в WSO2 CEP v4.1.0 в распределенном режиме с Apache Storm.

Вот мой запрос о Сиддхи.

@Plan:name('ExecutionPlan')

@Import('InputStream:1.0.0')
define stream InputStream (id string, param1 int, param2 double, param3 string, param4 string, param5 string, param6 string, param7 string);

@Export('outputStream:1.0.0')
define stream OutputStream (id string, param3 string);

@From(eventtable = 'rdbms' , datasource.name = 'MYSQL' , table.name = 'cep') 
define table cepTable (id string, param1 int, param2 double, param3 string, param4 string, param5 string, param6 string, param7 string);

@name('query1') 
@dist(parallel='2', execGroup='Filtering')
partition with ( id of InputStream )
begin
  from InputStream join cepTable 
  on cepTable.id == InputStream.id 
  select InputStream.id as id, InputStream.param3 as param3  
  insert into OutputStream;
end;

Но он предоставляет следующие исключения

TID: [-1234] [] [2016-05-13 14:19:11,847] ERROR {org.wso2.carbon.event.processor.admin.EventProcessorAdminService} -  Error while initialising the connection, null 
org.wso2.carbon.event.processor.core.exception.ExecutionPlanConfigurationException: Error while initialising the connection, null
    at org.wso2.carbon.event.processor.core.EventProcessorDeployer.processDeploy(EventProcessorDeployer.java:154)
    at org.wso2.carbon.event.processor.core.EventProcessorDeployer.executeManualDeployment(EventProcessorDeployer.java:178)
    at org.wso2.carbon.event.processor.core.internal.util.EventProcessorConfigurationFilesystemInvoker.save(EventProcessorConfigurationFilesystemInvoker.java:95)
    at org.wso2.carbon.event.processor.core.internal.CarbonEventProcessorService.editInactiveExecutionPlan(CarbonEventProcessorService.java:181)
    at org.wso2.carbon.event.processor.admin.EventProcessorAdminService.editInactiveExecutionPlan(EventProcessorAdminService.java:108)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.axis2.rpc.receivers.RPCUtil.invokeServiceClass(RPCUtil.java:212)
    at org.apache.axis2.rpc.receivers.RPCMessageReceiver.invokeBusinessLogic(RPCMessageReceiver.java:117)
    at org.apache.axis2.receivers.AbstractInOutMessageReceiver.invokeBusinessLogic(AbstractInOutMessageReceiver.java:40)
    at org.apache.axis2.receivers.AbstractMessageReceiver.receive(AbstractMessageReceiver.java:110)
    at org.apache.axis2.engine.AxisEngine.receive(AxisEngine.java:180)
    at org.apache.axis2.transport.local.LocalTransportReceiver.processMessage(LocalTransportReceiver.java:169)
    at org.apache.axis2.transport.local.LocalTransportReceiver.processMessage(LocalTransportReceiver.java:82)
    at org.wso2.carbon.core.transports.local.CarbonLocalTransportSender.finalizeSendWithToAddress(CarbonLocalTransportSender.java:45)
    at org.apache.axis2.transport.local.LocalTransportSender.invoke(LocalTransportSender.java:77)
    at org.apache.axis2.engine.AxisEngine.send(AxisEngine.java:442)
    at org.apache.axis2.description.OutInAxisOperationClient.send(OutInAxisOperation.java:430)
    at org.apache.axis2.description.OutInAxisOperationClient.executeImpl(OutInAxisOperation.java:225)
    at org.apache.axis2.client.OperationClient.execute(OperationClient.java:149)
    at org.wso2.carbon.event.processor.stub.EventProcessorAdminServiceStub.editInactiveExecutionPlan(EventProcessorAdminServiceStub.java:2473)
    at org.apache.jsp.eventprocessor.edit_005fexecution_005fplan_005fajaxprocessor_jsp._jspService(edit_005fexecution_005fplan_005fajaxprocessor_jsp.java:84)
    at org.apache.jasper.runtime.HttpJspBase.service(HttpJspBase.java:70)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:727)
    at org.apache.jasper.servlet.JspServletWrapper.service(JspServletWrapper.java:432)
    at org.apache.jasper.servlet.JspServlet.serviceJspFile(JspServlet.java:395)
    at org.apache.jasper.servlet.JspServlet.service(JspServlet.java:339)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:727)
    at org.wso2.carbon.ui.JspServlet.service(JspServlet.java:155)
    at org.wso2.carbon.ui.TilesJspServlet.service(TilesJspServlet.java:80)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:727)
    at org.eclipse.equinox.http.helper.ContextPathServletAdaptor.service(ContextPathServletAdaptor.java:37)
    at org.eclipse.equinox.http.servlet.internal.ServletRegistration.service(ServletRegistration.java:61)
    at org.eclipse.equinox.http.servlet.internal.ProxyServlet.processAlias(ProxyServlet.java:128)
    at org.eclipse.equinox.http.servlet.internal.ProxyServlet.service(ProxyServlet.java:68)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:727)
    at org.wso2.carbon.tomcat.ext.servlet.DelegationServlet.service(DelegationServlet.java:68)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:303)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
    at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:241)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
    at org.wso2.carbon.ui.filters.CSRFPreventionFilter.doFilter(CSRFPreventionFilter.java:88)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:241)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
    at org.wso2.carbon.ui.filters.CRLFPreventionFilter.doFilter(CRLFPreventionFilter.java:59)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:241)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
    at org.wso2.carbon.tomcat.ext.filter.CharacterSetFilter.doFilter(CharacterSetFilter.java:61)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:241)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:220)
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:122)
    at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:504)
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:170)
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:103)
    at org.wso2.carbon.tomcat.ext.valves.CompositeValve.continueInvocation(CompositeValve.java:99)
    at org.wso2.carbon.tomcat.ext.valves.CarbonTomcatValve$1.invoke(CarbonTomcatValve.java:47)
    at org.wso2.carbon.webapp.mgt.TenantLazyLoaderValve.invoke(TenantLazyLoaderValve.java:57)
    at org.wso2.carbon.event.receiver.core.internal.tenantmgt.TenantLazyLoaderValve.invoke(TenantLazyLoaderValve.java:48)
    at org.wso2.carbon.tomcat.ext.valves.TomcatValveContainer.invokeValves(TomcatValveContainer.java:47)
    at org.wso2.carbon.tomcat.ext.valves.CompositeValve.invoke(CompositeValve.java:62)
    at org.wso2.carbon.tomcat.ext.valves.CarbonStuckThreadDetectionValve.invoke(CarbonStuckThreadDetectionValve.java:159)
    at org.apache.catalina.valves.AccessLogValve.invoke(AccessLogValve.java:950)
    at org.wso2.carbon.tomcat.ext.valves.CarbonContextCreatorValve.invoke(CarbonContextCreatorValve.java:57)
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:116)
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:421)
    at org.apache.coyote.http11.AbstractHttp11Processor.process(AbstractHttp11Processor.java:1074)
    at org.apache.coyote.AbstractProtocol$AbstractConnectionHandler.process(AbstractProtocol.java:611)
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1739)
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.run(NioEndpoint.java:1698)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.wso2.siddhi.core.exception.ExecutionPlanRuntimeException: Error while initialising the connection, null
    at org.wso2.siddhi.extension.eventtable.rdbms.DBHandler.<init>(DBHandler.java:79)
    at org.wso2.siddhi.extension.eventtable.RDBMSEventTable.init(RDBMSEventTable.java:119)
    at org.wso2.siddhi.core.util.parser.helper.DefinitionParserHelper.addEventTable(DefinitionParserHelper.java:99)
    at org.wso2.siddhi.core.util.ExecutionPlanRuntimeBuilder.defineTable(ExecutionPlanRuntimeBuilder.java:74)
    at org.wso2.siddhi.core.util.parser.ExecutionPlanParser.defineTableDefinitions(ExecutionPlanParser.java:194)
    at org.wso2.siddhi.core.util.parser.ExecutionPlanParser.parse(ExecutionPlanParser.java:140)
    at org.wso2.siddhi.core.SiddhiManager.createExecutionPlanRuntime(SiddhiManager.java:53)
    at org.wso2.siddhi.core.SiddhiManager.createExecutionPlanRuntime(SiddhiManager.java:61)
    at org.wso2.carbon.event.processor.common.storm.component.SiddhiBolt.init(SiddhiBolt.java:104)
    at org.wso2.carbon.event.processor.common.storm.component.SiddhiBolt.<init>(SiddhiBolt.java:86)
    at org.wso2.carbon.event.processor.core.internal.storm.util.StormTopologyConstructor.constructTopologyBuilder(StormTopologyConstructor.java:114)
    at org.wso2.carbon.event.processor.core.internal.storm.StormTopologyManager.submitTopology(StormTopologyManager.java:127)
    at org.wso2.carbon.event.processor.core.internal.CarbonEventProcessorService.addExecutionPlan(CarbonEventProcessorService.java:314)
    at org.wso2.carbon.event.processor.core.EventProcessorDeployer.processDeploy(EventProcessorDeployer.java:124)
    ... 76 more
Caused by: java.sql.SQLException
    at org.apache.tomcat.jdbc.pool.PooledConnection.connectUsingDriver(PooledConnection.java:254)
    at org.apache.tomcat.jdbc.pool.PooledConnection.connect(PooledConnection.java:182)
    at org.apache.tomcat.jdbc.pool.ConnectionPool.createConnection(ConnectionPool.java:701)
    at org.apache.tomcat.jdbc.pool.ConnectionPool.borrowConnection(ConnectionPool.java:635)
    at org.apache.tomcat.jdbc.pool.ConnectionPool.getConnection(ConnectionPool.java:188)
    at org.apache.tomcat.jdbc.pool.DataSourceProxy.getConnection(DataSourceProxy.java:127)
    at org.wso2.siddhi.extension.eventtable.rdbms.DBHandler.<init>(DBHandler.java:73)
    ... 89 more
Caused by: java.lang.NullPointerException
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:278)
    at org.apache.tomcat.jdbc.pool.PooledConnection.connectUsingDriver(PooledConnection.java:246)
    ... 95 more

В чем причина этого?


person Saveendra Ekanayake    schedule 13.05.2016    source источник


Ответы (2)


Когда план выполнения развернут на сервере WSO2CEP, будет достаточно следующих аннотаций:

@From(eventtable = 'rdbms' , datasource.name = 'MYSQL' , table.name = 'cep')

учитывая, что мы определили источник данных с именем 'MYSQL' среди конфигураций источника данных на сервере WSO2CEP.

Однако при развертывании плана выполнения в Storm конфигурация источника данных должна быть указана встроенной (поскольку конфигурации источника данных, определенные на сервере WSO2CEP, недоступны в Storm). Это можно сделать, как показано ниже:

@From(eventtable = 'rdbms' , jdbc.url='', username='', password='', driver.name='' , table.name = 'cep')

Заполните элементы аннотации надлежащим образом.

Надеюсь, это решит вашу проблему.

person Dilini    schedule 17.05.2016

1) Пожалуйста, проверьте, настроили ли вы соединение с MySQL. Для настройки соединения с MySQL проверьте ссылку ниже: https://docs.wso2.com/display/CEP410/Setting+up+MySQL

Проверить доступность сервера, наличие схемы на сервере с нужной таблицей (cep). Правильно ли указано имя схемы "MYSQL". Пожалуйста, проверьте.

2) Проверьте конфигурацию штормовой кластеризации в файле event-processor.xml. Для получения дополнительной информации о конфигурации проверьте ссылку ниже: //docs.wso2.com/display/CLUSTER44x/Clustering+CEP+4.1.0#ClusteringCEP4.1.0-DistributedmodedeploymentDistributedCEP

person Renukaradhya    schedule 14.05.2016
comment
Вы предоставили ссылки для настройки источников данных MySQL для управления реестром или данными управления пользователями. Это не имеет значения. Мой источник данных работает нормально. Apache Storm также работает нормально. Простые сиддхи запросили, чтобы wso2 работал нормально. но соединение с таблицей rdbms не работает со storm. Это мой вопрос? - person Saveendra Ekanayake; 16.05.2016