我正在以更新方式运行我的Spark结构化流式作业,无法弄清楚是否有可能获得每次更新的批次ID。例如,当您以更新方式输出到控制台时,Spark将在输出时显示每个批号:
-------------------------------------------
Batch: 0
-------------------------------------------
...
-------------------------------------------
Batch: 1
-------------------------------------------
...
等等。
我需要在发送给Kafka的每封邮件中添加相同的信息。为此,我仅限于Spark 2.3,因此无法使用forEachBatch。
我的工作输出一组特定维度的汇总指标。自上次触发以来,每个触发器的指标都可能已更新-具有更新指标的维度将在下一批中输出,因为我正在更新模式下运行。当我将这些输出到Kafka时,我需要知道哪个批次是最新的-因此需要批次号。我认为forEachBatch可以满足我的需求,但是很遗憾,我无法访问Spark 2.4。我可以使用forEach完成此操作吗?我只能使用更新模式,因为很可能会出现后期事件并更新以前已经输出的指标。
这是我用来测试的控制台模式。此输出分别显示每个批次,以及哪个编号:
StreamingQuery query = logs.writeStream()
.format("console")
.outputMode(OutputMode.Update())
.start();
我想做这样的事情
StreamingQuery query = agg.WriteStream()
.format("kafka")
.outputMode(OutputMode.Update())
.option("kafka.bootstrap.servers", "myconnection")
.Option("topic", "mytopic")
.Start();
但仍保留在mytopic中分辨消息来自哪个批次的功能。这可能吗?
最佳答案
我认为您可以使用long version
中的版本号ForeachWriter
您可以像这样实现自己的KafkaCustomSink。
class KafkaCustomSink(val config: Config) extends ForeachWriter[String] {
var producer: KafkaProducer[String, String] = _
var _version: Long = _
override def open(partitionId: Long, version: Long): Boolean = {
_version = version
val props = new Properties()
props.put("bootstrap.servers", config(Constant.OUTPUT_BOOTSTRAP_SERVER))
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("acks", "0")
producer = new KafkaProducer[String, String](props)
true
}
override def process(value: String): Unit = {
//use version here
val record = new ProducerRecord[String, String](config(Constant.OUTPUT_TOPIC), null, "version : %s, data : %s".format(_version, value))
producer.send(record)
}
override def close(errorOrNull: Throwable): Unit = {
producer.close()
}
}
并分配给
logs
.writeStream
.outputMode("update")
.foreach(new KafkaCustomSink(config))
.trigger(Trigger.ProcessingTime(config(Constant.TRIGGER_INTERVAL).toInt, TimeUnit.SECONDS))
.option("checkpointLocation", config(Constant.CHECKPOINT_LOCATION))