Прогнозирование потоковой передачи в реальном времени во Flink с использованием scala

Версия Flink: 1.2.0
Версия Scala: 2.11.8

Я хочу использовать DataStream для прогнозирования использования модели во флинке с использованием scala. У меня есть DataStream [String] в flink с использованием scala, который содержит данные в формате json из источника kafka. Я хочу использовать этот поток данных для прогнозирования модели Flink-ml, которая уже обучена. Проблема в том, что все примеры flink-ml используют DataSet api для прогнозирования. Я относительно новичок в flink и scala, поэтому любая помощь в виде решения для кода будет оценена.

Вход :

{"FC196":"Dormant","FC174":"Yolo","FC195":"Lol","FC176":"4","FC198":"BANKING","FC175":"ABDULMAJEED","FC197":"2017/04/04","FC178":"1","FC177":"CBS","FC199":"INDIVIDUAL","FC179":"SYSTEM","FC190":"OK","FC192":"osName","FC191":"Completed","FC194":"125","FC193":"7","FC203":"A10SBPUB000000000004439900053570","FC205":"1","FC185":"20","FC184":"Transfer","FC187":"2","FC186":"2121","FC189":"abcdef","FC200":"","FC188":"BR01","FC202":"INDIVIDUAL","FC201":"","FC181":"7:00PM","FC180":"2007/04/01","FC183":"11000000","FC182":"INR"}

Код:

package org.apache.flink.quickstart

//imports

import java.util.Properties

import org.apache.flink.api.scala._
import org.apache.flink.ml.recommendation.ALS
import org.apache.flink.ml.regression.MultipleLinearRegression
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

import scala.util.parsing.json.JSON

//kafka consumer imports
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

//kafka json table imports
import org.apache.flink.table.examples.scala.StreamTableExample
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.streaming.connectors.kafka.Kafka09JsonTableSource
import org.apache.flink.api.java.DataSet

//JSon4s imports
import org.json4s.native.JsonMethods



// Case class
case class CC(FC196:String,FC174:String,FC195:String,FC176:String,FC198:String,FC175:String,FC197:String,FC178:String,FC177:String,FC199:String,FC179:String,FC190:String,FC192:String,FC191:String,FC194:String,FC193:String,FC203:String,FC205:String,FC185:String,FC184:String,FC187:String,FC186:String,FC189:String,FC200:String,FC188:String,FC202:String,FC201:String,FC181:String,FC180:String,FC183:String,FC182:String)


object WordCount {

  implicit val formats = org.json4s.DefaultFormats

  def main(args: Array[String]) {

    // set up the execution environment
    implicit lazy val formats = org.json4s.DefaultFormats

    // kafka properties
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "***.**.*.***:9093")
    properties.setProperty("zookeeper.connect", "***.**.*.***:2181")
    properties.setProperty("group.id","grouop")
    properties.setProperty("auto.offset.reset", "earliest")
    val env = StreamExecutionEnvironment.getExecutionEnvironment
//    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val st = env
      .addSource(new FlinkKafkaConsumer09("new", new SimpleStringSchema() , properties))
      .flatMap(raw => JsonMethods.parse(raw).toOption)


    val mapped = st.map(_.extract[CC])

    mapped.print()

    env.execute()

    }
}

person Rolf Lobo    schedule 03.05.2017    source источник
comment
stackoverflow.com/questions/59032147/ Ребята, есть идеи по этому поводу. Пожалуйста, предоставьте решение, если возможно   -  person Akcs004    schedule 26.11.2019


Ответы (1)


Решить эту проблему можно путем написания MapFunction, который считывает модель при запуске задания. MapFunction затем сохранит модель как часть своего внутреннего состояния. Таким образом он будет автоматически восстановлен в случае сбоя:

public static void main(String[] args) throws Exception {
        // obtain execution environment, run this example in "ingestion time"
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

        DataStream<Value> input = ...; // read from Kafka for example

        DataStream<Prediction> prediction = input.map(new Predictor());

        prediction.print();

        env.execute();
    }

    public static class Predictor implements MapFunction<Value, Prediction>, CheckpointedFunction {

        private transient ListState<Model> modelState;

        private transient Model model;

        @Override
        public Prediction map(Value value) throws Exception {
            return model.predict(value);
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            // we don't have to do anything here because we assume the model to be constant
        }

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            ListStateDescriptor<Model> listStateDescriptor = new ListStateDescriptor<>("model", Model.class);

            modelState = context.getOperatorStateStore().getUnionListState(listStateDescriptor);

            if (context.isRestored()) {
                // restore the model from state
                model = modelState.get().iterator().next();
            } else {
                modelState.clear();

                // read the model from somewhere, e.g. read from a file
                model = ...;

                // update the modelState so that it is checkpointed from now
                modelState.add(model);
            }
        }
    }

    public static class Model {}

    public static class Value{}

    public static class Prediction{}
}
person Till Rohrmann    schedule 11.05.2017