由于从Cassandra查询数据存在限制,因此我尝试使用Spark逐批读取数据并将其存储在RDD中。
然后,我使用并集函数添加所有的RDD。
这是我的代码。
private void getDataFromCassandra(JavaSparkContext sc) {
CassandraJavaRDD<CassandraRow> cassandraRDD = null ;
CassandraJavaRDD<CassandraRow> cassandraRDD2 = null;
While(Some Condition)
cassandraRDD = CassandraJavaUtil
.javaFunctions(sc).cassandraTable("dmp", "table").select("abc", "xyz")
.where("pid IN ('" + sb + "')");
if(cassandraRDD2==null){
cassandraRDD2=cassandraRDD;
}
else{
cassandraRDD2 = cassandraRDD2.union(cassandraRDD);
}
}
}
但是在工会中,我遇到了以下错误。
类型不匹配:无法从JavaRDD转换为CassandraJavaRDD
虽然两者的RDD的类型相似。
所以1)我应否将演员表
cassandraRDD2 = (CassandraJavaRDD<CassandraRow>) cassandraRDD2.union(cassandraRDD);
2)或将RDD之一的类型更改为JavaRDD
最佳答案
发生问题是因为根据docs:
方法:union(JavaRDD other)返回此RDD与另一个的联合。
返回值:JavaRDD
因此不匹配。
因为根据this:
public class CassandraJavaRDD<R> extends JavaRDD<R> {
...
}
CassandraJavaRDD
类扩展了JavaRDD
,因此您可以使用:JavaRDD<CassandraRow> cassandraRDD = null;
JavaRDD<CassandraRow> cassandraRDD2 = null;
因此
union()
方法的返回值将与其类型匹配。