我的代码如下:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(500);
DataStream<String> stream = env.addSource(getConsumer(TOPIC_1));
Jedis jedis = new Jedis("master1");
stream.map(new RichMapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
String result = jedis.hget("rtc", value);
return result;
}
});
我想从
map()
中的redis获取一些数据,但它无法运行,因为jedis.class不可序列化。如何在
map()
中使用不可序列化类,如zkclient、jedis? 最佳答案
像RichMapFunction
这样的所有富函数都有一个open(Configuration)
和close
调用,您可以覆盖它。这些生命周期方法在函数部署到执行它的taskmanager后调用。
class MyMapFunction extends RichMapFunction<String, String> {
private transient Jedis jedis;
@Override
public void open(Configuration parameters) {
// open connection to Redis, for example
jedis = new Jedis("master1");
}
@Override
public void close() {
// close connection to Redis
jedis.close();
}
}
关于redis - 如何在flink map()中使用Jedis,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/48151728/