Богатая функция при присоединении к Flink, Scala API

Я борюсь с Flink и Scala.

У меня есть преобразование соединения по DataSet, которое в значительной степени работает, но я хочу превратить его в RichFuntion, чтобы я мог получить доступ к широковещательному набору:

val newBoard: DataSet[Cell] = board.rightOuterJoin(neighbours)
                             .where("coords").equalTo("cellCoords"){

    (cell, neighbours) => {
            // Do some rich function things, like 
            // override the open method so I can get
            // the broadcasted set
    }

  }

}.withBroadcastSet(board, "aliveCells")

Я просмотрел всю документацию, но не нашел ни одного примера использования RichJoinFuntion в Scala. Я нахожу только примеры для расширенных функций, используемых в map или filter, но синтаксис отличается для преобразования join (функция между скобками и между скобками).


person houcros    schedule 30.04.2016    source источник


Ответы (1)


Вы можете использовать RichJoinFunction с API Scala DataSet следующим образом

val newBoard: DataSet[Cell] = board.rightOuterJoin(neighbours)
                             .where("coords").equalTo("cellCoords")
                               .apply(new YourJoinFunction())
                               .withBroadcastSet(board, "aliveCells")

class YourJoinFunction extends RichJoinFunction[IN1, IN2, Cell] {
  override def join(first: IN1, second: IN2): Cell = {
    // Do some rich function things, like 
    // override the open method so I can get
    // the broadcasted set
  }
}
person Fabian Hueske    schedule 30.04.2016