问题描述
我的目标是使用kafka读取json格式的字符串,对字符串进行过滤,然后将消息下沉(仍为json字符串格式).
My goal is to use kafka to read in a string in json format, do a filter to the string and then sink the message out (still in json string format).
出于测试目的,我的输入字符串消息如下:
For testing purpose, my input string message looks like:
{"a":1,"b":2}
我的实现代码是:
def main(args: Array[String]): Unit = {
// parse input arguments
val params = ParameterTool.fromArgs(args)
if (params.getNumberOfParameters < 4) {
println("Missing parameters!\n"
+ "Usage: Kafka --input-topic <topic> --output-topic <topic> "
+ "--bootstrap.servers <kafka brokers> "
+ "--zookeeper.connect <zk quorum> --group.id <some id> [--prefix <prefix>]")
return
}
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getConfig.disableSysoutLogging
env.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000))
// create a checkpoint every 5 seconds
env.enableCheckpointing(5000)
// make parameters available in the web interface
env.getConfig.setGlobalJobParameters(params)
// create a Kafka streaming source consumer for Kafka 0.10.x
val kafkaConsumer = new FlinkKafkaConsumer010(
params.getRequired("input-topic"),
new JSONKeyValueDeserializationSchema(false),
params.getProperties)
val messageStream = env.addSource(kafkaConsumer)
val filteredStream: DataStream[ObjectNode] = messageStream.filter(node => node.get("a").asText.equals("1")
&& node.get("b").asText.equals("2"))
messageStream.print()
// Refer to: https://stackoverflow.com/documentation/apache-flink/9004/how-to-define-a-custom-deserialization-schema#t=201708080802319255857
filteredStream.addSink(new FlinkKafkaProducer010[ObjectNode](
params.getRequired("output-topic"),
new SerializationSchema[ObjectNode] {
override def serialize(element: ObjectNode): Array[Byte] = element.toString.getBytes()
}, params.getProperties
))
env.execute("Kafka 0.10 Example")
}
可以看出,我想将消息流打印到控制台,并将过滤后的消息下沉到kafka.但是,我看不到它们.
As can be seen, I want to print message stream to the console and sink the filtered message to kafka. However, I can see neither of them.
有趣的是,如果我将KafkaConsumer的模式从JSONKeyValueDeserializationSchema修改为SimpleStringSchema,则可以看到messageStream打印到控制台.代码如下所示:
The interesting thing is, if I modify the schema of KafkaConsumer from JSONKeyValueDeserializationSchema to SimpleStringSchema, I can see messageStream print to the console. Code as shown below:
val kafkaConsumer = new FlinkKafkaConsumer010(
params.getRequired("input-topic"),
new SimpleStringSchema,
params.getProperties)
val messageStream = env.addSource(kafkaConsumer)
messageStream.print()
这让我想起如果我使用JSONKeyValueDeserializationSchema,我的输入消息实际上是不被Kafka接受的.但这似乎很奇怪,而且与在线文档完全不同( https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/kafka.html )
This makes me think if I use JSONKeyValueDeserializationSchema, my input message is actually not accepted by Kafka. But this seems so weird and quite different from the online document(https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/kafka.html)
希望有人可以帮助我!
推荐答案
JSONKeyValueDeserializationSchema()需要每个kafka msg的消息密钥,并且我假设在生成JSON消息时未提供任何密钥并通过kafka主题发送.
The JSONKeyValueDeserializationSchema() expects message key with each kafka msg and I am assuming that no key is supplied when the JSON messages are produced and sent over the kafka topic.
要解决此问题,请尝试使用 JSONDeserializationSchema(),该方法仅接收消息,并根据接收到的消息创建对象节点.
Thus to solve the issue, try using JSONDeserializationSchema() which expects only the message and creates an object node based on the message received.
这篇关于下沉kafka流时看不到消息,并且在flink 1.2中看不到打印消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!