Apache Flink: ProcessWindowFunction не применяется

Я хочу использовать ProcessWindowFunction в моем проекте Apache Flink. Но я получаю некоторую ошибку при использовании функции процесса, см. Фрагмент кода ниже.

Ошибка:

Процесс метода (ProcessWindowFunction, R, Tuple, TimeWindow>) в типе WindowedStream, Tuple, TimeWindow> не применим для аргументов (JDBCExample.MyProcessWindows)

Моя программа:

DataStream<Tuple2<String, JSONObject>> inputStream;

inputStream = env.addSource(new JsonArraySource());

inputStream.keyBy(0)
  .window(TumblingEventTimeWindows.of(Time.minutes(10)))
  .process(new MyProcessWindows());

My ProcessWindowFunction:

private class MyProcessWindows 
  extends ProcessWindowFunction<Tuple2<String, JSONObject>, Tuple2<String, String>, String, Window>
{

  public void process(
      String key, 
      Context context, 
      Iterable<Tuple2<String, JSONObject>> input, 
      Collector<Tuple2<String, String>> out) throws Exception 
  {
    ...
  }

}

person Kspace    schedule 20.03.2018    source источник
comment
Вы можете дважды проверить сообщение об ошибке? Кажется, что в сигнатуре метода process() чего-то не хватает.   -  person Fabian Hueske    schedule 20.03.2018


Ответы (2)


Проблема, вероятно, в общих типах ProcessWindowFunction.

Вы ссылаетесь на ключ по позиции (keyBy(0)). Следовательно, компилятор не может определить его тип (String), и вам необходимо изменить ProcessWindowFunction на:

private class MyProcessWindows 
    extends ProcessWindowFunction<Tuple2<String, JSONObject>, Tuple2<String, String>, Tuple, Window>

Заменив String на Tuple, вы получите общий заполнитель для ключей, который можно преобразовать в Tuple1<String>, когда вам нужно получить доступ к ключу в методе processElement():

public void process(
    Tuple key, 
    Context context, 
    Iterable<Tuple2<String, JSONObject>> input, 
    Collector<Tuple2<String, String>> out) throws Exception {

  String sKey = (String)((Tuple1)key).f0;
  ...
}

Вы можете избежать приведения и использовать правильный тип, если определите функцию KeySelector<IN, KEY> для извлечения ключа, потому что тип возврата KEY из KeySelector известен компилятору.

person Fabian Hueske    schedule 20.03.2018
comment
Спасибо за помощь @Fabian, я заменил String на Tuple, но все равно ошибка сохраняется. Нужно ли мне также определять функцию KeySelector. Это обязательно ?? Также простая оконная операция, такая как apply, также вызывает ту же ошибку, тогда как сокращение работает правильно. - person Kspace; 20.03.2018

То, что сказал Фабиан :) Использование Tuple должно работать, но требует некоторых уродливых приведений типов в вашем ProcessWindowFunction. Использовать KeySelector просто, и в результате код становится чище. Например.

.keyBy(new KeySelector<Tuple2<String,JsonObject>, String>() {

    @Override
    public String getKey(Tuple2<String, JsonObject> in) throws Exception {
        return in.f0;
    }
})

Вышеупомянутое позволяет вам определить ProcessWindowFunction, например:

public class MyProcessWindows extends ProcessWindowFunction<Tuple2<String, JsonObject>, Tuple2<String, String>, String, TimeWindow> {
person kkrugler    schedule 22.03.2018