我们正在使用spark 1.6,并且正在尝试为类似事件保留全局身份。具有相同ID的事件“组”很少(在示例中为数字。仅出于唯一性而添加字母)。而且我们知道其中一些事件是相似的,因此我们能够将它们联系起来。我们想要保留以下内容:

Z -> 1, 2, 3
X -> 4

因此,将来如果发生一些ID为4的事件,我们可以将X分配为全局标识。

请检查示例以获得更好的说明:

假设我们有一些流数据进入了 Spark 作业。
1a
1b
2c
2d
2e
3f
3g
3h
4i

由于事件1是我们的首次出现,因此我们想分配1 to Z
接下来我们知道1b和2c是相似的。所以我们想保留2->1映射。 2e和3f也是一样,因此我们需要映射3-2。因此,目前我们有3对1->Z2->13->2

我们要创建“历史”路径:Z <- 1 <- 2 <- 3最后,我们将使用ID = Z进行所有事件。
1a -> Z
1b -> Z
2c -> Z
2d -> Z
2e -> Z
3f -> Z
3g -> Z
3h -> Z
4i -> X

我们试图使用mapwithstate,但是我们唯一能做的就是2->13->2。使用mapwithstate,我们无法获得当前事件状态下的“父项”状态-例如。当前事件3与父级2,并且无法获取2 -> 11 -> Z

是否可以为此设置一些全局映射?我们已经尝试过累加器并进行广播,但是看起来不太合适。而且,我们无法使用Z替换第一次映射的事件1和第二次映射的事件2。

如果新事件5到来,并且与3h类似,例如,我们需要再次分配映射5->Z

最佳答案

接下来是对给定问题的解决方案,它使用对“状态” RDD的可变引用,我们每次都会用新的结果进行更新。

我们使用transform通过执行相似性联接来用唯一的全局ID标记传入事件流。这是“手工”联接,在此联接中,我们使用两个数据集的乘积,并逐对比较每个条目。

请注意,这是一个昂贵的过程。根据预期流的特定特性,可以更改许多部分。例如,我们可以用本地map替换全局状态RDD并应用map-side联接以实现更快的相似性联接,但是这在很大程度上取决于唯一ID集合的预期基数。

这比我最初预期的要复杂。仅将其作为迈向更强大解决方案的起点。例如,在状态RDD上的union操作需要定期检查点,以避免DAG超出控制范围。
(有很大的改进空间-但这超出了提供答案的合理努力。)

在这里,我概述了解决方案的核心,有关完整的测试笔记本,请参见UniqueGlobalStateChains.snb

// this mutable reference points to the `states` that we keep across interations
@transient var states: RDD[(String, (Int, Long))] = sparkContext.emptyRDD

// we assume an incoming Event stream. Here we prepare it for the global id-process
@transient val eventsById = eventStream.map(event => (event.id, event))
@transient val groupedEvents = eventsById.groupByKey()

// this is the core of the solution.
// We transform the incoming events into tagged events.
// As a by-product, the mutable `states` reference will get updated with the latest state mapping.
// the "chain" of events can be reconstructed ordering the states by timestamp

@transient val taggedEvents = groupedEvents.transform{ (events, currentTime) =>
    val currentTransitions = states.reduceByKey{case (event1, event2) => Seq(event1, event2).maxBy{case (id, ts) => ts}}
    val currentMappings = currentTransitions.map{case (globalId, (currentId, maxTx)) => (currentId, globalId)}

    val newEventIds = events.keys // let's extract the ids of the incoming (grouped) events
    val similarityJoinMap = newEventIds.cartesian(currentMappings)
        .collect{case (eventId, (currentId, globalId)) if (isSimilar(currentId)(eventId)) => (eventId, globalId)}
        .collectAsMap
    //val similarityBC = sparkContext.broadcast(similarityJoinMap)
    val newGlobalKeys = newEventIds.map(id => (id, similarityJoinMap.getOrElse(id, genGlobalId())))
    newGlobalKeys.cache() //avoid lazy evaluation to generate multiple global ids

    val newTaggedEvents = events.join(newGlobalKeys).flatMap{case (eventId, (events, globalKey)) =>
                                      events.map(event => (event.id,event.payload, globalKey))
                                     }
    val newStates = newGlobalKeys.map{case (eventId, globalKey) => (globalKey, (eventId, currentTime.milliseconds))}
    currentState = newStates
    states.unpersist(false)
    states = newStates.union(states)
    states.cache()
    newTaggedEvents
    }

给定此输入序列:
"1|a,1|b,3|c",  "2|d,2|e,2|f", "3|g,3|h,3|i,4|j", "5|k", "4|f,1|g", "6|h"

我们得到:

具有全局ID的已标记事件:
---
1|a: gen-4180,1|b: gen-4180,3|c: gen-5819
---
2|d: gen-4180,2|e: gen-4180,2|f: gen-4180
---
3|g: gen-4180,3|h: gen-4180,3|i: gen-4180,4|j: gen-5819
---
5|k: gen-5819
---
1|g: gen-2635,4|f: gen-4180
---
6|h: gen-5819

我们可以重建从全局id派生的事件链:
gen-4180: 1<-2<-3<-4
gen-2635: 1
gen-5819: 3<-4<-5<-6

-o-

关于java - 为不同事件建立状态链,并在spark中分配全局ID,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/45191359/

10-11 22:32
查看更多