Flink CEP — поднимать предупреждение, если последовательность не пришла в течение определенного времени, независимо от того, поступило ли какое-либо другое сообщение.

Я написал часть Flink CEP, которая проверяет шаблон статуса (с ключом id) с Relaxed Contiguity (followedBy). Идея состоит в том, чтобы поднять предупреждение, если определенный статус не прибыл после первого в течение заданного времени.

Это работает, однако, если в этом потоке больше нет сообщений, оповещение никогда не срабатывает. Но только когда приходит сообщение со случайным статусом, эта часть срабатывает.

Итак, как мне заставить его вызывать оповещение, даже если в этот поток не поступает сообщение, когда сообщение со следующей последовательностью не пришло со временем?

Pattern<Transaction, Transaction> pattern = Pattern.<Transaction>begin("start")
            .where(new SimpleCondition<Transaction>() {

                private static final long serialVersionUID = 1L;

                @Override
                public boolean filter(Transaction value) throws Exception {
                    return value.getStatus().equals(Status.STATUS_1);
                }
            })
           .followedBy("end")
           .where(new SimpleCondition<Transaction>() {
                @Override
                public boolean filter(Transaction value) throws Exception {
                    return value.getStatus().equals(Status.STATUS_2);
                    return amlveri;
                }
            }).within(Time.seconds(15));

PatternStream<Transaction> patternStream = CEP.pattern(dataStreamSource, pattern);
OutputTag<Alert> timedOutPartialMatchesTag = new OutputTag<Alert>("alert",
            TypeInformation.of(Alert.class)) {};
    SingleOutputStreamOperator<Alert> select = patternStream.flatSelect(timedOutPartialMatchesTag,
            new PatternFlatTimeoutFunction<Transaction, Alert>() {

                @Override
                public void timeout(Map<String, List<Transaction>> values, long arg1, Collector<Alert> arg2)
                        throws Exception {
                    Transaction failedTrans = values.get("start").get(0);
                    arg2.collect(new Alert("status_2 didnt arrive in time, ", failedTrans));
                }
            }, new PatternFlatSelectFunction<Transaction, Alert>() {

                @Override
                public void flatSelect(Map<String, List<Transaction>> arg0, Collector<Alert> arg1)
                        throws Exception {
                       // do not do anything
                }
            });
    select.getSideOutput(timedOutPartialMatchesTag).print();

person Vijay Veeraraghavan    schedule 17.01.2020    source источник


Ответы (1)


Если вы работаете со временем события, то метод within() ожидает появления водяного знака, чтобы запустить таймер времени события. Но без поступления событий водяной знак не будет продвигаться вперед (при условии, что вы используете что-то вроде BoundedOutOfOrdernessTimestampExtractor для создания водяных знаков).

Если вам нужно определить течение времени, пока не поступают события, необходимо использовать время обработки. Вы можете установить TimeCharacteristic на время обработки или реализовать генератор водяных знаков, который использует таймеры времени обработки для искусственного продвижения водяного знака, несмотря на отсутствие событий.

person David Anderson    schedule 17.01.2020
comment
Такой вариант использования, который требует таймеров, должен использовать таймер обработки, а не api cep. Спасибо @David, я опубликую код здесь для справки. - person Vijay Veeraraghavan; 20.01.2020