Problem description
In the code sample below, I am trying to consume a stream of employee records { Country, Employer, Name, Salary, Age }
and output the highest-paid employee in every country. Unfortunately, chaining multiple keyBy calls doesn't work.
Only keyBy(Employer) takes effect, so I don't get the correct result. What am I missing?
```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Employee> streamEmployee = env.addSource(
        new FlinkKafkaConsumer010<ObjectNode>("flink-demo", new JSONDeserializationSchema(), properties))
    .map(new MapFunction<ObjectNode, Employee>() {
        private static final long serialVersionUID = 6111226274068863916L;

        @Override
        public Employee map(ObjectNode value) throws Exception {
            final Gson gson = new GsonBuilder().create();
            Employee uMsg = gson.fromJson(value.toString(), Employee.class);
            return uMsg;
        }
    });

KeyedStream<Employee, String> employeesKeyedByCountryndEmployer = streamEmployee
    .keyBy(new KeySelector<Employee, String>() {
        private static final long serialVersionUID = -6867736771747690202L;

        @Override
        public String getKey(Employee value) throws Exception {
            return value.getCountry();
        }
    }).keyBy(new KeySelector<Employee, String>() {
        private static final long serialVersionUID = -6867736771747690202L;

        @Override
        public String getKey(Employee value) throws Exception {
            return value.getEmployer();
        }
    });

// This should display the highest-paid employee in a given country,
// for a given employer
DataStream<Employee> uHighlyPaidEmployee = employeesKeyedByCountryndEmployer
    .timeWindow(Time.seconds(5))
    .maxBy("salary");

// Assume toString() is overridden, so print works well.
uHighlyPaidEmployee.print();

env.execute("Employee-employer log processor");
```
Recommended answer
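Calling keyBy on a KeyedStream re-partitions the stream by the new key alone, so the second keyBy(Employer) simply replaces the first keyBy(Country). To key by both fields at once, define a single KeySelector that returns a composite key: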
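```java
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.KeyedStream;

// One KeySelector extracts both fields, so records are partitioned by the (country, employer) pair.
KeyedStream<Employee, Tuple2<String, String>> employeesKeyedByCountryndEmployer =
    streamEmployee.keyBy(
        new KeySelector<Employee, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> getKey(Employee value) throws Exception {
                return Tuple2.of(value.getCountry(), value.getEmployer());
            }
        }
    );
```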
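To see the difference between chained keys and a composite key without running a Flink job, here is a plain-Java sketch with no Flink dependency. For a single window's worth of records, it mimics what keyBy(...) followed by maxBy("salary") computes: one highest-salary record per key. The class CompositeKeyDemo, the trimmed Employee record, the CountryEmployer key record and the maxSalaryBy helper are all hypothetical names for illustration. This is not Flink's API. CountryEmployer plays the role the answer's Tuple2<String, String> plays: a key with value-based equals/hashCode.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class CompositeKeyDemo {

    // Hypothetical, trimmed stand-in for the question's Employee POJO (Age omitted).
    record Employee(String country, String employer, String name, int salary) {}

    // Hypothetical composite key, analogous to Tuple2<String, String> in the answer.
    // Records get value-based equals/hashCode, which is what grouping by key needs.
    record CountryEmployer(String country, String employer) {}

    // Keeps the highest-salary record per key; on ties the first record seen wins.
    // A rough, non-streaming analogue of keyBy(keyOf) + maxBy("salary") over one window.
    static <K> Map<K, Employee> maxSalaryBy(List<Employee> input, Function<Employee, K> keyOf) {
        Map<K, Employee> best = new LinkedHashMap<>();
        for (Employee e : input) {
            best.merge(keyOf.apply(e), e, (old, cur) -> cur.salary() > old.salary() ? cur : old);
        }
        return best;
    }

    // Maps each key to the winning employee's name, for readable printing.
    static <K> Map<K, String> names(Map<K, Employee> m) {
        Map<K, String> out = new LinkedHashMap<>();
        m.forEach((k, v) -> out.put(k, v.name()));
        return out;
    }

    public static void main(String[] args) {
        List<Employee> window = List.of(
                new Employee("US", "Acme", "alice", 100),
                new Employee("US", "Globex", "bob", 120),
                new Employee("DE", "Acme", "carol", 90),
                new Employee("DE", "Acme", "dave", 95));

        // Chained keyBy(country).keyBy(employer) ends up keyed by employer alone:
        // the US and DE Acme employees fall into the same group.
        System.out.println("by employer only: " + names(maxSalaryBy(window, Employee::employer)));

        // Composite key: one result per (country, employer) pair.
        System.out.println("by (country, employer): "
                + names(maxSalaryBy(window, e -> new CountryEmployer(e.country(), e.employer()))));

        // Country only: highest-paid employee in every country.
        System.out.println("by country: " + names(maxSalaryBy(window, Employee::country)));
    }
}
```

If what you actually want is the highest-paid employee per country regardless of employer, as the question text says, keying by getCountry() alone is enough. The composite key is for one result per (country, employer) pair, as the comment in the question's code asks.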