Problem Description
I have the following code:
val rdd = sc.cassandraTable("db", "table")
  .select("id", "date", "gpsdt")
  .where("id=? and date=? and gpsdt>? and gpsdt<?", entry(0), entry(1), entry(2), entry(3))
val rddcopy = rdd.sortBy(row => row.get[String]("gpsdt"), false).zipWithIndex()
rddcopy.foreach { records =>
  // pseudocode: there is no way to reach the previous element from inside foreach
  val previousRow = (records - 1)th row
  val currentRow = records
  // Some calculation based on both rows
}
So the idea is to get just the previous/next row on each iteration of the RDD. I want to calculate some field on the current row based on the value present in the previous row. Thanks.
Recommended Answer
EDIT II: The answer below misunderstood the question as asking for tumbling-window semantics, but a sliding window is needed. Given that this is a sorted RDD,
import org.apache.spark.mllib.rdd.RDDFunctions._
sortedRDD.sliding(2)
should do the trick. Note, however, that this uses a DeveloperApi.
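To make the semantics of `sliding(2)` concrete without spinning up Spark, here is a plain-Scala sketch using the collections `sliding` method, which pairs each element with its successor the same way (assumption: a local `Seq` of sorted `gpsdt` strings stands in for the sorted RDD):

```scala
// Local stand-in for the sorted RDD: gpsdt values in order.
val sorted = Seq("09:00", "09:05", "09:10", "09:15")

// sliding(2) yields overlapping windows of two consecutive rows,
// so each window gives you (previousRow, currentRow).
val pairs = sorted.sliding(2).map { case Seq(prev, current) =>
  (prev, current)
}.toList
// pairs == List(("09:00","09:05"), ("09:05","09:10"), ("09:10","09:15"))
```

Inside the `map` you can then do the per-row calculation from the question, since both the previous and the current row are in scope.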
Alternatively, you can do:
val l = sortedRdd.zipWithIndex.map(kv => (kv._2, kv._1))
val r = sortedRdd.zipWithIndex.map(kv => (kv._2-1, kv._1))
val sliding = l.join(r)
RDD joins should be inner joins (IIRC), thus dropping the edge cases where the tuples would be partially null.
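The join trick above can be illustrated locally as well. This is a plain-Scala sketch of the same logic (assumption: `Map`s stand in for the two RDDs, and the key intersection models the inner join, which drops the first and last rows that have no partner):

```scala
val sortedRows = Seq("a", "b", "c", "d")
val indexed = sortedRows.zipWithIndex

// l keys each row by its index; r keys each row by index - 1,
// so at key k, l holds row k and r holds row k + 1.
val l = indexed.map { case (v, i) => (i.toLong, v) }.toMap
val r = indexed.map { case (v, i) => (i.toLong - 1, v) }.toMap

// Inner join on the shared keys, like l.join(r) on pair RDDs:
// each result is (index, (row, nextRow)).
val sliding = l.keySet.intersect(r.keySet).toList.sorted.map { k =>
  (k, (l(k), r(k)))
}
// sliding == List((0,("a","b")), (1,("b","c")), (2,("c","d")))
```

Note how the edge keys (`-1` on the `r` side, the last index on the `l` side) disappear in the join, which is exactly the "partially null tuples get dropped" behavior mentioned above.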
Old answer:
How do you identify the previous row? RDDs do not have any kind of stable ordering by themselves. If you have an incrementing dense key, you could add a new column computed as if (k % 2 == 0) k / 2 else (k - 1) / 2. This gives you a key that has the same value for two successive keys, and you can then group by it.
But to reiterate: in most cases there is no really sensible notion of a previous row for RDDs (depending on partitioning, data source, etc.).
So now that you have a zipWithIndex and an ordering in your set, you can do what I mentioned above. You now have an RDD[(Int, YourData)] and can do:
rdd.map(kv => if (kv._1 % 2 == 0) (kv._1 / 2, kv._2) else ((kv._1 - 1) / 2, kv._2)).groupByKey.foreach(/* your stuff here */)
If you reduce at any point, consider using reduceByKey rather than groupByKey().reduce.
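For reference, here is a plain-Scala sketch of this pairing-key trick (assumption: a local `Seq` of `(index, value)` tuples stands in for the `RDD[(Int, YourData)]`). Note that, as the edit above points out, this gives tumbling pairs (0,1), (2,3), ... rather than a true sliding window:

```scala
val data = Seq(0 -> "a", 1 -> "b", 2 -> "c", 3 -> "d")

// The key if (k % 2 == 0) k / 2 else (k - 1) / 2 maps indices
// 0,1 -> 0 and 2,3 -> 1, so each group holds two consecutive rows.
val grouped = data
  .map { case (k, v) => (if (k % 2 == 0) k / 2 else (k - 1) / 2, v) }
  .groupBy(_._1)
  .map { case (k, vs) => (k, vs.map(_._2)) }
// grouped == Map(0 -> Seq("a", "b"), 1 -> Seq("c", "d"))

// If all you need is an aggregate per pair, reduce each group directly
// (on an RDD this is where reduceByKey would replace groupByKey + reduce):
val reduced = grouped.map { case (k, vs) => (k, vs.reduce(_ + _)) }
// reduced == Map(0 -> "ab", 1 -> "cd")
```

On a real RDD, `reduceByKey` combines values map-side before the shuffle, which is why it is preferred over `groupByKey().reduce` when only an aggregate is needed.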
This concludes this article on how Spark can find the previous value on each iteration of an RDD. We hope the recommended answer is helpful, and thank you for your support!