(I have a little knowledge of batch Spark, but none of Spark Streaming.)
Problem
I have a Kafka topic Kafka[(A,B)->X], where (A,B) is the key (A and B are simple numeric types) and X is the message type, which is relatively big (a couple of MB). Putting aside the problem of failures in the input, the data forms a grid: for each a in A, there will be messages (a,b) for all b in B. Moreover, the b's are ordered, and I think we can assume that all messages for one a will arrive following the b's order (what I know is that the topic is filled in this order).
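To make the setup concrete, here is a minimal sketch of the types involved (the aliases and the concrete numeric types are assumptions for illustration, not part of the question):

// Hypothetical type aliases, for illustration only.
type A = Long                       // first part of the key, a simple numeric type
type B = Long                       // second part of the key, ordered within each a
case class X(payload: Array[Byte])  // the relatively large message body (a couple of MB)

// Every Kafka record is keyed by the pair (A, B).
type Record = ((A, B), X)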
Then I need to process the messages as follows (see the sketch after this list):
- one (or a couple of) function(s) is applied to each message (a,b)->x, outputting (a,b)->y
- a function should be applied to the messages aB->Seq[y], where aB = {(a,b) for all b in B}
(and later there is a pass where the messages need to be "transposed" so they can be processed across all a's, but that's not the question here)
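Expressed as hypothetical function signatures (reusing the type aliases sketched above, with placeholder result types Y and Z that are not in the question), the two steps would look roughly like:

// Placeholder result types, for illustration only.
case class Y()  // result of step 1 for one message
case class Z()  // result of step 2 for one a

// Step 1: applied independently to every message (a, b) -> x.
def step1(key: (A, B), x: X): ((A, B), Y) = ???

// Step 2: applied once per a, to the y's of all b's in B for that a.
def step2(a: A, ys: Seq[Y]): Z = ???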
Question
How can I achieve such a merge of messages, from step 1 to step 2?
It looks like a groupBy over the sub-key a, but to my understanding groupBy would be applied per micro-batch. What I need is, for each a, to wait until all b's have been received (assuming a simple counting system would work). Once again, this puts aside missing b's and errors in the input data.
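One way to express this "count per a until all b's have arrived" idea in Spark Streaming is per-key state, for example with updateStateByKey. The following is only a rough sketch under assumed names (byA, expectedCount, and the placeholder Y type from above); it ignores clearing the state of completed groups and error handling:

import org.apache.spark.streaming.dstream.DStream

val expectedCount = 100  // hypothetical |B|: how many b's are expected per a

// Assumed to exist: the output of step 1, re-keyed by a only.
val byA: DStream[(A, (B, Y))] = ???

// Accumulate all (b, y) pairs seen so far for each a.
// updateStateByKey requires a checkpoint directory on the StreamingContext.
val accumulated: DStream[(A, Seq[(B, Y)])] =
  byA.updateStateByKey[Seq[(B, Y)]] { (newValues: Seq[(B, Y)], state: Option[Seq[(B, Y)]]) =>
    Some(state.getOrElse(Seq.empty) ++ newValues)
  }

// Groups whose count has reached |B| are complete and can be handed to step 2.
val complete: DStream[(A, Seq[(B, Y)])] =
  accumulated.filter { case (_, pairs) => pairs.size >= expectedCount }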
Some idea
Without much knowledge, I would try to see whether such merging could be achieved by appending to an HDFS file, one per a, and then trigger a second streaming process on those files once they are full, i.e. when a file contains all b's, move it to an input directory for step 2. But:
- I don't know whether such appending can be implemented on HDFS
- Two StreamingContexts would need to run in parallel, one for each step, and that looks like a problem (?)
- I understood that going through HDFS would break the "exactly once" property of Spark (Streaming)
Answer
You can create a master RDD, and merge the micro RDDs generated by the stream into the master with RDD.union. Something like:
var masterRDD: RDD[((Long, Long), String)] = sc.emptyRDD // guessing on the RDD type

myStream.foreachRDD { rdd =>
  if (!rdd.isEmpty) {
    // union returns a new RDD, so the result has to be assigned back
    masterRDD = masterRDD.union(rdd)
    masterRDD.groupBy(...).....
  }
}
You should take some time and read up on checkpointing as well.
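For instance, a minimal sketch (the checkpoint path and the checkpoint-every-batch frequency are assumptions, not part of the original answer): without periodic checkpointing, the lineage of masterRDD grows with every union and the job will eventually slow down or fail.

import org.apache.spark.rdd.RDD

sc.setCheckpointDir("hdfs:///tmp/spark-checkpoints") // assumed path

var masterRDD: RDD[((Long, Long), String)] = sc.emptyRDD

myStream.foreachRDD { rdd =>
  if (!rdd.isEmpty) {
    masterRDD = masterRDD.union(rdd)
    // checkpoint() only marks the RDD; the file is written on the next action
    // (here count()). In practice you would probably do this only every N batches.
    masterRDD.checkpoint()
    masterRDD.count()
  }
}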