My dataset looks like this:

a,b,c,d
---------
1,2005,A,2
1,2005,A,3
1,2005,B,4
2,2005,A,4


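The output should be grouped by fields a and b, with the d values summed and the distinct c values counted for each group. So the output should be: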

1,2005,2,9
2,2005,1,4
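
To pin down the expected semantics independently of Spark, here is a minimal plain-Java sketch of the same aggregation on the four sample rows. The class name `GroupSketch` and its `aggregate` method are hypothetical, made up for illustration; this is not Spark code and only shows what "group by a and b, count distinct c, sum d" should produce.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper for illustration only; not part of Spark.
public class GroupSketch {

    // Groups CSV rows "a,b,c,d" by (a, b) and returns lines "a,b,distinctC,sumD"
    // in order of first appearance of each key.
    public static List<String> aggregate(List<String> rows) {
        Map<String, Set<String>> distinctC = new LinkedHashMap<>();
        Map<String, Integer> sumD = new LinkedHashMap<>();
        for (String row : rows) {
            String[] f = row.split(",");
            String key = f[0] + "," + f[1];
            // Collect the distinct c values seen for this key.
            distinctC.computeIfAbsent(key, k -> new HashSet<>()).add(f[2]);
            // Add d to the running sum for this key.
            sumD.merge(key, Integer.parseInt(f[3]), Integer::sum);
        }
        List<String> out = new ArrayList<>();
        for (String key : distinctC.keySet()) {
            out.add(key + "," + distinctC.get(key).size() + "," + sumD.get(key));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList("1,2005,A,2", "1,2005,A,3", "1,2005,B,4", "2,2005,A,4");
        aggregate(rows).forEach(System.out::println);
    }
}
```

Run on the sample rows, this prints the two lines shown above.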


Edit

My code looks like this:

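    // Read the file as plain text lines. Assumes the file has no header row.
    JavaRDD<String> csv = spark.read().textFile("path.csv").javaRDD();
    JavaRDD<String[]> rdd = csv.map(s -> s.split(","));
    // Key: "a,b"; value: (c, d) with d parsed as a number.
    JavaPairRDD<String, Tuple2<String, Long>> tuple = rdd.mapToPair(x -> new Tuple2<>(x[0] + "," + x[1], new Tuple2<>(x[2], Long.parseLong(x[3]))));
    // Sums d per key, but keeps only one c value per key.
    JavaPairRDD<String, Tuple2<String, Long>> tuple2 = tuple.reduceByKey((x, y) -> new Tuple2<>(x._1(), x._2() + y._2()));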


But I don't know how to count the distinct c values.

Best answer

The code below loads a CSV similar to the input you provided:

a,b,c,d
1,2005,A,2
1,2005,A,3
1,2005,B,4
2,2005,A,4


and performs the required map and reduce operations:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;
import scala.Tuple4;

import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class SimpleDataframe {

    public static void main(String[] args) {
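        // Constant.getSparkSess() is the answerer's own helper (not shown here);
        // it is assumed to return a configured SparkSession.
        SparkSession spark = Constant.getSparkSess();

        // Read the CSV with a header row; without a schema every column is loaded as a String.
        JavaRDD<Row> rdd = spark.read().option("header", "true").csv("src/main/resources/simple.csv").rdd().toJavaRDD();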

        List<Tuple4<String, String, Integer, Integer>> output = rdd
                // Map each row to key (a, b) and value ({c}, d).
                .mapToPair((PairFunction<Row, Tuple2<String, String>, Tuple2<Set<String>, Integer>>) row -> {
                    Tuple2<String, String> tup1 = new Tuple2<>(row.getString(0), row.getString(1));
                    Set<String> set = new HashSet<>();
                    set.add(row.getString(2));
                    Tuple2<Set<String>, Integer> tup2 = new Tuple2<>(set, Integer.parseInt(row.getString(3)));
                    return new Tuple2<>(tup1, tup2);
                })
                // Combine the values per key: union the c sets and add the d sums.
                .reduceByKey((Function2<Tuple2<Set<String>, Integer>,
                        Tuple2<Set<String>, Integer>, Tuple2<Set<String>, Integer>>) (v1, v2) -> {
                    Set<String> set = new HashSet<>();
                    set.addAll(v1._1);
                    set.addAll(v2._1);
                    int num = v1._2 + v2._2;
                    return new Tuple2<>(set, num);
                })
                // Map the combined result to (a, b, number of distinct c values, sum of d).
                .map(tuple -> new Tuple4<>(tuple._1._1, tuple._1._2, tuple._2._1.size(), tuple._2._2))
                .collect();

        System.out.println(output);

    }
}
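
Spark's `reduceByKey` expects an associative and commutative function, because values for a key can be combined in any order and grouping across partitions. The merge used above (set union plus integer addition) has both properties. Below is a minimal plain-Java sketch of that merge without Spark; `MergeSketch`, `Acc`, and `merge` are hypothetical names used only for illustration.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical names for illustration; mirrors the (Set<String>, Integer) value used in the answer.
public class MergeSketch {

    // Accumulator: distinct c values seen so far and the running sum of d.
    public static final class Acc {
        public final Set<String> cValues;
        public final int sumD;

        public Acc(Set<String> cValues, int sumD) {
            this.cValues = cValues;
            this.sumD = sumD;
        }

        // Accumulator for a single row.
        public static Acc of(String c, int d) {
            return new Acc(new HashSet<>(Arrays.asList(c)), d);
        }
    }

    // Same logic as the reduceByKey lambda: union the sets, add the sums.
    // Builds a new set rather than mutating either input.
    public static Acc merge(Acc x, Acc y) {
        Set<String> union = new HashSet<>(x.cValues);
        union.addAll(y.cValues);
        return new Acc(union, x.sumD + y.sumD);
    }

    public static void main(String[] args) {
        Acc r1 = Acc.of("A", 2);
        Acc r2 = Acc.of("A", 3);
        Acc r3 = Acc.of("B", 4);
        Acc left = merge(merge(r1, r2), r3);   // (r1 + r2) + r3
        Acc right = merge(r1, merge(r3, r2));  // r1 + (r3 + r2): different order and grouping
        System.out.println(left.cValues.size() + "," + left.sumD);
        System.out.println(right.cValues.size() + "," + right.sumD);
    }
}
```

Both orderings give the same distinct count and sum for key (1, 2005). Note that this approach keeps each key's set of distinct c values in memory, so it suits groups with a modest number of distinct values.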

