我们正在使用spark 1.6
,并且正在尝试为类似事件保留全局身份。具有相同ID的事件“组”很少(在示例中为数字。仅出于唯一性而添加字母)。而且我们知道其中一些事件是相似的,因此我们能够将它们联系起来。我们想要保留以下内容:
Z -> 1, 2, 3
X -> 4
因此,将来如果发生一些ID为4的事件,我们可以将
X
分配为全局标识。请检查示例以获得更好的说明:
假设我们有一些流数据进入了 Spark 作业。
1a
1b
2c
2d
2e
3f
3g
3h
4i
由于事件1是我们的首次出现,因此我们想分配
1 to Z
。接下来我们知道1b和2c是相似的。所以我们想保留
2->1
映射。 2e和3f也是一样,因此我们需要映射3-2
。因此,目前我们有3对1->Z
,2->1
和3->2
。我们要创建“历史”路径:
Z <- 1 <- 2 <- 3
最后,我们将使用ID = Z
进行所有事件。1a -> Z
1b -> Z
2c -> Z
2d -> Z
2e -> Z
3f -> Z
3g -> Z
3h -> Z
4i -> X
我们试图使用
mapwithstate
,但是我们唯一能做的就是2->1
和3->2
。使用mapwithstate
,我们无法获得当前事件状态下的“父项”状态-例如。当前事件3与父级2,并且无法获取2 -> 1
和1 -> Z
。是否可以为此设置一些全局映射?我们已经尝试过累加器并进行广播,但是看起来不太合适。而且,我们无法使用
Z
替换第一次映射的事件1和第二次映射的事件2。如果新事件
5
到来,并且与3h类似,例如,我们需要再次分配映射5->Z
。 最佳答案
接下来是对给定问题的解决方案,它使用对“状态” RDD的可变引用,我们每次都会用新的结果进行更新。
我们使用transform
通过执行相似性联接来用唯一的全局ID标记传入事件流。这是“手工”联接,在此联接中,我们使用两个数据集的乘积,并逐对比较每个条目。
请注意,这是一个昂贵的过程。根据预期流的特定特性,可以更改许多部分。例如,我们可以用本地map
替换全局状态RDD并应用map-side
联接以实现更快的相似性联接,但是这在很大程度上取决于唯一ID集合的预期基数。
这比我最初预期的要复杂。仅将其作为迈向更强大解决方案的起点。例如,在状态RDD上的union
操作需要定期检查点,以避免DAG超出控制范围。
(有很大的改进空间-但这超出了提供答案的合理努力。)
在这里,我概述了解决方案的核心,有关完整的测试笔记本,请参见UniqueGlobalStateChains.snb。
// this mutable reference points to the `states` that we keep across interations
@transient var states: RDD[(String, (Int, Long))] = sparkContext.emptyRDD
// we assume an incoming Event stream. Here we prepare it for the global id-process
@transient val eventsById = eventStream.map(event => (event.id, event))
@transient val groupedEvents = eventsById.groupByKey()
// this is the core of the solution.
// We transform the incoming events into tagged events.
// As a by-product, the mutable `states` reference will get updated with the latest state mapping.
// the "chain" of events can be reconstructed ordering the states by timestamp
@transient val taggedEvents = groupedEvents.transform{ (events, currentTime) =>
val currentTransitions = states.reduceByKey{case (event1, event2) => Seq(event1, event2).maxBy{case (id, ts) => ts}}
val currentMappings = currentTransitions.map{case (globalId, (currentId, maxTx)) => (currentId, globalId)}
val newEventIds = events.keys // let's extract the ids of the incoming (grouped) events
val similarityJoinMap = newEventIds.cartesian(currentMappings)
.collect{case (eventId, (currentId, globalId)) if (isSimilar(currentId)(eventId)) => (eventId, globalId)}
.collectAsMap
//val similarityBC = sparkContext.broadcast(similarityJoinMap)
val newGlobalKeys = newEventIds.map(id => (id, similarityJoinMap.getOrElse(id, genGlobalId())))
newGlobalKeys.cache() //avoid lazy evaluation to generate multiple global ids
val newTaggedEvents = events.join(newGlobalKeys).flatMap{case (eventId, (events, globalKey)) =>
events.map(event => (event.id,event.payload, globalKey))
}
val newStates = newGlobalKeys.map{case (eventId, globalKey) => (globalKey, (eventId, currentTime.milliseconds))}
currentState = newStates
states.unpersist(false)
states = newStates.union(states)
states.cache()
newTaggedEvents
}
给定此输入序列:
"1|a,1|b,3|c", "2|d,2|e,2|f", "3|g,3|h,3|i,4|j", "5|k", "4|f,1|g", "6|h"
我们得到:
具有全局ID的已标记事件:
---
1|a: gen-4180,1|b: gen-4180,3|c: gen-5819
---
2|d: gen-4180,2|e: gen-4180,2|f: gen-4180
---
3|g: gen-4180,3|h: gen-4180,3|i: gen-4180,4|j: gen-5819
---
5|k: gen-5819
---
1|g: gen-2635,4|f: gen-4180
---
6|h: gen-5819
我们可以重建从全局id派生的事件链:
gen-4180: 1<-2<-3<-4
gen-2635: 1
gen-5819: 3<-4<-5<-6
-o-
关于java - 为不同事件建立状态链,并在spark中分配全局ID,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/45191359/