问题描述
我一直试图找到一个连接器,以将数据从Redis读取到Flink. Flink的文档包含要写入Redis的连接器的描述.我需要在Flink作业中从Redis读取数据.在使用Apache Flink进行数据流中,Fabian提到可以从Redis读取数据.可用于此目的的连接器是什么?
I have been trying to find a connector to read data from Redis to Flink. Flink's documentation contains the description for a connector to write to Redis. I need to read data from Redis in my Flink job. In Using Apache Flink for data streaming, Fabian has mentioned that it is possible to read data from Redis. What is the connector that can be used for the purpose?
推荐答案
我们正在生产一个大致像这样的产品
We are running one in production that looks roughly like this
class RedisSource extends RichSourceFunction[SomeDataType] {
var client: RedisClient = _
override def open(parameters: Configuration) = {
client = RedisClient() // init connection etc
}
@volatile var isRunning = true
override def cancel(): Unit = {
isRunning = false
client.close()
}
override def run(ctx: SourceContext[SomeDataType]): Unit = while (isRunning) {
for {
data <- ??? // get some data from the redis client
} yield ctx.collect(SomeDataType(data))
}
}
我认为这实际上取决于您需要从Redis获取什么.上面的代码可用于从列表/队列中获取消息,进行转换/推送,然后将其从队列中删除.Redis还支持Pub/Sub,因此可以进行订阅,获取SourceConext并将消息推送到下游.
I think it really depends on what you need to fetch from redis. The above could be used to fetch a message from a list/queue, transform/push and then delete it form the queue.Redis also supports Pub/Sub, so it would possible to subscribe, grab the SourceConext and push messages downstream.
这篇关于从Redis读取数据到Flink的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!