Вызов внешнего скрипта из Flink

Некоторым из вас это может показаться очень сложной проблемой. Я хочу использовать Apache Flink для применения некоторых алгоритмов к данным из SocketStream. Однако эти алгоритмы являются внешними исполняемыми файлами, которые я запускаю с помощью пакета Scala sys.process. Вот что я хочу, чтобы Flink сделал:

  1. Получить отдельные строки из SocketStream:

    val text = env.socketTextStream(hostName, port) val lines = text.flatMap { _.toLowerCase.split("\\n") filter { _.nonEmpty } }

  2. Вызовите мой исполняемый алгоритм с этими строками в качестве параметров командной строки. Примерно так:

    var op = "./Somefile.py "+lines!

  3. Распечатайте вывод, который я получаю от исполняемого файла.

    op.print()

Очевидно, что это неправильный способ сделать то, что я пытаюсь сделать, поскольку op в отличие от lines не является приемником данных, и поэтому ничего не печатается. Есть ли способ добиться этого?


person Piyush Shrivastava    schedule 16.03.2016    source источник


Ответы (1)


Если вы поместите все аргументы в одно строковое значение, вы можете вызвать внешний исполняемый файл из файла MapFunction.

Это будет выглядеть так:

val args: DataStream[String] = env.socketTextStream(hostName, port) 
// assume each text line has all elements
val out: DataStream[String] = args.map(new ExternalCaller())
// print result
out.print()

с

class ExternalCaller extends MapFunction[String, String] {

  override def map(args: String): String = {
    // call external executable with args here and return output
  }
}
person Fabian Hueske    schedule 16.03.2016
comment
Я подумал, что это можно сделать с помощью map, но это решило мою проблему. Спасибо. - person Piyush Shrivastava; 16.03.2016