




I have a DStream of (Key, Value) pairs.

mapped2.foreachRDD(rdd => {
  rdd.foreachPartition(p => {
    p.foreach(x => { /* process x */ })
  })
})


I need to be sure that all items with identical keys are processed in one partition and by one core, so that they are actually processed sequentially.


How can I do this? Can I use groupByKey, even though it is inefficient?


You can use PairDStreamFunctions.combineByKey:

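import org.apache.spark.HashPartitioner
import org.apache.spark.streaming.dstream.DStream

/**
  * Created by Yuval.Itzchakov on 29/11/2016.
  */
object GroupingDStream {
  def main(args: Array[String]): Unit = {
    // Placeholders: plug in your own input stream and partition count.
    val pairs: DStream[(String, String)] = ???
    val numberOfPartitions: Int = ???

    // The HashPartitioner sends every occurrence of a key to the same partition.
    val groupedByIds: DStream[(String, List[String])] = pairs.combineByKey[List[String]](
      (s: String) => List(s),                                         // first value of a key starts its list
      (strings: List[String], s: String) => s +: strings,             // later values of the key in the same partition
      (first: List[String], second: List[String]) => first ++ second, // merge partial lists of the key
      new HashPartitioner(numberOfPartitions))

    groupedByIds.foreachRDD(rdd => {
      rdd.foreach((kvp: (String, List[String])) => {
        val (key, values) = kvp
        // All values of `key` arrive together in one task on one core.
        values.foreach { value => /* process value sequentially */ }
      })
    })
  }
}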



The result of combineByKey is a DStream of tuples: the first element is the key and the second is the collection of all values for that key. Note that I used (String, String) to keep the example simple, since you haven't provided any types.
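To see why this keeps each key in one place, here is a plain-Scala model (no Spark dependency) of combineByKey with a hash partitioner. It is only a sketch, assuming Spark's HashPartitioner assigns a key to the non-negative remainder of `key.hashCode` modulo the partition count; `CombineByKeyModel`, `partitionFor` and `combineByKey` here are hypothetical names, not Spark API.

```scala
// Hypothetical plain-Scala model of combineByKey + HashPartitioner (not Spark code).
object CombineByKeyModel {
  // Assumed to mirror HashPartitioner: non-negative hashCode modulo numPartitions.
  def partitionFor(key: Any, numPartitions: Int): Int =
    if (key == null) 0
    else {
      val raw = key.hashCode % numPartitions
      if (raw < 0) raw + numPartitions else raw
    }

  def combineByKey[K, V, C](
      inputPartitions: Seq[Seq[(K, V)]],
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      numPartitions: Int): Vector[Map[K, C]] = {
    // Map side: combine the values of each key inside every input partition.
    val partial: Seq[Map[K, C]] = inputPartitions.map { part =>
      part.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
        val c = acc.get(k) match {
          case Some(existing) => mergeValue(existing, v)
          case None           => createCombiner(v)
        }
        acc.updated(k, c)
      }
    }
    // Shuffle: each key is routed to exactly one output partition and merged there.
    var out = Vector.fill(numPartitions)(Map.empty[K, C])
    for (m <- partial; (k, c) <- m) {
      val p = partitionFor(k, numPartitions)
      val merged = out(p).get(k) match {
        case Some(existing) => mergeCombiners(existing, c)
        case None           => c
      }
      out = out.updated(p, out(p).updated(k, merged))
    }
    out
  }

  def main(args: Array[String]): Unit = {
    val input = Seq(
      Seq("a" -> "1", "b" -> "2", "a" -> "3"),
      Seq("a" -> "4", "c" -> "5"))
    val result = combineByKey[String, String, List[String]](
      input,
      (s: String) => List(s),
      (strings: List[String], s: String) => s +: strings,
      (first: List[String], second: List[String]) => first ++ second,
      numPartitions = 2)
    result.zipWithIndex.foreach { case (m, i) => println(s"partition $i: $m") }
  }
}
```

Key "a" appears in both input partitions, yet it ends up in exactly one output partition holding all three of its values. The model also shows why `createCombiner` must keep its argument: returning an empty list there would silently drop the first value of every key in every partition.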


Then use foreach to iterate over the list of values and process them sequentially if you need to. If you need to apply additional transformations instead, you can use DStream.map and operate on the second element (the list of values) rather than using foreachRDD.
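Both options can be sketched on plain Scala collections standing in for the grouped stream. This is only an illustration: `ProcessGroupedValues`, `handle`, `processSequentially` and `countPerKey` are hypothetical names, and the lambda in `countPerKey` merely has the same shape you would pass to `DStream.map`.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the grouped stream's contents (not Spark code).
object ProcessGroupedValues {
  // Hypothetical per-value handler; replace with your own logic.
  def handle(key: String, value: String, log: mutable.Buffer[String]): Unit =
    log += s"$key:$value"

  // Sequential processing: the values of one key are visited one after
  // another, in list order, by a single thread.
  def processSequentially(grouped: Seq[(String, List[String])]): Vector[String] = {
    val log = mutable.ArrayBuffer.empty[String]
    grouped.foreach { case (key, values) =>
      values.foreach(v => handle(key, v, log))
    }
    log.toVector
  }

  // A transformation on the second element, shaped like a DStream.map
  // function: each value list is replaced by its size.
  def countPerKey(grouped: Seq[(String, List[String])]): Seq[(String, Int)] =
    grouped.map { case (key, values) => (key, values.size) }

  def main(args: Array[String]): Unit = {
    val grouped = Seq("a" -> List("3", "1", "4"), "c" -> List("5"))
    println(processSequentially(grouped))
    println(countPerKey(grouped))
  }
}
```

Because `mergeValue` in the answer prepends (`s +: strings`), each list holds values in roughly reverse arrival order within a partition, and the order across partitions is not guaranteed; if order matters, you would need to carry something like a timestamp with each value and sort on it before processing.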


08-20 13:48