Question
I have the following code below:
import java.util.Properties
import com.google.gson._
import com.typesafe.config.ConfigFactory
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.cep.scala.CEP
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

object WindowedWordCount {
  val configFactory = ConfigFactory.load()

  def main(args: Array[String]) = {
    val brokers = configFactory.getString("kafka.broker")
    val topicChannel1 = configFactory.getString("kafka.topic1")

    val props = new Properties()
    ...

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val dataStream = env.addSource(new FlinkKafkaConsumer010[String](topicChannel1, new SimpleStringSchema(), props))

    val partitionedInput = dataStream.keyBy(jsonString => {
      val jsonParser = new JsonParser()
      val jsonObject = jsonParser.parse(jsonString).getAsJsonObject()
      jsonObject.get("account")
    })

    val priceCheck = Pattern.begin[String]("start").where({ jsonString =>
      val jsonParser = new JsonParser()
      val jsonObject = jsonParser.parse(jsonString).getAsJsonObject()
      jsonObject.get("account").toString == "iOS"
    })

    val pattern = CEP.pattern(partitionedInput, priceCheck)

    val newStream = pattern.select(x =>
      x.get("start").map({ str =>
        str
      })
    )

    newStream.print()
    env.execute()
  }
}
For some reason, nothing is being printed out at the newStream.print() in the code above. I am positive that there is data in Kafka that matches the pattern I defined, but nothing comes out.
Can anyone please help me spot an error in this code?
import java.time.Instant
import com.google.gson.JsonParser
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

class TimestampExtractor extends AssignerWithPeriodicWatermarks[String] with Serializable {
  override def extractTimestamp(e: String, prevElementTimestamp: Long) = {
    val jsonParser = new JsonParser()
    val context = jsonParser.parse(e).getAsJsonObject.getAsJsonObject("context")
    Instant.parse(context.get("serverTimestamp").toString.replaceAll("\"", "")).toEpochMilli
  }

  override def getCurrentWatermark(): Watermark = {
    new Watermark(System.currentTimeMillis())
  }
}
I saw in the Flink docs that you can just return prevElementTimestamp in the extractTimestamp method (if you are using Kafka010) and new Watermark(System.currentTimeMillis) in the getCurrentWatermark method.
But I don't understand what prevElementTimestamp is, or why one would return new Watermark(System.currentTimeMillis) as the watermark and not something else. Can you please elaborate on why we do this, and on how Watermark and EventTime work together?
Answer
You do set up your job to work in EventTime, but you do not provide a timestamp and watermark extractor.
For more on working in event time, see the event-time docs. If you want to use the Kafka-embedded timestamps, the Kafka connector docs may help you.
In EventTime, the CEP library buffers incoming events and only processes them when a watermark arrives, so that out-of-order events are handled correctly. In your case no watermarks are generated, so the events are buffered indefinitely.
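A minimal sketch of the missing wiring, reusing the TimestampExtractor class from the question (its watermark logic should also be made data-driven, as discussed below):

```scala
// Sketch: attach the timestamp/watermark assigner before keying the stream.
// Without this call, CEP in EventTime mode never sees a watermark and
// buffers the matched events forever, so nothing reaches print().
val dataStream = env
  .addSource(new FlinkKafkaConsumer010[String](topicChannel1, new SimpleStringSchema(), props))
  .assignTimestampsAndWatermarks(new TimestampExtractor)

val partitionedInput = dataStream.keyBy { jsonString =>
  // Key by the string value rather than the raw JsonElement.
  new JsonParser().parse(jsonString).getAsJsonObject.get("account").getAsString
}
```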
For the prevElementTimestamp I think the docs are pretty clear:
Since Kafka 0.10.x, Kafka messages can have an embedded timestamp.
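Because FlinkKafkaConsumer010 exposes that embedded timestamp as the prevElementTimestamp argument, an assigner can simply pass it through. A sketch (the class name is mine, and the one-millisecond watermark lag is an assumption):

```scala
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

// Sketch: trust the timestamp that Kafka 0.10+ already embeds in each record.
class KafkaTimestampAssigner extends AssignerWithPeriodicWatermarks[String] {
  private var maxSeen = Long.MinValue

  override def extractTimestamp(element: String, previousElementTimestamp: Long): Long = {
    // With FlinkKafkaConsumer010, this argument is the record's embedded timestamp.
    maxSeen = math.max(maxSeen, previousElementTimestamp)
    previousElementTimestamp
  }

  // The watermark trails the largest embedded timestamp seen so far.
  override def getCurrentWatermark: Watermark = new Watermark(maxSeen - 1)
}
```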
Generating the Watermark as new Watermark(System.currentTimeMillis) in this case is not a good idea. You should create the Watermark based on your knowledge of the data. As for how Watermark and EventTime work together, I could not be more clear than the docs.
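For example, if you know your events arrive at most a few seconds out of order, the watermark can trail the largest serverTimestamp seen so far by that bound. A sketch (the 5-second bound is an assumption you should tune to your data):

```scala
import java.time.Instant
import com.google.gson.JsonParser
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

// Sketch: a data-driven watermark with a bounded out-of-orderness allowance.
class BoundedLatenessExtractor extends AssignerWithPeriodicWatermarks[String] {
  private val maxLatenessMs = 5000L                       // assumed disorder bound
  private var maxTimestamp = Long.MinValue + maxLatenessMs

  override def extractTimestamp(e: String, prevElementTimestamp: Long): Long = {
    val context = new JsonParser().parse(e).getAsJsonObject.getAsJsonObject("context")
    // getAsString avoids the replaceAll("\"", "") workaround for quoted JSON values.
    val ts = Instant.parse(context.get("serverTimestamp").getAsString).toEpochMilli
    maxTimestamp = math.max(maxTimestamp, ts)
    ts
  }

  // The watermark advances only when the data does, lagging by the bound,
  // so events up to maxLatenessMs late are still matched by CEP.
  override def getCurrentWatermark: Watermark =
    new Watermark(maxTimestamp - maxLatenessMs)
}
```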