我想在我的 Apache Flink 项目中使用 ProcessWindowFunction
。但是我在使用过程函数时遇到一些错误,请参阅下面的代码片段
错误是:
我的程序:
DataStream<Tuple2<String, JSONObject>> inputStream;
inputStream = env.addSource(new JsonArraySource());
inputStream.keyBy(0)
.window(TumblingEventTimeWindows.of(Time.minutes(10)))
.process(new MyProcessWindows());
我的
ProcessWindowFunction
:private class MyProcessWindows
extends ProcessWindowFunction<Tuple2<String, JSONObject>, Tuple2<String, String>, String, Window>
{
public void process(
String key,
Context context,
Iterable<Tuple2<String, JSONObject>> input,
Collector<Tuple2<String, String>> out) throws Exception
{
...
}
}
最佳答案
问题可能是 ProcessWindowFunction
的泛型类型。
您正在按位置 ( keyBy(0)
) 引用键。因此,编译器无法推断其类型 ( String
),您需要将 ProcessWindowFunction
更改为:
private class MyProcessWindows
extends ProcessWindowFunction<Tuple2<String, JSONObject>, Tuple2<String, String>, Tuple, Window>
通过将
String
替换为 Tuple
,您现在拥有一个通用的键占位符,当您需要访问 Tuple1<String>
方法中的键时,您可以将其转换为 processElement()
:public void process(
Tuple key,
Context context,
Iterable<Tuple2<String, JSONObject>> input,
Collector<Tuple2<String, String>> out) throws Exception {
String sKey = (String)((Tuple1)key).f0;
...
}
如果定义
KeySelector<IN, KEY>
函数来提取 key ,则可以避免强制转换并使用正确的类型,因为 KEY
的返回类型 KeySelector
是编译器已知的。关于apache-flink - Apache Flink : ProcessWindowFunction is not applicable,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/49377058/