Problem description
I have a DStream of (Key, Value) type.
mapped2.foreachRDD(rdd => {
  rdd.foreachPartition(p => {
    p.foreach(x => {
      // process each (key, value) element here
    })
  })
})
I need to make sure that all items with identical keys are processed in one partition and by one core, so that they are actually processed sequentially.
How can I do this? Can I use groupByKey, even though it is inefficient?
Recommended answer
You can use PairDStreamFunctions.combineByKey:
import org.apache.spark.HashPartitioner
import org.apache.spark.streaming.dstream.DStream

/**
 * Created by Yuval.Itzchakov on 29/11/2016.
 */
object GroupingDStream {
  def main(args: Array[String]): Unit = {
    val pairs: DStream[(String, String)] = ???
    val numberOfPartitions: Int = ???

    val groupedByIds: DStream[(String, List[String])] = pairs.combineByKey[List[String]](
      (s: String) => List(s),                                          // create the combiner from a key's first value
      (strings: List[String], s: String) => s +: strings,              // prepend each further value for that key
      (first: List[String], second: List[String]) => first ++ second,  // merge lists built on different partitions
      new HashPartitioner(numberOfPartitions))                         // hash by key: each key ends up in exactly one partition

    groupedByIds.foreachRDD(rdd => {
      rdd.foreach((kvp: (String, List[String])) => {
        // kvp._1 is the key, kvp._2 is the full list of values for that key
      })
    })
  }
}
The result of combineByKey would be a tuple with the first element being the key and the second element being a collection of the values. Note that I used (String, String) for the sake of simplicity of the example, as you haven't provided any types.
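For illustration, here is a minimal sketch of consuming that result, assuming the groupedByIds stream produced above (the println is just a hypothetical stand-in for your per-key processing). Because the whole list of values for a key arrives as a single record, a plain loop over it runs sequentially on one core:

import org.apache.spark.streaming.dstream.DStream

// assumed: the grouped stream produced by the combineByKey call above
val groupedByIds: DStream[(String, List[String])] = ???

groupedByIds.foreachRDD(rdd => {
  rdd.foreach { case (key, values) =>
    // all values for `key` are in this single record, so this loop
    // processes them strictly one after another on one core
    values.foreach(value => println(s"$key -> $value"))
  }
})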
Then use foreach to iterate over the list of values and process them sequentially if you need to, as sketched above. Note that if you need to apply additional transformations, you can use DStream.map and operate on the second element (the list of values) instead of using foreachRDD; a sketch of that follows.
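A minimal sketch of the map-based variant; counting each key's values below is only a hypothetical placeholder for whatever sequential per-key logic you actually need:

import org.apache.spark.streaming.dstream.DStream

// assumed: the grouped stream produced by the combineByKey call above
val groupedByIds: DStream[(String, List[String])] = ???

// map over each (key, list-of-values) pair and derive a new value per key
val perKeyCounts: DStream[(String, Int)] = groupedByIds.map { case (key, values) =>
  (key, values.size) // hypothetical: replace with your own processing of `values`
}
perKeyCounts.print()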