(I have a little knowledge of batch Spark, but none of Spark Streaming.)
I have a Kafka topic Kafka[(A,B)->X], where (A,B) is the key (A and B are simple numeric types) and X is the message type, which is relatively big (a couple of MB). Putting aside the problem of failures in the input, the data is a grid: for each a in A, there will be messages (a,b) for all b in B. Moreover, the b's are ordered, and I think we can assume that all messages for one a will arrive in the order of the b's (what I know is that the topic is filled in this order).
Then I need to process the messages as follows:
- one (or a couple of) functions are applied to each message, outputting (a,b)->y
- a function should be applied to the messages of aB, where aB = {(a,b) for all b in B}
(and later there is a pass where messages need to be "transposed" to be processed across all a's, but that's not the question here)
How can I achieve such a merge of messages, from step 1 to step 2?
It looks like a groupby over the sub-key a, but to my understanding the groupby method would be applied per micro-batch. What I need is, for each a, to wait until all b's are received (assume a simple counting system would work). Once again, put aside missing b's and errors in the input data.
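The counting idea above can be sketched in plain Scala, without Spark, to show the state that has to survive from one micro-batch to the next. This is only an illustration under stated assumptions: the names WaitForAllB, step and expectedCount are hypothetical, the payload is assumed to be a String, and every a is assumed to have the same known number of b's.

```scala
object WaitForAllB {
  // (b, y): one processed message for a given a. The String payload is an assumption.
  type Msg = (Long, String)

  // Hypothetical: the number of b values expected for every a.
  val expectedCount = 3

  // Folds one micro-batch into the pending state.
  // Returns (groups still waiting for some b, groups that just became complete).
  def step(
      pending: Map[Long, Vector[Msg]],
      batch: Seq[((Long, Long), String)]
  ): (Map[Long, Vector[Msg]], Map[Long, Vector[Msg]]) = {
    // Append every message of the batch to the buffer of its a.
    val merged = batch.foldLeft(pending) { case (acc, ((a, b), y)) =>
      acc.updated(a, acc.getOrElse(a, Vector.empty) :+ (b -> y))
    }
    // A group is complete once it holds the expected number of messages.
    val (complete, stillPending) =
      merged.partition { case (_, msgs) => msgs.size >= expectedCount }
    // Complete groups are handed over sorted by b and dropped from the state.
    (stillPending, complete.map { case (a, msgs) => a -> msgs.sortBy(_._1) })
  }
}
```

Calling step once per micro-batch and feeding the returned pending map into the next call gives the "wait until all b's are received" behaviour: a group for a given a only appears in the second result once its count is reached. In Spark Streaming the pending map would have to live in a stateful transformation rather than in a local variable.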
Without knowing better, I would try to see if such merging could be achieved by appending to an HDFS file, one for each a, and then try to trigger a second stream process on those files once they are full, i.e. when a file contains all b's, move it to an input directory for step 2. But:
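- I don't know whether such appending can be done on HDFS
- Two sparkStreamingContext would need to run in parallel, one for each step. That looks like a problem (?).
- I understand that going through HDFS would break the "exactly once" property of Spark (Streaming)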
You can create a master RDD and merge the micro RDDs generated by the stream into it with RDD.union. Something like:
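var masterRDD: RDD[((Long, Long), String)] = sc.emptyRDD // guessing on RDD type
myStream.foreachRDD(rdd => {
  if (!rdd.isEmpty()) {
    // Assumed completion: union returns a new RDD, so the result is reassigned.
    masterRDD = masterRDD.union(rdd)
  }
})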
You should take some time and read up on checkpointing as well, specifically:
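Data checkpointing - Saving of the generated RDDs to reliable storage. This is necessary in some stateful transformations that combine data across multiple batches. In such transformations, the generated RDDs depend on RDDs of previous batches, which causes the length of the dependency chain to keep increasing with time. To avoid such unbounded increases in recovery time (proportional to dependency chain), intermediate RDDs of stateful transformations are periodically checkpointed to reliable storage (e.g. HDFS) to cut off the dependency chains.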
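As an alternative to a hand-maintained master RDD, the stateful transformation that the quoted passage refers to can be sketched with updateStateByKey, which requires a checkpoint directory. This is a rough sketch and not a tested job: MergeAcrossBatches, completeGroups, checkpointDir and expectedCount are hypothetical names, the String payload is a guess, and it assumes the Spark Streaming DStream API is on the classpath.

```scala
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

object MergeAcrossBatches {
  type Msg = (Long, String) // (b, payload); the payload type is a guess

  // Hypothetical: the number of b values expected for every a.
  val expectedCount = 100

  // Called once per key (a) and per batch with the new messages and the old state.
  // Returning None removes the key from the state.
  def update(newMsgs: Seq[Msg], state: Option[Vector[Msg]]): Option[Vector[Msg]] = {
    val previous = state.getOrElse(Vector.empty)
    // The group was complete (and emitted) in an earlier batch: drop it now.
    if (previous.size >= expectedCount && newMsgs.isEmpty) None
    else Some(previous ++ newMsgs)
  }

  def completeGroups(
      ssc: StreamingContext,
      checkpointDir: String,
      stream: DStream[((Long, Long), String)]
  ): DStream[(Long, Vector[Msg])] = {
    ssc.checkpoint(checkpointDir) // stateful transformations need checkpointing
    stream
      .map { case ((a, b), x) => (a, (b, x)) } // re-key by a only
      .updateStateByKey[Vector[Msg]](update _)
      .filter { case (_, msgs) => msgs.size >= expectedCount } // only complete groups
  }
}
```

With this layout a group for a given a passes the filter in the batch where its count is reached and is removed from the state in the following batch. Since each message is a couple of MB, keeping whole payloads in checkpointed state may be expensive; storing only a reference or a reduced value per message could be a more practical variant.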
