Исключение NullPointer при попытке доступа или чтения ReadOnly ctx в методе processElement в KeyedBroadCastProcessFunction в Apache Flink

У меня есть интересный сценарий, в котором я работаю над сопоставлением шаблонов во flink, оценивая входящие шаблоны с помощью функции keyedbroadcastprocess, когда я запускаю программу в среде IDE, я получаю исключение нулевого указателя в функции processElements при попытке доступа к ReadOnlyContext, но в терминале все работает нормально, ниже моя функция keyedbroadcastprocess

Ниже мой основной класс

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.MapTypeInfo;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.json.JSONObject;

public class SignalPatternMatchingApp {

    public static final MapStateDescriptor<String, Map<String, String>> patternRuleDescriptor =
            new MapStateDescriptor(SignalPatternMatchingConstants.PATTERN_RULE_DESCRIPTOR_NAME,
                    BasicTypeInfo.STRING_TYPE_INFO, new MapTypeInfo<>(String.class, String.class));

 
    public static final OutputTag<JSONObject> unMatchedSideOutput =
            new OutputTag<JSONObject>("sideoutput") {
            };

 
    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        
        DataStream<JSONObject> inputSignal = get input from kafka stream

        DataStream<Map<String, String>> rawPatternStream =
                env.fromElements(get data from database);

        DataStream<Tuple2<String, Map<String, String>>> patternStream =
                rawPatternStream.flatMap(new FlatMapFunction<Map<String, String>,
                        Tuple2<String, Map<String, String>>>() {
                    @Override
                    public void flatMap(Map<String, String> patternRules,
                                        Collector<Tuple2<String, Map<String, String>>> out) throws Exception {
                        for (Map.Entry<String, String> stringEntry : patternRules.entrySet()) {
                            JSONObject jsonObject = new JSONObject(stringEntry.getValue());
                            Map<String, String> map = new HashMap<>();
                            for (String key : jsonObject.keySet()) {
                                String value = jsonObject.get(key).toString();
                                map.put(key, value);
                            }
                            out.collect(new Tuple2<>(stringEntry.getKey(), map));
                        }
                    }
                });

        BroadcastStream<Tuple2<String, Map<String, String>>> patternBroadcast =
                patternStream.broadcast(patternRuleDescriptor);


        DataStream<Tuple2<String, JSONObject>> matchedSignal =
                inputSignal.map(new MapFunction<JSONObject, Tuple2<String, JSONObject>>() {
            @Override
            public Tuple2<String, JSONObject> map(JSONObject inputSignal) throws Exception {
                String sourceName = inputSignal.getJSONObject("signalHeader").get("sourceName").toString();
                return new Tuple2<>(sourceName, inputSignal);
            }
        }).keyBy(0).connect(patternBroadcast).process(new TestProcess());


        matchedSignal.print();
        
        DataStream<JSONObject> unmatchedSignal =
                ((SingleOutputStreamOperator<Tuple2<String, JSONObject>>) matchedSignal)
                .getSideOutput(unMatchedSideOutput);

        unmatchedSignal.print();

        env.execute();

    }
    

KeyedBroadcastProcessFunction as below




 public class TestProcess extends KeyedBroadcastProcessFunction<String, Tuple2<String, sampleSignal>,
            Tuple2<String, Map<String, String>>, Tuple2<String, sampleSignal>> {

   MapStateDescriptor<String, Map<String, String>> patternRuleDesc;

    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        patternRuleDesc = new MapStateDescriptor<>("RuleDescriptor",
                BasicTypeInfo.STRING_TYPE_INFO,new MapTypeInfo<>(String.class,String.class));
    }
    
        public static final MapStateDescriptor <String,Map<String,String>> ruleDescriptor =
                new MapStateDescriptor <>("RuleDiscriptor",
                        ,BasicTypeInfo.STRING_TYPE_INFO
                        ,new MapTypeInfo<>(String.class,String.class));
    
        @Override
        public void processElement(Tuple2<String, sampleSignal> value, ReadOnlyContext ctx, Collector<Tuple2<String,
                sampleSignal>> out) throws Exception {
  
            Map<String,String> patternConditions  = ctx.getBroadcastState(this.patternRuleDesc).get(Key);
    
            System.out.println("Before Rule Iterator");
            
            /*I tried below way to print the values in broadcaststream just to print the values
              in broadcast state it don't print anything*/
              
            for(Map.Entry<String, String> rules:
                    patternConditions.entrySet()){
                System.out.println("Key: " +rules.getKey());
                System.out.println("Value: "+rules.getValue());
            }
    
            out.collect(new Tuple2<>(value.f0,value.f1));
    
        }
    
        @Override
        public void processBroadcastElement(Tuple2<String, Map<String, String>> value, Context ctx,
                                            Collector<Tuple2<String, sampleSignal>> out) throws Exception {
    
            System.out.println("BroadCastState Key: " +value.f0);
            System.out.println("BroadCastState Value: " +value.f1);
            ctx.getBroadcastState(ruleDescriptor).put(value.f0,value.f1);
    
        }
    }

Ниже приведены выходные данные терминала IDE с исключением ошибок.

 2020-07-07 12:15:19,349 INFO  [Task] - Ensuring all FileSystem streams are closed for task Flat Map (1/8) (1a117aa5347465fcc2cd5e58c286ccca) [FINISHED]
2020-07-07 12:15:19,348 INFO  [TestProcess ] - BroadCastState SourceName: A
2020-07-07 12:15:19,350 INFO  [TestProcess ] - BroadCastState PatternCondition: PatternRule
2020-07-07 12:15:19,341 INFO  [TaskExecutor] - Un-registering task and sending final execution state FINISHED to JobManager for task Source: Collection Source (1/1) 23f6a4d19c5744a62c46ac48d4dfbb24.
2020-07-07 12:15:19,351 INFO  [Task] - Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (8/8) (773fad58ff1418ae919a0900e4da6ef5) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,351 INFO  [Task] - Freeing task resources for Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (8/8) (773fad58ff1418ae919a0900e4da6ef5).
2020-07-07 12:15:19,351 INFO  [Task] - Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (2/8) (67a21177a7598f3f1ccc5001fe6951c3) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,351 INFO  [Task] - Freeing task resources for Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (2/8) (67a21177a7598f3f1ccc5001fe6951c3).
2020-07-07 12:15:19,351 INFO  [TestProcess ] - BroadCastState SourceName: A
2020-07-07 12:15:19,351 INFO  [Task] - Ensuring all FileSystem streams are closed for task Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (2/8) (67a21177a7598f3f1ccc5001fe6951c3) [FINISHED]
2020-07-07 12:15:19,351 INFO  [TestProcess ] - BroadCastState PatternCondition: PatternRule
2020-07-07 12:15:19,351 INFO  [Task] - Ensuring all FileSystem streams are closed for task Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (8/8) (773fad58ff1418ae919a0900e4da6ef5) [FINISHED]
2020-07-07 12:15:19,352 INFO  [Task] - Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (3/8) (8d860a734d5b6a50194a5caad135a844) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,352 INFO  [Task] - Freeing task resources for Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (3/8) (8d860a734d5b6a50194a5caad135a844).
2020-07-07 12:15:19,352 INFO  [Task] - Ensuring all FileSystem streams are closed for task Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (3/8) (8d860a734d5b6a50194a5caad135a844) [FINISHED]
2020-07-07 12:15:19,352 INFO  [Task] - Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (1/8) (8fa9c71df79d439827a1d290d9ad9abd) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,352 INFO  [Task] - Freeing task resources for Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (1/8) (8fa9c71df79d439827a1d290d9ad9abd).
2020-07-07 12:15:19,352 INFO  [Task] - Ensuring all FileSystem streams are closed for task Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (1/8) (8fa9c71df79d439827a1d290d9ad9abd) [FINISHED]
2020-07-07 12:15:19,357 INFO  [Task] - Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (4/8) (ee3c4e895e82f77ad9a055ee788f4a2b) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,357 INFO  [Task] - Freeing task resources for Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (4/8) (ee3c4e895e82f77ad9a055ee788f4a2b).
2020-07-07 12:15:19,357 INFO  [Task] - Ensuring all FileSystem streams are closed for task Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (4/8) (ee3c4e895e82f77ad9a055ee788f4a2b) [FINISHED]
2020-07-07 12:15:19,358 INFO  [Task] - Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (6/8) (6a75da55421a929d4b3d4ebd655414e9) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,358 INFO  [Task] - Freeing task resources for Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (6/8) (6a75da55421a929d4b3d4ebd655414e9).
2020-07-07 12:15:19,339 INFO  [ExecutionGraph] - Flat Map (3/8) (63a7c3f2b7a8d110232374c700e6378a) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,352 INFO  [Task] - Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (7/8) (351cc238fecce28d1dfdc5ac357ef8e4) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,352 INFO  [TaskExecutor] - Un-registering task and sending final execution state FINISHED to JobManager for task Map (3/8) d94aea75380e0e32c4747eef2f51a88d.
2020-07-07 12:15:19,358 INFO  [Task] - Freeing task resources for Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (7/8) (351cc238fecce28d1dfdc5ac357ef8e4).
2020-07-07 12:15:19,359 INFO  [Task] - Ensuring all FileSystem streams are closed for task Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (7/8) (351cc238fecce28d1dfdc5ac357ef8e4) [FINISHED]
2020-07-07 12:15:19,360 INFO  [TaskExecutor] - Un-registering task and sending final execution state FINISHED to JobManager for task Flat Map (2/8) 32f2429b06954884543a4de062edf6f6.
2020-07-07 12:15:19,358 INFO  [Task] - Ensuring all FileSystem streams are closed for task Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (6/8) (6a75da55421a929d4b3d4ebd655414e9) [FINISHED]
2020-07-07 12:15:19,360 INFO  [TaskExecutor] - Un-registering task and sending final execution state FINISHED to JobManager for task Map (1/8) 7dd6a325fe2265187b0b30bf3b4b8f63.
2020-07-07 12:15:19,358 INFO  [ExecutionGraph] - Flat Map (4/8) (de310509f095e00584ce128336e19adf) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,361 INFO  [TaskExecutor] - Un-registering task and sending final execution state FINISHED to JobManager for task Map (5/8) 4920b55f28ea09128aaa4e0d9d4691d8.
2020-07-07 12:15:19,362 INFO  [TaskExecutor] - Un-registering task and sending final execution state FINISHED to JobManager for task Flat Map (8/8) 0fded4a7816a9d9a219c520891d2b38d.
2020-07-07 12:15:19,362 INFO  [TaskExecutor] - Un-registering task and sending final execution state FINISHED to JobManager for task Flat Map (1/8) 1a117aa5347465fcc2cd5e58c286ccca.
2020-07-07 12:15:19,363 INFO  [ExecutionGraph] - Map (7/8) (ac26eefbff14e55f031a055c1d6fa7f3) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,364 INFO  [TaskExecutor] - Un-registering task and sending final execution state FINISHED to JobManager for task Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (2/8) 67a21177a7598f3f1ccc5001fe6951c3.
2020-07-07 12:15:19,367 INFO  [TaskExecutor] - Un-registering task and sending final execution state FINISHED to JobManager for task Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (8/8) 773fad58ff1418ae919a0900e4da6ef5.
2020-07-07 12:15:19,367 INFO  [ExecutionGraph] - Flat Map (5/8) (3bb74ec008b43ffe86edd6a7f84844a0) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,368 INFO  [TaskExecutor] - Un-registering task and sending final execution state FINISHED to JobManager for task Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (3/8) 8d860a734d5b6a50194a5caad135a844.
2020-07-07 12:15:19,368 INFO  [ExecutionGraph] - Map (2/8) (90787449e2373696163da5670b0e543a) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,369 INFO  [TaskExecutor] - Un-registering task and sending final execution state FINISHED to JobManager for task Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (1/8) 8fa9c71df79d439827a1d290d9ad9abd.
2020-07-07 12:15:19,370 INFO  [TaskExecutor] - Un-registering task and sending final execution state FINISHED to JobManager for task Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (4/8) ee3c4e895e82f77ad9a055ee788f4a2b.
2020-07-07 12:15:19,373 INFO  [TaskExecutor] - Un-registering task and sending final execution state FINISHED to JobManager for task Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (7/8) 351cc238fecce28d1dfdc5ac357ef8e4.
2020-07-07 12:15:19,374 INFO  [TaskExecutor] - Un-registering task and sending final execution state FINISHED to JobManager for task Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (6/8) 6a75da55421a929d4b3d4ebd655414e9.
2020-07-07 12:15:19,369 INFO  [ExecutionGraph] - Flat Map (7/8) (c1ca47240e15ef6c60f1943aed1b45ba) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,379 INFO  [ExecutionGraph] - Source: Collection Source (1/1) (23f6a4d19c5744a62c46ac48d4dfbb24) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,381 INFO  [ExecutionGraph] - Map (3/8) (d94aea75380e0e32c4747eef2f51a88d) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,382 INFO  [ExecutionGraph] - Flat Map (2/8) (32f2429b06954884543a4de062edf6f6) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,384 INFO  [ExecutionGraph] - Map (1/8) (7dd6a325fe2265187b0b30bf3b4b8f63) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,385 INFO  [ExecutionGraph] - Map (5/8) (4920b55f28ea09128aaa4e0d9d4691d8) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,386 INFO  [TypeExtractor] - class org.json.JSONObject does not contain a getter for field map
2020-07-07 12:15:19,386 INFO  [TypeExtractor] - class org.json.JSONObject does not contain a setter for field map
2020-07-07 12:15:19,386 INFO  [ExecutionGraph] - Flat Map (8/8) (0fded4a7816a9d9a219c520891d2b38d) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,386 INFO  [TypeExtractor] - Class class org.json.JSONObject cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
2020-07-07 12:15:19,388 INFO  [ExecutionGraph] - Flat Map (1/8) (1a117aa5347465fcc2cd5e58c286ccca) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,390 INFO  [ExecutionGraph] - Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (2/8) (67a21177a7598f3f1ccc5001fe6951c3) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,390 INFO  [Task] - Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (5/8) (33a27b2d97c96658fd43db24177236bc) switched from RUNNING to FAILED.
java.lang.NullPointerException: null
    at com.eventdetection.eventfilter.pattern.operator.json.filter.TestProcess .processElement(TestProcess .java:103)
    at com.eventdetection.eventfilter.pattern.operator.json.filter.TestProcess .processElement(TestProcess .java:40)
    at org.apache.flink.streaming.api.operators.co.CoBroadcastWithKeyedOperator.processElement1(CoBroadcastWithKeyedOperator.java:113)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processRecord1(StreamTwoInputProcessor.java:135)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.lambda$new$0(StreamTwoInputProcessor.java:100)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessor.java:362)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:182)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
    at java.lang.Thread.run(Thread.java:748)
2020-07-07 12:15:19,394 INFO  [Task] - Freeing task resources for Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (5/8) (33a27b2d97c96658fd43db24177236bc).
2020-07-07 12:15:19,394 INFO  [ExecutionGraph] - Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (8/8) (773fad58ff1418ae919a0900e4da6ef5) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,394 INFO  [Task] - Ensuring all FileSystem streams are closed for task Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (5/8) (33a27b2d97c96658fd43db24177236bc) [FAILED]
2020-07-07 12:15:19,395 INFO  [ExecutionGraph] - Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (3/8) (8d860a734d5b6a50194a5caad135a844) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,396 INFO  [ExecutionGraph] - Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (1/8) (8fa9c71df79d439827a1d290d9ad9abd) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,397 INFO  [ExecutionGraph] - Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (4/8) (ee3c4e895e82f77ad9a055ee788f4a2b) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,398 INFO  [ExecutionGraph] - Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (7/8) (351cc238fecce28d1dfdc5ac357ef8e4) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,399 INFO  [ExecutionGraph] - Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (6/8) (6a75da55421a929d4b3d4ebd655414e9) switched from RUNNING to FINISHED.
2020-07-07 12:15:19,402 INFO  [TaskExecutor] - Un-registering task and sending final execution state FAILED to JobManager for task Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (5/8) 33a27b2d97c96658fd43db24177236bc.
2020-07-07 12:15:19,404 INFO  [ExecutionGraph] - Co-Process-Broadcast-Keyed -> (Sink: Print to Std. Out, Sink: Print to Std. Out) (5/8) (33a27b2d97c96658fd43db24177236bc) switched from RUNNING to FAILED.
java.lang.NullPointerException: null
    at com.eventdetection.eventfilter.pattern.operator.json.filter.PatternFilter.processElement(PatternFilter.java:103)
    at com.eventdetection.eventfilter.pattern.operator.json.filter.PatternFilter.processElement(PatternFilter.java:40)
    at org.apache.flink.streaming.api.operators.co.CoBroadcastWithKeyedOperator.processElement1(CoBroadcastWithKeyedOperator.java:113)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processRecord1(StreamTwoInputProcessor.java:135)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.lambda$new$0(StreamTwoInputProcessor.java:100)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessor.java:362)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:182)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
    at java.lang.Thread.run(Thread.java:748)
2020-07-07 12:15:19,406 INFO  [RestartPipelinedRegionStrategy] - Calculating tasks to restart to recover the failed task d8804397962a5c1c0b4daacb1802fb97_4.
2020-07-07 12:15:19,408 INFO  [RestartPipelinedRegionStrategy] - 26 tasks should be restarted to recover the failed task d8804397962a5c1c0b4daacb1802fb97_4. 
2020-07-07 12:15:19,410 INFO  [ExecutionGraph] - Job Pattern-Matching (1a828d53bc6a886fe0fc7c454e6e66b7) switched from state RUNNING to FAILING.
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
    at sun.reflect.GeneratedMethodAccessor17.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:517)
    at akka.actor.Actor.aroundReceive$(Actor.scala:515)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Пожалуйста, помогите в решении проблемы, я использую версию FLINK 1.10.0 и INTELLiJ IDE и Java версии 1.8.


person YRK    schedule 15.06.2020    source источник
comment
На StreamTwoInputProcessor.processInput также есть нулевая точка. Я предполагаю, что вы пытаетесь обработать какое-то событие, которое является нулевым. Но другой нулевой указатель на incomingRule.size() это действительно странно.   -  person Felipe    schedule 15.06.2020
comment
@Felipe, спасибо за ответ, я пропустил еще одну ошибку. Я получаю следующую ошибку, но я использую тот же дескриптор mapstate и использую его для трансляции потока. Причина: java.lang.IllegalArgumentException: запрошенное состояние не существует. Проверьте наличие опечаток в дескрипторе состояния или укажите дескриптор состояния в вызове datastream.broadcast (...), если вы забыли его зарегистрировать.   -  person YRK    schedule 15.06.2020
comment
Думаю, проблема может заключаться в том, как вы создаете экземпляр MapStateDescriptor. Вы должны создать его экземпляр внутри открытого метода, который вам нужно перегрузить из RichFunction. Вот один пример: ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/   -  person Felipe    schedule 15.06.2020
comment
Да, я действительно не думаю, что ошибка возникает там, где Вы думаете. Что такое incomingPattern и Key?   -  person Dominik Wosiński    schedule 15.06.2020
comment
@Felipe ya даже пробовал этот подход, тоже объявляя открытый метод и инициируя Mapstate, но все же я вижу такое же исключение нулевого указателя   -  person YRK    schedule 16.06.2020
comment
@ DominikWosiński incomingPattern - это правило для определенного источника, который является не чем иным, как ключом ... я пытаюсь читать только правила конкретного источника на основе ключа вместо повторения через состояние широковещания и применения всех правил в потоке, я создаю карту правил, основанных на имени источника и перебирающих только эти значения   -  person YRK    schedule 16.06.2020
comment
@Felipe любое предложение здесь, что я делаю неправильно   -  person YRK    schedule 18.06.2020
comment
Точно не знаю. Ваш пример очень сбивает с толку, и ошибка не соответствует вашему коду. Таким образом, помочь трудно (по крайней мере, мне). Я предлагаю вам начать с Hello World рабочего примера, подобного этому (flink.apache .org / 2019/06/26 / broadcast-state.html) и пошагово улучшайте, чтобы увидеть, где это не работает.   -  person Felipe    schedule 18.06.2020
comment
@ Дэвид Андерсон, не могли бы вы прокомментировать это   -  person YRK    schedule 07.07.2020
comment
Я согласен с Фелипе здесь, в коде, который вы представляете, есть несколько несоответствий. Например, что такое patternRuleDesc? Он не был объявлен нигде в классе, но назначен в методе open.   -  person Dominik Wosiński    schedule 09.07.2020
comment
@ DominikWosiński patternRuleDesc - это дескриптор состояния карты, извините, я не упомянул об этом в коде   -  person YRK    schedule 09.07.2020


Ответы (1)


Единственное место, где он действительно может выйти из строя, если предположить, что текущее состояние кода - это эта часть:

 Map<String,String> patternConditions  = ctx.getBroadcastState(this.patternRuleDesc).get(Key);
    
            System.out.println("Before Rule Iterator");
            
            /*I tried below way to print the values in broadcaststream just to print the values
              in broadcast state it don't print anything*/
              
            for(Map.Entry<String, String> rules:
                    patternConditions.entrySet()){
                System.out.println("Key: " +rules.getKey());
                System.out.println("Value: "+rules.getValue());
            }

Проблема на самом деле не в получении состояния широковещательной передачи, поскольку оно никогда не должно вызывать NullPointerException, а скорее IllegalArgumentException. Проблема в том, что вы получаете Key, а затем пытаетесь выполнить какую-то операцию с этим (.entryset()). Но если заданный Key отсутствует, состояние будет работать как нормальное Map, что означает возврат null. И если Вы попробуете проделать операцию на null, Вы получите NullPointerException.

Вы должны добавить код, чтобы убедиться, что данный Key находится в состоянии, иначе он всегда может выйти из строя.

РЕДАКТИРОВАТЬ:

Итак, если вопрос заключается в гонке между BroadcastStream и другим потоком, это не то, что Вы можете легко предотвратить. Это связано с тем, что скорость, с которой потребители просматривают сообщения, может быть разной, и обычно невозможно предотвратить это. Есть два способа решить эту проблему:

  1. Создайте ListState, который сохранит элементы, которые прибыли и не имеют соответствующего Key, тогда вы просто испускаете их всякий раз, когда приходит заданный Key. Примечание: вы должны испускать их из processElement, поскольку BroadcastState не имеет ключа.
  2. Используйте InputSelectable, чтобы написать оператор, который предпочитает один источник другому, таким образом вы можете предпочесть Broadcast Stream потоку элементов.

Как правило, я бы сказал, что вариант 1 проще и быстрее внедрить, если у вас есть непрерывный поток данных, т.е. вы знаете, что заданный Key будет приходить более одного раза. Вариант 2 дает больше гибкости, но также требует дополнительных знаний и, как правило, труднее реализовать правильно.

person Dominik Wosiński    schedule 09.07.2020
comment
Я согласен с тем, что ключ присутствует, и из-за этого он бросает NPE, что я наблюдал, это широковещательный поток и поток данных, которые соревнуются друг с другом, и это исключение не согласуется, иногда оно не бросает NPE и работает нормально, но иногда оно выдает NPE что я наблюдал при запуске программы в режиме отладки для успешного запуска первый метод широковещательной передачи в KeyedBroadCastProcessFn выполняется первым в случае отказа первый метод processElement выполняется из-за того, что состояние карты пусто, я ищу подсказку, как я могу убедиться, что первое состояние широковещательной передачи казнен первым - person YRK; 09.07.2020
comment
Ах, я думаю, Вам, возможно, стоит пояснить вопрос, потому что, по крайней мере, для меня не ясно, о чем Вы спрашиваете :) Отредактированный ответ - person Dominik Wosiński; 09.07.2020
comment
вы сказали Отредактированный ответ, извините, я не понимаю, что обновлено в ответе, не могли бы вы указать на это, и я согласен с вами, вопрос не ясен изначально, когда я столкнулся с этой проблемой, хотя я делаю что-то не так, но после большой отладки Я понял, что метод processElement выполняется перед методом широковещательной передачи из-за того, что состояние карты пусто и бросает NPE, я бы обновил вопрос, извините за это - person YRK; 09.07.2020
comment
Да, извините, у меня возникла проблема, и ответ был добавлен неправильно :) - person Dominik Wosiński; 09.07.2020
comment
спасибо за предложение, но, к сожалению, я не могу использовать состояние списка в моем случае, ключ - это не что иное, как источник, из которого поступают данные, и я получаю его как часть сигнала, и я использую функцию плоской карты для извлечения ключа, поэтому Я не сталкиваюсь с NPE с пустым ключом, и InputSelectable не подходит в моем случае, и я выяснил, что является проблемой: входящий сигнал, который я получаю, - это сигнал avro, и я читаю шаблоны из файла JSON, и из-за этого сигнал avro сначала обрабатывается по сравнению с потоком с jsonobject, поэтому я изменил avro на jsonobject - person YRK; 13.07.2020