我想在我的 Apache Flink 项目中使用 ProcessWindowFunction。但是我在使用过程函数时遇到一些错误,请参阅下面的代码片段

错误是:



我的程序:

DataStream<Tuple2<String, JSONObject>> inputStream;

inputStream = env.addSource(new JsonArraySource());

inputStream.keyBy(0)
  .window(TumblingEventTimeWindows.of(Time.minutes(10)))
  .process(new MyProcessWindows());

我的 ProcessWindowFunction :
private class MyProcessWindows
  extends ProcessWindowFunction<Tuple2<String, JSONObject>, Tuple2<String, String>, String, Window>
{

  public void process(
      String key,
      Context context,
      Iterable<Tuple2<String, JSONObject>> input,
      Collector<Tuple2<String, String>> out) throws Exception
  {
    ...
  }

}

最佳答案

问题可能是 ProcessWindowFunction 的泛型类型。

您正在按位置 ( keyBy(0) ) 引用键。因此,编译器无法推断其类型 ( String ),您需要将 ProcessWindowFunction 更改为:

private class MyProcessWindows
    extends ProcessWindowFunction<Tuple2<String, JSONObject>, Tuple2<String, String>, Tuple, Window>

通过将 String 替换为 Tuple,您现在拥有一个通用的键占位符,当您需要访问 Tuple1<String> 方法中的键时,您可以将其转换为 processElement():
public void process(
    Tuple key,
    Context context,
    Iterable<Tuple2<String, JSONObject>> input,
    Collector<Tuple2<String, String>> out) throws Exception {

  String sKey = (String)((Tuple1)key).f0;
  ...
}

如果定义 KeySelector<IN, KEY> 函数来提取 key ,则可以避免强制转换并使用正确的类型,因为 KEY 的返回类型 KeySelector 是编译器已知的。

关于apache-flink - Apache Flink : ProcessWindowFunction is not applicable,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/49377058/

10-12 04:29