Что означает, что состояние широковещания разблокирует реализацию функции «динамических шаблонов» для библиотеки Flink CEP?

Из объявления о выпуске Flink 1.5 мы знаем, что Flink теперь поддерживает «широковещательное состояние», и было описано, что «широковещательное состояние разблокирует реализацию функции« динамических шаблонов »для библиотеки Flink CEP».

Означает ли это, что в настоящее время мы можем использовать «состояние широковещания» для реализации «динамических шаблонов» без Flink CEP? Также я понятия не имею, в чем разница при реализации «динамических шаблонов» для Flink CEP с состоянием широковещания или без него? Я был бы признателен, если бы кто-нибудь мог привести пример с кодом, чтобы объяснить разницу.

=============

Обновление для тестирования широковещательного потока данных оператором broadcast () с ключевыми данными

После тестирования в Flink 1.4.2 я обнаружил, что широковещательный поток данных (старый оператор broadcast ()) может подключаться к ключевому потоку данных, ниже - тестовый код, и мы обнаружили, что все события потока управления транслируются на все экземпляры оператора. Таким образом, похоже, что старая трансляция () может выполнять те же функции, что и новое "состояние широковещания".

public static void ConnectBroadToKeyedStream() throws Exception {
    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(3);

    List<Tuple1<String>>
            controlData = new ArrayList<Tuple1<String>>();
    controlData.add(new Tuple1<String>("DROP"));
    controlData.add(new Tuple1<String>("IGNORE"));
    DataStream<Tuple1<String>> control = env.fromCollection(controlData);//.keyBy(0);

    List<Tuple1<String>>
            dataStreamData = new ArrayList<Tuple1<String>>();
    dataStreamData.add(new Tuple1<String>("data"));
    dataStreamData.add(new Tuple1<String>("DROP"));
    dataStreamData.add(new Tuple1<String>("artisans"));
    dataStreamData.add(new Tuple1<String>("IGNORE"));
    dataStreamData.add(new Tuple1<String>("IGNORE"));
    dataStreamData.add(new Tuple1<String>("IGNORE"));
    dataStreamData.add(new Tuple1<String>("IGNORE"));

    // DataStream<String> data2 = env.fromElements("data", "DROP", "artisans", "IGNORE");
    DataStream<Tuple1<String>> keyedDataStream = env.fromCollection(dataStreamData).keyBy(0);

    DataStream<String> result = control
            .broadcast()
            .connect(keyedDataStream)
            .flatMap(new MyCoFlatMap());
    result.print();
    env.execute();
}

private static final class MyCoFlatMap
        implements CoFlatMapFunction<Tuple1<String>, Tuple1<String>, String> {
    HashSet blacklist = new HashSet();

    @Override
    public void flatMap1(Tuple1<String> control_value, Collector<String> out) {
        blacklist.add(control_value);
        out.collect("listed " + control_value);
    }

    @Override
    public void flatMap2(Tuple1<String> data_value, Collector<String> out) {

        if (blacklist.contains(data_value)) {
            out.collect("skipped " + data_value);
        } else {
            out.collect("passed " + data_value);
        }
    }
}

Ниже представлен результат теста.

1> passed (data)
1> passed (DROP)
3> passed (artisans)
3> passed (IGNORE)
3> passed (IGNORE)
3> passed (IGNORE)
3> passed (IGNORE)
3> listed (DROP)
3> listed (IGNORE)
1> listed (DROP)
1> listed (IGNORE)
2> listed (DROP)
2> listed (IGNORE)

https://data-artisans.com/blog/apache-flink-1-5-0-release-announcement


person YuFeng Shen    schedule 26.05.2018    source источник


Ответы (2)


Без состояния широковещания два потока данных Flink не могут обрабатываться вместе с сохранением состояния, если они не имеют одинаковой клавиатуры. Широковещательный поток можно подключить к потоку с ключом, но если вы затем попытаетесь использовать состояние с ключом, например, в RichCoFlatMap, это не удастся.

Часто желательно иметь один поток с динамическими «правилами», которые должны применяться к каждому событию в другом потоке, независимо от ключа. Требовался новый вид управляемого состояния Flink, в котором можно было бы хранить эти правила. С широковещательным статусом это теперь можно сделать простым способом.

Теперь, когда эта функция внедрена, можно начинать работу над поддержкой динамических шаблонов в CEP.

person David Anderson    schedule 26.05.2018
comment
Оператор dataStream.broadcast () не может подключиться к ключевому DataStream? Если можно, в чем разница между использованием состояния широковещательной передачи и оператора широковещательной передачи ()? Похоже, оба они имеют одинаковый эффект. - person YuFeng Shen; 27.05.2018
comment
Таким образом, чтобы быть более точным, 1) широковещательный поток не может быть потоком с ключом до Flink1.5, однако из широковещательного потока Flink1.5 может использоваться поток с ключом по состоянию широковещания, 2) даже до Flink 1.5 широковещательный поток все еще может подключаться к не- поток с ключом, 3) поток с ключом не может подключиться к другому потоку без ключа, 4) однако поток с ключом может подключаться к другому потоку с ключом. Пожалуйста, поправьте меня, если какой-либо из 4 элементов неверен. - person YuFeng Shen; 27.05.2018
comment
Я провел тест и обнаружил, что могу успешно подключить () широковещательный поток и поток с ключом, и обновил тестовый код до исходного вопроса, пожалуйста, проверьте его. - person YuFeng Shen; 28.05.2018
comment
Да, я ошибся. Можно подключить широковещательный поток и поток с ключом. Что нельзя сделать, так это использовать состояние с ключом в чем-то вроде RichCoFlatMap для хранения динамических правил. - person David Anderson; 28.05.2018
comment
Не могли бы вы помочь ответить на этот связанный вопрос stackoverflow.com/questions/50570605/? - person YuFeng Shen; 28.05.2018
comment
Кстати, в примере, который я привел выше, не учитывается ли черный список HashSet для хранения динамических правил в потоке с ключом? - person YuFeng Shen; 28.05.2018
comment
HashSet в этом примере не подходит, потому что для него не будет контрольной точки, и, следовательно, приложение не будет отказоустойчивым, и его нельзя будет масштабировать. - person David Anderson; 28.05.2018
comment
Если это так, как насчет использования HashSet в этом примере с CheckpointedFunction? поэтому он будет отказоустойчивым, его можно будет масштабировать и достичь того же эффекта, что и состояние широковещания, так что это альтернатива состоянию широковещания, вы так думаете? - person YuFeng Shen; 30.05.2018

Вот пример кода, который реализует как исходный метод широковещательной рассылки flink без аргументов, так и недавно введенное состояние широковещательной передачи на flink 1.5.0. https://gist.github.com/syhily/932e0d1e0f12b3e951236d7c36e5

Насколько я узнал, состояние широковещания может быть реализовано без flink cep, как и в приведенном выше коде.

Исходный метод DataStream broadcast создавал бы DataStream вместо BroadcastConnectedStream. Это будет исходная схема дизайна coGroup. Мы могли бы использовать больше функций преобразования потока, определенных в ConnectedStreams, после соединения потока метрик с транслируемым потоком правил. Например, функция keyBy, при этом транслируемый поток и подключенный поток, которые имеют одинаковый ключ, будут process привязаны и закреплены на одном параллельном CoProcessFunction. Таким образом, CoProcessFunction может иметь собственное локальное хранилище. Функция процесса может иметь в своем поле настраиваемую структуру данных, отличную от состояния карты, доступного из ReadOnlyContext.

Состояние широковещательной передачи может быть реализовано методом broadcast с набором MapStateDescriptor, это означает, что транслируемый поток может быть связан с другим потоком много раз. Различные подключенные BroadcastConnectedStream могут совместно использовать свое собственное состояние широковещательной передачи с помощью уникальной MapStateDescriptor в process функции.

Я думал, что это будут ключевые различия между трансляцией с включенными аргументами и состоянием трансляции.

person 盛雨帆    schedule 06.07.2018