Siddhi - выборка из таблиц событий, которые не обновляются в течение определенного времени

В запросе Siddhi я импортирую два потока S1 и S2. Если я получу в потоке S1, я вставлю в таблицу событий T1, а когда я получу в S2, я обновлю в таблице T1 на основе идентификатора, а также отправлю обновленные значения из таблицы в выходной поток O1.

Как часть требования, мне нужно получить содержимое таблицы T1, которое вставлено до 5 минут (т. е. если запись находится более 5 минут) и отправить в другой выходной поток O2.

@name('S1')
from S1
select id, srcId, 'null' as msgId, 'INP' as status
insert into StatusTable;

@name('S2')
from S2#window.time(1min) as g join StatusTable[t.status == 'INP'] as t
on ( g.srcId == t.id)
select t.id as id, g.msgId as msgId, 'CMP' as status
update StatusTable on TradeStatusTable.id == id;

@name('Publish')
from S2 as g join StatusTable[t.status == 'CMP'] as t on ( g.srcId == t.id and t.status == 'CMP')
select t.id as id, t.msgId as msgId, t.status as status
insert into O1;

Как добавить запрос в этот существующий запрос для извлечения записей из таблицы TradeStatus, которая проходит более 5 минут. Поскольку таблицу нельзя использовать отдельно, мне нужно объединить ее с потоком, как реализовать этот сценарий?


person gpk    schedule 22.09.2017    source источник
comment
Вы имеете в виду, что вам нужны записи, которые не обновляются более 5 минут из statusTable?   -  person dnWick    schedule 22.09.2017
comment
Да, мне нужны те, которые не обновляются более 5 мин, нужно отправлять в другом потоке.   -  person gpk    schedule 22.09.2017


Ответы (1)


String WebAttackSuccess = "" +
           "@info(name = 'found_host_charged1') "+
           "from ATDEventStream[ rid == 10190001 ]#window.timeBatch(10 sec) as a1 "+
           "join ATDEventStream[ rid == 10180004 ]#window.time(10 sec) as a2 on a2.src_ip == a1.src_ip and a2.dst_ip == a1.dst_ip " +
           " select UUID() as uuid,1007 as cid,a1.sensor_id as sensor_id,a1.interface_id as interface_id,a1.other_id as other_id,count(a1.uuid) as event_num,min(a1.timestamp)  as first_seen,max(a2.timestamp) as last_seen,'' as IOC,a1.dst_ip as victim,a1.src_ip as attacker,a1.uuid as NDE4,sample:sample(a2.uuid) as Sample_NDE4 " +
           " insert into found_host_charged1;"+
           ""+
           "@info(name = 'found_host_charged2') "+
           "from every a1 = found_host_charged1 " +
           "-> a2 = ATDEventStream[dns_answers != ''] "+
           "within 5 min "+
           "select UUID() as uuid,1008 as cid,a2.sensor_id as sensor_id,a2.interface_id as interface_id,a2.other_id as other_id,count(a2.uuid) as event_num,a1.first_seen  as first_seen,max(a2.timestamp) as last_seen,a2.dns_answers as IOC,a2.dst_ip as victim,a2.src_ip as attacker,a1.uuid as NDE5,sample:sample(a2.uuid) as Sample_NDE5 " +
           "insert into found_host_charged2; ";

Это часть моей работы, я использую два потока, возможно, вы сможете получить данные из StatusTable во втором потоке. Если это еще не решено, вы можете изменить StatusTable на S1.

person cochar    schedule 25.09.2017