问题描述
如果我设置秒(1)
在的StreamingContext
批处理时间,像这样的:
If I set Seconds(1)
for the batch time in StreamingContext
, like this:
val ssc = new StreamingContext(sc, Seconds(1))
3秒接收数据的3秒,但我只需要数据的第一秒,我可以丢弃在未来2秒内的数据。所以,我可以花3秒钟时间来处理数据,只有第一第二?
3 seconds will receive the 3 seconds of data, but I only need the first seconds of data, I can discard the next 2 seconds of data. So can I spend 3 seconds to process only first second of data?
推荐答案
如果您跟踪柜台,比如像下面您可以通过 updateStateByKey
做到这一点:
You can do this via updateStateByKey
if you keep track of counter, for example like below:
import org.apache.spark.SparkContext
import org.apache.spark.streaming.dstream.ConstantInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
object StreamEveryThirdApp {
def main(args: Array[String]) {
val sc = new SparkContext("local[*]", "Streaming Test")
implicit val ssc = new StreamingContext(sc, Seconds(1))
ssc.checkpoint("./checkpoint")
// generate stream
val inputDStream = createConstantStream
// increase seconds counter
val accStream = inputDStream.updateStateByKey(updateState)
// keep only 1st second records
val firstOfThree = accStream.filter { case (key, (value, counter)) => counter == 1}
firstOfThree.print()
ssc.start()
ssc.awaitTermination()
}
def updateState: (Seq[Int], Option[(Option[Int], Int)]) => Option[(Option[Int], Int)] = {
case(values, state) =>
state match {
// If no previous state, i.e. set first Second
case None => Some(Some(values.sum), 1)
// If this is 3rd second - remove state
case Some((prevValue, 3)) => None
// If this is not the first second - increase seconds counter, but don't calculate values
case Some((prevValue, counter)) => Some((None, counter + 1))
}
}
def createConstantStream(implicit ssc: StreamingContext): ConstantInputDStream[(String, Int)] = {
val seq = Seq(
("key1", 1),
("key2", 3),
("key1", 2),
("key1", 2)
)
val rdd = ssc.sparkContext.parallelize(seq)
val inputDStream = new ConstantInputDStream(ssc, rdd)
inputDStream
}
}
在情况下,如果你有你的数据中的时间信息,你也可以使用3秒窗口 stream.window(秒(3),秒(3))
和过滤器从数据中的时间信息的记录,而且经常这是preferred办法
In case if you have time information within your data, you could also use 3 seconds window stream.window(Seconds(3), Seconds(3))
and filter records by the time information from data, and quite often this is preferred approach
这篇关于如何处理输入记录在一批次的子集,即,在3秒间歇时间的第一秒?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!