本文介绍了Flink模式流没有打印任何内容的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我在下面有以下代码:

import java.util.Properties

import com.google.gson._
import com.typesafe.config.ConfigFactory
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.cep.scala.CEP
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

object WindowedWordCount {
  val configFactory = ConfigFactory.load()
  def main(args: Array[String]) = {
    val brokers = configFactory.getString("kafka.broker")
    val topicChannel1 = configFactory.getString("kafka.topic1")

    val props = new Properties()
    ...

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val dataStream = env.addSource(new FlinkKafkaConsumer010[String](topicChannel1, new SimpleStringSchema(), props))

    val partitionedInput = dataStream.keyBy(jsonString => {
      val jsonParser = new JsonParser()
      val jsonObject = jsonParser.parse(jsonString).getAsJsonObject()
      jsonObject.get("account")
    })

    val priceCheck = Pattern.begin[String]("start").where({jsonString =>
      val jsonParser = new JsonParser()
      val jsonObject = jsonParser.parse(jsonString).getAsJsonObject()
      jsonObject.get("account").toString == "iOS"})

    val pattern = CEP.pattern(partitionedInput, priceCheck)

    val newStream = pattern.select(x =>
      x.get("start").map({str =>
        str
      })
    )

    newStream.print()

    env.execute()
  }
}

由于某些原因,上述代码在newStream.print()处未打印任何内容.我肯定Kafka中的数据与我上面定义的模式匹配,但是由于某种原因没有打印出任何数据.

For some reason in the above code at the newStream.print() nothing is being printed out. I am positive that there is data in Kafka that matches the pattern that I defined above but for some reason nothing is being printed out.

任何人都可以帮助我找到此代码中的错误吗?

Can anyone please help me spot an error in this code?

class TimestampExtractor extends AssignerWithPeriodicWatermarks[String] with Serializable {

  override def extractTimestamp(e: String, prevElementTimestamp: Long) = {
    val jsonParser = new JsonParser()
    val context = jsonParser.parse(e).getAsJsonObject.getAsJsonObject("context")
    Instant.parse(context.get("serverTimestamp").toString.replaceAll("\"", "")).toEpochMilli
  }

  override def getCurrentWatermark(): Watermark = {
    new Watermark(System.currentTimeMillis())
  }
}

我在flink文档上看到,您可以在extractTimestamp方法中返回prevElementTimestamp(如果使用的是Kafka010),而在getCurrentWatermark方法中返回new Watermark(System.currentTimeMillis).

I saw on the flink doc that you can just return prevElementTimestamp in the extractTimestamp method (if you are using Kafka010) and new Watermark(System.currentTimeMillis) in the getCurrentWatermark method.

但是我不明白什么是prevElementTimestamp,或者为什么人们会返回new Watermark(System.currentTimeMillis)作为WaterMark而不是其他东西.您能否详细说明为什么我们要使用WaterMarkEventTime一起工作吗?

But I don't understand what prevElementTimestamp is or why one would return new Watermark(System.currentTimeMillis) as the WaterMark and not something else. Can you please elaborate on why we do this on how WaterMark and EventTime work together please?

推荐答案

您确实设置了要在EventTime中工作的工作,但没有提供时间戳和水印提取器.

You do setup your job to work in EventTime, but you do not provide timestamp and watermark extractor.

有关在事件时间工作的更多信息,请参见文档.如果要使用kafka嵌入式时间戳,请文档可能会对您有所帮助.

For more on working in event time see those docs. If you want to use the kafka embedded timestamps this docs may help you.

EventTime中,CEP库在水印到达时缓冲事件,以便正确处理乱序事件.在您的情况下,不会生成水印,因此事件将被无限缓冲.

In EventTime the CEP library buffers events upon watermark arrival so to properly handle out-of-order events. In your case there are no watermarks generated, so the events are buffered infinitly.

  1. 对于prevElementTimestamp,我认为文档非常清楚:

  1. For the prevElementTimestamp I think the docs are pretty clear:

由于Kafka 0.10.x,Kafka消息可以嵌入时间戳.

Since Kafka 0.10.x Kafka messages can have embedded timestamp.

在这种情况下,将Watermark生成为new Watermark(System.currentTimeMillis)并不是一个好主意.您应该基于对数据的了解来创建Watermark.有关WatermarkEventTime如何协同工作的信息,我比文档

Generating Watermark as new Watermark(System.currentTimeMillis) in this case is not a good idea. You should create Watermark based on your knowledge of the data. For information on how Watermark and EventTime work together I could not be more clear than the docs

这篇关于Flink模式流没有打印任何内容的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-13 02:12