Есть ли способ обработки нескольких временных ограничений в Flink CEP?

Как указано в документе CEP (https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/libs/cep.html), что в последовательности шаблонов допускается только одно временное ограничение, я изо всех сил пытаюсь найти способ обработки бизнес-кейса, который содержит 2 временных ограничения.

Мне нужно отслеживать некоторые бизнес-события и предупреждать о событиях, соответствующих следующим правилам:

  1. новая учетная запись зарегистрирована
  2. аккаунт аутентифицируется через 5 минут после регистрации
  3. счет совершает не менее 2 транзакций, сумма транзакции которых превышает 1000,00 в течение следующего 1 часа.

А код выглядит примерно так:

Pattern<Event, ?> pattern = Pattern.<Event>begin("register").where(new SimpleCondition<Event>() {
    @Override
    public boolean filter<Event value> throws Exception {
        return (value.getEventType() == EventType.REGISTER);
    }
}).followedBy("authentication").where(new SimpleCondition<Event>() {
    @Override
    public boolean filter<Event value> throws Exception {
        return (value.getEventType() == EventType.AUTHENTICATION);
    }
}).where(new IterativeCondition<Event>() {
    @Override
    public boolean filter(Event value, Context<Event> ctx) throws Exception {
        for (Event event : ctx.getEventsForPattern("register")) {
            if (value.getEventTime() - event.getEventTime() <= 1000 * 60 * 5) {
                return true;
            }
        }
        return false;
    }
}).followedBy("transaction").where(new SimpleCondition<Event>() {
    @Override
    public boolean filter<Event value> throws Exception {
        return (value.getEventType() == EventType.TRANSACTION && value.getAmount() > 1000.00);
    }
}).where(new IterativeCondition<Event>() {
    @Override
    public boolean filter(Event value, Context<Event> ctx) throws Exception {
        for (Event event : ctx.getEventsForPattern("authentication")) {
            if (value.getEventTime() - event.getEventTime() <= 1000 * 60 * 60) {
                return true;
            }
        }
        return false;
    }
}).timesOrMore(2);

Вы можете видеть, что я использую 2 IterativeConditions для обработки временных ограничений. Есть ли лучший способ сделать код более лаконичным?


person Hayes Yang    schedule 07.08.2018    source источник


Ответы (1)


Как вы сказали, вы можете применить только одно временное ограничение ко всему шаблону прямо сейчас в библиотеке CEP. Что вы можете сделать, так это разделить узор на 2 подшаблона. Сначала примените шаблон, который будет искать REGISTER -> AUTHENTICATE и генерировать из них сложное событие (назовем его REGISTER_AUTHENTICATED). И затем используйте его в следующем шаблоне REGISTER_AUTHENTICATED -> 2 * TRANSACTIONS.

Затем вы можете применить два временных ограничения к обоим из этих шаблонов.

person Dawid Wysakowicz    schedule 09.08.2018
comment
Привет, @Dawid, спасибо за ответ. Но будут ли события TRANSACTION отфильтровываться первым шаблоном, чтобы правило, определенное во втором шаблоне, никогда не срабатывало? Например, входящие события имеют следующие данные: {регистрация, 00:00:00, логин, 00:02:00, авторизация, 00:04:00, транс, 00:10:00, транс, 00:20:00 , транс, 00:25:00 ...}. После первого шаблона поток событий будет {register, 00:00:00, auth, 00:04:00}, поэтому второй шаблон никогда не вступит в силу. Или у меня недоразумение? - person Hayes Yang; 09.08.2018
comment
Вам нужно будет объединить его с исходным потоком. Например. после разделения исходного потока на регистрацию и аутентификацию + отдых. Я знаю, что это не идеально, но вы правильно справляетесь с ограничениями по времени. - person Dawid Wysakowicz; 09.08.2018
comment
Понятно. Похоже, что это единственный способ использовать метод within () в случае, если на данный момент требуется несколько временных ограничений (Flink 1.5.2). Спасибо за комментарии, я попробую. - person Hayes Yang; 10.08.2018