


import com.datastax.spark.connector._

val rdd = sc.cassandraTable("db", "table")
  .select("id", "date", "gpsdt")
  .where("id=? and date=? and gpsdt>? and gpsdt<?", entry(0), entry(1), entry(2), entry(3))
val rddcopy = rdd.sortBy(row => row.get[String]("gpsdt"), false).zipWithIndex()
rddcopy.foreach { records =>
    // Pseudocode: val previousRow = (records - 1)th row
    val currentRow = records
    // Some calculation based on both rows
}


So the idea is to get the previous or next row on each iteration over the RDD. I want to calculate a field on the current row based on a value in the previous row. Thanks.



EDIT II: I misunderstood the question. The part further below shows how to get tumbling-window semantics, but a sliding window is what is needed. Given that this is a sorted RDD,

import org.apache.spark.mllib.rdd.RDDFunctions._
sortedRdd.sliding(2)

should do the trick. Note, however, that this uses a DeveloperApi.
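As a rough sketch of how the windows from sliding(2) might be consumed: the (gpsdt, value) row shape, the sample data, and the local SparkContext below are made up for illustration and only stand in for the sorted Cassandra rows. It needs spark-mllib on the classpath.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.rdd.RDDFunctions._

object SlidingExample {
  // Hypothetical row shape: (gpsdt, value).
  // Returns (gpsdt of the current row, current value - previous value).
  def deltas(sortedRdd: RDD[(String, Double)]): RDD[(String, Double)] =
    sortedRdd.sliding(2).map { window =>
      // Each window is an Array holding two neighbouring rows.
      val (prev, curr) = (window(0), window(1))
      (curr._1, curr._2 - prev._2)
    }

  def main(args: Array[String]): Unit = {
    val sc = SparkContext.getOrCreate(
      new SparkConf().setAppName("sliding-example").setMaster("local[2]"))
    try {
      // Made-up data, sorted by the first field.
      val sortedRdd = sc
        .parallelize(Seq(("t1", 10.0), ("t2", 12.5), ("t3", 11.0), ("t4", 15.0)), 2)
        .sortBy(_._1)
      deltas(sortedRdd).collect().foreach(println)
    } finally sc.stop()
  }
}
```

The first row has no predecessor, so n rows produce n - 1 windows.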


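Alternatively, you can join the indexed RDD with a copy of itself whose index is shifted by one:

// key = index, value = row at that index
val l = sortedRdd.zipWithIndex.map(kv => (kv._2, kv._1))
// key = index - 1, so each row is keyed by the index of its predecessor
val r = sortedRdd.zipWithIndex.map(kv => (kv._2 - 1, kv._1))
// sliding: RDD[(index, (row, next row))]
val sliding = l.join(r)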


RDD joins are inner joins, so the edge cases where one side of the tuple would be missing (the first and the last row) are dropped.
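A minimal sketch of the join approach on made-up data follows. The pairWithNext helper and the local SparkContext are illustrative names, not part of any library. Since join does not keep the input order, the sketch sorts by key afterwards.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

object JoinPairsExample {
  // Hypothetical helper: pairs the row at index i with the row at index i + 1.
  def pairWithNext[T: ClassTag](sortedRdd: RDD[T]): RDD[(Long, (T, T))] = {
    // zipWithIndex yields (row, index); swap to get (index, row).
    val indexed = sortedRdd.zipWithIndex.map(_.swap).cache()
    // Key each row by the index of its predecessor.
    val shifted = indexed.map { case (i, row) => (i - 1, row) }
    // Inner join: key -1 and the last index have no partner and drop out.
    indexed.join(shifted).sortByKey()
  }

  def main(args: Array[String]): Unit = {
    val sc = SparkContext.getOrCreate(
      new SparkConf().setAppName("join-pairs-example").setMaster("local[2]"))
    try {
      val sortedRdd = sc.parallelize(Seq("a", "b", "c", "d"), 2)
      pairWithNext(sortedRdd).collect().foreach(println)
    } finally sc.stop()
  }
}
```

Four input rows give three pairs here, which matches the inner-join behaviour described above.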


How do you identify the previous row? RDDs do not have any stable ordering by themselves. If you have an incrementing dense key k, you could add a new column calculated as if (k % 2 == 0) k / 2 else (k - 1) / 2. This gives a key that has the same value for two successive rows, so you can then simply group by it.
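To see what that key looks like, here is a small plain-Scala check (no Spark needed). The name pairKey is made up for this sketch.

```scala
object TumblingKey {
  // The key formula from the paragraph above.
  def pairKey(k: Long): Long = if (k % 2 == 0) k / 2 else (k - 1) / 2

  def main(args: Array[String]): Unit = {
    val keys = (0L to 5L).map(pairKey)
    println(keys.mkString(", ")) // prints: 0, 0, 1, 1, 2, 2

    // For non-negative k, plain integer division yields the same key.
    assert((0L to 1000L).forall(k => pairKey(k) == k / 2))
  }
}
```

So for a zero-based index, k / 2 alone would probably be enough.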

But to reiterate: in most cases there is no really sensible notion of "previous" for RDDs (it depends on partitioning, data source, etc.).

So now that you have a zipWithIndex and an ordering in your set, you can do what I mentioned above. Note that zipWithIndex gives you an RDD[(YourData, Long)], with the index second. Swap it to the front to get an RDD[(Long, YourData)], here called rdd, and you can do

rdd.map(kv => if (kv._1 % 2 == 0) (kv._1 / 2, kv._2) else ((kv._1 - 1) / 2, kv._2)).groupByKey.foreach { group => /* your stuff here */ }

If you reduce at any point, consider using reduceByKey rather than groupByKey().reduce, since reduceByKey combines values within each partition before the shuffle.
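A hedged sketch of the tumbling-window variant with reduceByKey: the RDD[Double] input, the pairSums name, and the local SparkContext are assumptions for illustration. It uses idx / 2 as the pair key, which for a non-negative index gives the same value as the if/else formula.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object TumblingReduceExample {
  // Sums each tumbling pair of rows (indices 0-1, 2-3, ...).
  def pairSums(sortedRdd: RDD[Double]): RDD[(Long, Double)] =
    sortedRdd.zipWithIndex
      .map { case (value, idx) => (idx / 2, value) } // same key for two successive rows
      .reduceByKey(_ + _)

  def main(args: Array[String]): Unit = {
    val sc = SparkContext.getOrCreate(
      new SparkConf().setAppName("tumbling-reduce-example").setMaster("local[2]"))
    try {
      val sortedRdd = sc.parallelize(Seq(1.0, 2.0, 3.0, 4.0, 5.0), 2)
      pairSums(sortedRdd).sortByKey().collect().foreach(println)
    } finally sc.stop()
  }
}
```

reduceByKey expects an associative and commutative function, so a sum fits, while an order-dependent calculation such as "current minus previous" is better handled with the sliding approach.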


