Apache Flink - используйте значения из потока данных для динамического создания источника потоковых данных

Я пытаюсь создать образец приложения с помощью Apache Flink, которое выполняет следующие действия:

  1. Считывает поток биржевых символов (например, «CSCO», «FB») из очереди Kafka.
  2. Для каждого символа выполняется поиск текущих цен в реальном времени и потоковая передача значений для последующей обработки.

* Обновить исходное сообщение *

Я переместил функцию карты в отдельный класс и не получаю сообщение об ошибке времени выполнения «Реализация MapFunction больше не сериализуема. Объект, вероятно, содержит несериализуемые поля или ссылается на них».

Проблема, с которой я столкнулся сейчас, заключается в том, что тема Kafka "цены на акции", на которую я пытаюсь записать цены, не получает их. Я пытаюсь устранить неполадки и опубликую все обновления.

public class RetrieveStockPrices { 
    @SuppressWarnings("serial") 
    public static void main(String[] args) throws Exception { 
        final StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); 

        Properties properties = new Properties(); 
        properties.setProperty("bootstrap.servers", "localhost:9092"); 
        properties.setProperty("zookeeper.connect", "localhost:2181"); 
        properties.setProperty("group.id", "stocks"); 

        DataStream<String> streamOfStockSymbols = streamExecEnv.addSource(new FlinkKafkaConsumer08<String>("stocksymbol", new SimpleStringSchema(), properties)); 

        DataStream<String> stockPrice = 
            streamOfStockSymbols 
            //get unique keys 
            .keyBy(new KeySelector<String, String>() { 
                @Override 
                public String getKey(String trend) throws Exception {
                    return trend; 
                }
                }) 
            //collect events over a window 
            .window(TumblingEventTimeWindows.of(Time.seconds(60))) 
            //return the last event from the window...all elements are the same "Symbol" 
            .apply(new WindowFunction<String, String, String, TimeWindow>() {
                @Override 
                public void apply(String key, TimeWindow window, Iterable<String> input, Collector<String> out) throws Exception { 
                    out.collect(input.iterator().next().toString()); 
                }
            })
            .map(new StockSymbolToPriceMapFunction());

        streamExecEnv.execute("Retrieve Stock Prices"); 
    }
}

public class StockSymbolToPriceMapFunction extends RichMapFunction<String, String> {
    @Override
    public String map(String stockSymbol) throws Exception {
        final StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
        System.out.println("StockSymbolToPriceMapFunction: stockSymbol: " + stockSymbol);

        DataStream<String> stockPrices = streamExecEnv.addSource(new LookupStockPrice(stockSymbol));
        stockPrices.keyBy(new CustomKeySelector()).addSink(new FlinkKafkaProducer08<String>("localhost:9092", "stockprices", new SimpleStringSchema()));

        return "100000";
    }

    private static class CustomKeySelector implements KeySelector<String, String> {
        @Override
        public String getKey(String arg0) throws Exception {
            return arg0.trim();
        }
    }
}


public class LookupStockPrice extends RichSourceFunction<String> { 
    public String stockSymbol = null; 
    public boolean isRunning = true; 

    public LookupStockPrice(String inSymbol) { 
            stockSymbol = inSymbol; 
    } 

    @Override 
    public void open(Configuration parameters) throws Exception { 
            isRunning = true; 
    } 


    @Override 
    public void cancel() { 
            isRunning = false; 
    } 

    @Override 
    public void run(SourceFunction.SourceContext<String> ctx) 
                    throws Exception { 
            String stockPrice = "0";
            while (isRunning) { 
                //TODO: query Google Finance API 
                stockPrice = Integer.toString((new Random()).nextInt(100)+1);
                ctx.collect(stockPrice);
                Thread.sleep(10000);
            } 
    } 
}

person Hari N    schedule 18.12.2016    source источник


Ответы (1)


StreamExecutionEnvironment не имеют отступа для использования внутри операторов потокового приложения. Не предназначенные средства, это не проверяется и не поощряется. Он может работать и что-то делать, но, скорее всего, он не будет вести себя должным образом и, вероятно, убьет ваше приложение.

StockSymbolToPriceMapFunction в вашей программе определяет для каждой входящей записи совершенно новое и независимое новое потоковое приложение. Однако, поскольку вы не вызываете streamExecEnv.execute(), программы не запускаются, и метод map возвращается без каких-либо действий.

Если вы вызовете streamExecEnv.execute(), функция запустит новый локальный кластер Flink в рабочей JVM и запустит приложение в этом локальном кластере Flink. Локальный экземпляр Flink займет много места в куче, и после запуска нескольких кластеров рабочий, вероятно, умрет из-за OutOfMemoryError, чего вы не хотите.

person Fabian Hueske    schedule 19.12.2016
comment
Возможно ли вообще динамически создавать потоки в ответ на поступающие данные? - person Lawrence Wagerfield; 01.01.2017
comment
Вы можете реализовать FlatMapFunction, который динамически считывает и передает данные на основе поступающих записей. Например, если у вас есть поток с именами файлов, a FlatMapFunction вы можете открыть эти файлы и передать их данные. Однако типы вывода всех записей должны быть одинаковыми. Кроме того, может быть сложно получить правильную семантику обработки во время событий, но это более общая проблема динамически добавляемых источников. - person Fabian Hueske; 03.01.2017
comment
@FabianHueske Я решаю аналогичный вариант использования. Поэтому, если мне нужно использовать FlatMapFunction, тогда нам придется читать файл, используя обычный File API из scala / Java, а не используя readTextFile Flink. Причина в том, что мы не можем использовать StreamExecutionEnvironment внутри flatMap. Я правильно понимаю? - person Vignesh I; 17.12.2017