我需要在巨大的RDD上进行复杂的计算,但是为了简单起见,我将问题简化为简单得多的事情:

我有这样获得的JavaRDD:

 JavaRDD<Student> students = sc.parallelize(
                javaFunctions(sc).cassandraTable("test", "school",   mapRowTo(Student.class)).collect());


学生类如下所示:

public class Student implements Serializable{
    Integer id;
    Integer classroom;
    String name;
    Integer mark1;
    Integer mark2;
   // ... getters and setters


现在,我希望在一次迭代中为每个教室使用stddedv,mark1和mark2列的平均值,是否使用StatCounter。
我知道如何使用StatCounter,但是

JavaRDD<Numeric>


就我而言

JavaRDD<Student>


有任何想法吗 ?

谢谢

最佳答案

首先,永远不要:

sc.parallelize(someRDD.collect());


只是一个好主意。像以往一样。

现在:


  一次迭代,以使每个教室都有stddedv,av1和mark2列的平均值,并尽可能使用StatCounter


可以,但是只需将DataFrameCassandra connector一起使用:

import static org.apache.spark.sql.functions.*;

spark
 .read
 .format("org.apache.spark.sql.cassandra")
 .options(Map( "table" -> "school", "keyspace" -> "test"))
 .load()
 .groupBy("classroom"))
 .agg(mean("mark1"), stddev("mark1"), mean("mark2"), stddev("mark2"));


使用统计计数器,可以将JavaPairRDD<Integer,Tuple2<Integer,Integer>>(class, (mark1, , mark2)))和combineByKey转换为Tuple2StatCounters。您也可以将Tuple2替换为mllib.Vector并将其聚合为MultivariateStatisticalSummary

关于java - 以最佳方式计算JavaRDD的统计信息,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/47798152/

10-10 02:37