Problem Description
Ok gurus, I've got a problem that doesn't make much sense to me. I am stuck trying to save an object to MongoDB that looks like this (roughly):
{
  data: [
    baseball: [{timestamp (essentially): tweet}, {timestamp: another tweet}],
    football: [{timestamp: footballtweet}, {timestamp: differentfootballtweet}]
  ],
  timeInterval: "last minute to this minute",   (I'm doing time-series data)
  terms: ["football", "baseball"]
}
See below for the loop I'm stuck on. Note: the issue might have something to do with the RDD expiring. I tried to fix it by persisting it in memory, but I'm not sure what else to do.
twitterStream.foreachRDD(rrd => {
  val entryToSave = MongoDBObject()
  val termWithTweets = MongoDBObject()
  rrd.persist()
  filters.foreach(term => {
    var listOfTweets = MongoDBObject()
    rrd.persist()
    for (status <- rrd) {
      if (status.getText.contains(term)) {
        // listOfTweets += status
        // Why doesn't the line below actually add the key/value pair to the variable
        // defined outside of the "for(status <- rrd)" loop? I know (through debugging)
        // that it does in fact append inside the loop.
        listOfTweets += (DateTime.now.toString() -> status.toString)
      }
    }
    // When I print listOfTweets outside of the for loop it is empty. Why?
    println("outsideRRD", listOfTweets)
    termWithTweets += (term -> listOfTweets)
  })
  entryToSave += ("data" -> termWithTweets)
  entryToSave += ("timeInterval" -> (DateTime.lastMinute to DateTime.now).toString)
  entryToSave += ("terms" -> filters)
  collection.insert(entryToSave)
})
I don't think this is a val/var issue, although it may be. I've tried it both ways.
Recommended Answer
Computations on RDDs are distributed over the cluster. You can't update a variable that was created outside the RDD operation closure from within the RDD. They are basically in two different places: the variable is created in the Spark driver, accessed in the workers, and should be treated as read-only.
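A minimal standalone sketch of that behavior (my own illustration, not the asker's code; names like ClosureDemo are made up):

import org.apache.spark.{SparkConf, SparkContext}

object ClosureDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("closure-demo").setMaster("local[2]"))
    var total = 0 // lives in the driver JVM
    // The closure (including the current value of `total`) is serialized and
    // shipped to the executors; each task increments its own copy.
    sc.parallelize(1 to 100).foreach(n => total += n)
    println(total) // prints 0: the driver's copy was never modified
    sc.stop()
  }
}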
Spark supports distributed accumulators that could be used in this case: Spark Accumulators.
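For example, a minimal sketch (assuming Spark 2.x's LongAccumulator API; sc, rdd and term stand in for the asker's SparkContext, RDD and search term):

val matchCount = sc.longAccumulator("matchingTweets") // registered on the driver
rdd.foreach(status => if (status.getText.contains(term)) matchCount.add(1L)) // updated on the workers
println(matchCount.value) // safe to read on the driver once the action has run

Note that accumulators are effectively write-only from the workers' side and aggregate values, so they suit counters and sums better than building the nested document the question needs.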
Another option (the one I'd prefer) is to transform the stream of RDDs into the desired data format and then use the foreachRDD method to persist it into secondary storage. This would be a more functional way to approach the problem. It would roughly look like this:
val filteredStream = twitterStream.filter(entry => filters.exists(term => entry.getText.contains(term)))
val filteredStreamWithTs = filteredStream.map(entry => (DateTime.now.toString(), entry))
filteredStreamWithTs.foreachRDD(rdd => { /* write to Mongo */ })
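One possible way to fill in the "write to Mongo" placeholder, assuming Casbah as in the question (the host, database and collection names here are illustrative):

filteredStreamWithTs.foreachRDD { rdd =>
  rdd.foreachPartition { tweets =>
    // Open the connection on the worker, not the driver, so it is never serialized
    val client = com.mongodb.casbah.MongoClient("localhost", 27017)
    val coll = client("twitterDb")("tweets")
    tweets.foreach { case (ts, status) =>
      coll.insert(com.mongodb.casbah.commons.MongoDBObject(ts -> status.toString))
    }
    client.close()
  }
}

Opening one connection per partition (rather than per record, or on the driver) keeps the client out of the serialized closure and amortizes the connection cost.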