This article looks at how to merge micro-batches in Spark Streaming; hopefully it is a useful reference for anyone who runs into the same problem.

(I have a little knowledge of batch Spark, but none of Spark Streaming.)

Problem

I have a Kafka topic Kafka[(A,B)->X] where (A,B) is the key (A and B are simple numeric types) and X is the message type, relatively big (a couple of MB). Putting aside the problem of failures in the input, the data is a grid: for each a in A, there will be a message (a,b) for every b in B. Moreover, the b's are ordered, and I think we can assume that all messages for a given a will arrive in b order (what I know is that the topic is filled in this order).
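
To keep the sketches further down concrete, the topic could be modelled with a few placeholder Scala types; none of these names or cardinalities come from the original question, they are assumptions for illustration only:

// Hypothetical Scala model of the topic Kafka[(A,B)->X] described above.
type A = Long                        // "simple numeric types"
type B = Long
case class X(payload: Array[Byte])   // large message body, a couple of MB
case class Y(summary: String)        // result of step 1 for one message (used below)

// The data is a grid: the row for a key a is complete once it holds one y per b in B.
val allBs: Set[B] = (0L until 100L).toSet            // assumed cardinality of B
def rowComplete(row: Map[B, Y]): Boolean = allBs.subsetOf(row.keySet)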

Then I need to process the messages as follows:

  1. a function (or a couple of functions) is applied to each message (a,b)->x, outputting (a,b)->y
  2. a function should be applied to the messages aB->Seq[y], where aB = {(a,b) for all b in B} (a minimal sketch of both steps follows below)

(and later there is a pass where messages need to be "transposed" to be processed across all a's, but that's not the question here)
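
Ignoring streaming for a moment, on a single RDD the two steps would just be a map followed by a regrouping on the a part of the key. A minimal sketch, where f and g are placeholders for the real step-1 and step-2 functions (not given in the question):

import org.apache.spark.rdd.RDD

type A = Long; type B = Long
case class X(payload: Array[Byte])
case class Y(summary: String)
case class Z(result: String)

def f(a: A, b: B, x: X): Y = Y(s"processed ($a,$b)")          // step 1, placeholder
def g(row: Seq[(B, Y)]): Z = Z(s"merged ${row.size} values")  // step 2, placeholder

def process(input: RDD[((A, B), X)]): RDD[(A, Z)] = {
  // Step 1: apply f to every message (a,b) -> x, producing (a,b) -> y
  val step1: RDD[((A, B), Y)] =
    input.map { case ((a, b), x) => ((a, b), f(a, b, x)) }

  // Step 2: regroup by a and apply g to the whole row aB -> Seq[y]
  step1
    .map { case ((a, b), y) => (a, (b, y)) }
    .groupByKey()
    .mapValues(row => g(row.toSeq.sortBy(_._1)))   // keep the b order
}

The streaming difficulty is entirely in step 2: groupByKey only sees one micro-batch at a time, which is exactly the problem stated below.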

Question

How can I achieve such a merge of messages, from step 1 to step 2?

It looks like a groupBy over the sub-key a, but to my understanding groupBy would be applied per micro-batch. What I need is, for each a, to wait until all the b's have been received (assume a simple counting system would work), once again putting aside missing b's and errors in the input data.
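
One way to get this "wait until all b's for an a have arrived" behaviour across micro-batches is Spark Streaming's stateful API (updateStateByKey, or mapWithState from Spark 1.6 on): keep the partial row per a as state and only emit it once the count reaches |B|. This is a hedged sketch rather than the accepted approach below, and it assumes the expected count is known up front and ignores missing b's and timeouts:

import org.apache.spark.streaming.dstream.DStream

type A = Long; type B = Long
case class Y(summary: String)

val expectedCount = 100     // assumed |B|, known in advance

// State kept per key a: every (b, y) received so far.
def accumulate(newValues: Seq[(B, Y)],
               state: Option[Seq[(B, Y)]]): Option[Seq[(B, Y)]] =
  Some(state.getOrElse(Seq.empty[(B, Y)]) ++ newValues)

def completeRows(step1: DStream[((A, B), Y)]): DStream[(A, Seq[(B, Y)])] =
  step1
    .map { case ((a, b), y) => (a, (b, y)) }
    .updateStateByKey(accumulate)                            // row grows across micro-batches
    .filter { case (_, row) => row.size >= expectedCount }   // keep only complete rows
    .mapValues(_.sortBy(_._1))                               // restore the b order

// Caveats: updateStateByKey requires ssc.checkpoint(dir), and completed rows should
// eventually be removed from the state (e.g. by returning None once processed, or by
// switching to mapWithState with a timeout); both are omitted in this sketch.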

Some ideas

Without much knowledge, I would try to see whether such merging could be achieved by appending to an HDFS file, one per a, and then triggering a second streaming process on those files once they are full, i.e. once a file contains all the b's, move it to an input directory for step 2. But:

  1. I don't know whether such appending can be implemented on HDFS
  2. Two StreamingContexts would need to run in parallel, one for each step, and that looks like a problem (?)
  3. I understand that going through HDFS would break the "exactly once" property of Spark (Streaming)
Solution

You can create a master RDD, and merge the micro RDDs generated by the stream into it with RDD.union. Something like:

import org.apache.spark.rdd.RDD

var masterRDD: RDD[((Long, Long), String)] = sc.emptyRDD  // guessing on RDD type

myStream.foreachRDD(rdd => {
  if (!rdd.isEmpty()) {
    // RDDs are immutable, so keep the result of the union
    masterRDD = masterRDD.union(rdd)

    masterRDD.groupBy(...).....
  }
})

You should take some time to read up on checkpointing as well.
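
One reason checkpointing matters here: the union approach grows masterRDD's lineage by one step per micro-batch, so without periodic caching and checkpointing the job slows down and can eventually fail on very deep lineages. A rough sketch of how that might be bolted onto the loop above (the checkpoint directory and interval are assumptions):

import org.apache.spark.rdd.RDD

sc.setCheckpointDir("hdfs:///tmp/master-rdd-checkpoints")   // assumed path

var masterRDD: RDD[((Long, Long), String)] = sc.emptyRDD
var batches = 0

myStream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    masterRDD = masterRDD.union(rdd)
    masterRDD.cache()               // older cached copies may need unpersisting; omitted here

    batches += 1
    if (batches % 10 == 0) {        // arbitrary checkpoint interval
      masterRDD.checkpoint()        // materialised on the next action, cutting the lineage
    }

    // ... then group by the a part of the key and process, as in the answer above
    val grouped = masterRDD.groupBy { case ((a, _), _) => a }
  }
}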

That concludes this article on merging micro-batches in Spark Streaming. Hopefully the answer above is helpful.
