Соединение Flink Scala между двумя потоками, похоже, не работает

Я хочу присоединиться к двум потокам (json), исходящим от производителя Kafka. Код работает, если я фильтрую данные. Но кажется, что это не работает, когда я присоединяюсь к ним. Я хочу вывести на консоль объединенный поток, но ничего не появляется. это мой код

import java.util.Properties 
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.json4s._
import org.json4s.native.JsonMethods
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object App {

def main(args : Array[String]) {

case class Data(location: String, timestamp: Long, measurement: Int, unit: String, accuracy: Double)
case class Sensor(sensor_name: String, start_date: String, end_date: String, data_schema: Array[String], data: Data, stt: Stt)


case class Datas(location: String, timestamp: Long, measurement: Int, unit: String, accuracy: Double)
case class Sensor2(sensor_name: String, start_date: String, end_date: String, data_schema: Array[String], data: Datas, stt: Stt)


val properties = new Properties();
    properties.setProperty("bootstrap.servers", "0.0.0.0:9092");
    properties.setProperty("group.id", "test");

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val consumer1 = new FlinkKafkaConsumer010[String]("topics1", new SimpleStringSchema(), properties)
   val stream1 = env
   .addSource(consumer1)

   val consumer2 = new FlinkKafkaConsumer010[String]("topics2", new SimpleStringSchema(), properties)
   val stream2 = env
   .addSource(consumer2)

   val s1 = stream1.map { x => {
     implicit val formats = DefaultFormats
     JsonMethods.parse(x).extract[Sensor]
     }
   }
   val s2 = stream2.map { x => {
     implicit val formats = DefaultFormats
     JsonMethods.parse(x).extract[Sensor2]
     }
   }

  val s1t = s1.assignAscendingTimestamps { x => x.data.timestamp }
  val s2t = s2.assignAscendingTimestamps { x => x.data.timestamp }

  val j1pre = s1t.join(s2t)
              .where(_.data.unit)
              .equalTo(_.data.unit)
              .window(TumblingEventTimeWindows.of(Time.seconds(2L)))
              .apply((g, s) => (s.sensor_name, g.sensor_name, s.data.measurement))
   env.execute()

}

}

Я думаю, что проблема в назначении метки времени. Я думаю, что assignAscendingTimestamp в двух источниках - неправильная функция.

JSON, созданный производителем kafka, имеет поле data.timestamp, которое должно быть назначено в качестве метки времени. Но я не знаю, как этим управлять.

Я также подумал, что мне нужно дать пакет временного окна (как в искре) для входящих кортежей. Но я не уверен, что это правильное решение.


person lu_ferra    schedule 03.08.2017    source источник


Ответы (1)


Я думаю, что ваш код нуждается в незначительных корректировках. Прежде всего, поскольку вы хотите работать в EventTime, вы должны установить соответствующий TimeCharacteristic

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

Также в вашем коде, который вы вставили, отсутствует приемник для потока. Если вы хотите печатать на консоль, вы должны:

j1pre.print

Остальная часть вашего кода выглядит нормально.

person Dawid Wysakowicz    schedule 03.08.2017
comment
Спасибо. Теперь это работает!!!! Я добавил TimeCharacteristic и теперь все в порядке! - person lu_ferra; 03.08.2017