由于从Cassandra查询数据存在限制,因此我尝试使用Spark逐批读取数据并将其存储在RDD中。

然后,我使用并集函数添加所有的RDD。

这是我的代码。

private void getDataFromCassandra(JavaSparkContext sc) {


    CassandraJavaRDD<CassandraRow> cassandraRDD = null ;
    CassandraJavaRDD<CassandraRow> cassandraRDD2  = null;

    While(Some Condition)

     cassandraRDD = CassandraJavaUtil
                .javaFunctions(sc).cassandraTable("dmp", "table").select("abc", "xyz")
                .where("pid IN ('" + sb + "')");

    if(cassandraRDD2==null){


     cassandraRDD2=cassandraRDD;
    }
    else{
        cassandraRDD2 =  cassandraRDD2.union(cassandraRDD);
    }
}


}

但是在工会中,我遇到了以下错误。

类型不匹配:无法从JavaRDD转换为CassandraJavaRDD

虽然两者的RDD的类型相似。

所以1)我应否将演员表

 cassandraRDD2 =  (CassandraJavaRDD<CassandraRow>) cassandraRDD2.union(cassandraRDD);


2)或将RDD之一的类型更改为JavaRDD

最佳答案

发生问题是因为根据docs


  方法:union(JavaRDD other)返回此RDD与另一个的联合。
  
  返回值:JavaRDD


因此不匹配。

因为根据this

public class CassandraJavaRDD<R> extends JavaRDD<R> {
...
}


CassandraJavaRDD类扩展了JavaRDD,因此您可以使用:

JavaRDD<CassandraRow> cassandraRDD = null;
JavaRDD<CassandraRow> cassandraRDD2 = null;


因此union()方法的返回值将与其类型匹配。

10-08 11:20