My dataset looks like this:
a,b,c,d
---------
1,2005,A,2
1,2005,A,3
1,2005,B,4
2,2005,A,4
The output should be grouped by fields a and b, with the d values summed and the distinct c values counted. So the output should be:
1,2005,2,9
2,2005,1,4
Edit
My code looks like this:
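// Assumes path.csv contains only data rows (no header or separator line).
JavaRDD<String> csv = spark.read().textFile("path.csv").javaRDD();
JavaRDD<String[]> rdd = csv.map(s -> s.split(","));
// Key: "a,b"; value: (c, d)
JavaPairRDD<String, Tuple2<String, Long>> tuple = rdd.mapToPair(x -> new Tuple2<>(x[0] + "," + x[1], new Tuple2<>(x[2], Long.parseLong(x[3]))));
// Sums d per key, but keeps only one c value, so the distinct count is lost
JavaPairRDD<String, Tuple2<String, Long>> tuple2 = tuple.reduceByKey((x, y) -> new Tuple2<>(x._1(), x._2() + y._2()));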
But I don't know how to count the distinct c values.

Best answer
The code below loads a CSV similar to the input you provided:
a,b,c,d
1,2005,A,2
1,2005,A,3
1,2005,B,4
2,2005,A,4
It then performs the required map and reduce operations:
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;
import scala.Tuple4;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
public class SimpleDataframe {
    public static void main(String[] args) {
        // Constant.getSparkSess() is a helper from the answerer's project (not shown here)
        // that returns a SparkSession.
        SparkSession spark = Constant.getSparkSess();
        // With header=true and no schema inference, every column is read as a String.
        JavaRDD<Row> rdd = spark.read().option("header", "true")
                .csv("src/main/resources/simple.csv").rdd().toJavaRDD();
        List<Tuple4<String, String, Integer, Integer>> output = rdd
                // Map each row to key (a, b) and value ({c}, d).
                .mapToPair((PairFunction<Row, Tuple2<String, String>, Tuple2<Set<String>, Integer>>) row -> {
                    Tuple2<String, String> tup1 = new Tuple2<>(row.getString(0), row.getString(1));
                    Set<String> set = new HashSet<>();
                    set.add(row.getString(2));
                    Tuple2<Set<String>, Integer> tup2 = new Tuple2<>(set, Integer.parseInt(row.getString(3)));
                    return new Tuple2<>(tup1, tup2);
                })
                // Merge the values of each key: union the c sets and add the d sums.
                .reduceByKey((Function2<Tuple2<Set<String>, Integer>, Tuple2<Set<String>, Integer>,
                        Tuple2<Set<String>, Integer>>) (v1, v2) -> {
                    Set<String> set = new HashSet<>();
                    set.addAll(v1._1());
                    set.addAll(v2._1());
                    int num = v1._2() + v2._2();
                    return new Tuple2<>(set, num);
                })
                // Turn each ((a, b), (cSet, dSum)) into (a, b, distinct c count, d sum).
                .map(tuple -> new Tuple4<>(tuple._1()._1(), tuple._1()._2(), tuple._2()._1().size(), tuple._2()._2()))
                .collect();
        System.out.println(output);
    }
}
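
To see why the value carries a set of c values alongside the running sum, here is a minimal plain-Java sketch of the same merge rule without Spark. The class GroupBySketch, the Acc accumulator, and the aggregate method are hypothetical names made up for this illustration; the rows are the sample data from the question, and a TreeMap is assumed only to make the output order predictable.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical illustration class; it mirrors the reduce logic above without Spark.
class GroupBySketch {

    // Per-key accumulator: the distinct c values seen so far and the running sum of d.
    static final class Acc {
        final Set<String> distinctC;
        final int sumD;

        Acc(Set<String> distinctC, int sumD) {
            this.distinctC = distinctC;
            this.sumD = sumD;
        }

        // Same merge rule as the reduceByKey lambda: union the sets, add the sums.
        static Acc merge(Acc left, Acc right) {
            Set<String> union = new HashSet<>(left.distinctC);
            union.addAll(right.distinctC);
            return new Acc(union, left.sumD + right.sumD);
        }
    }

    // Takes data rows "a,b,c,d" (no header line) and returns "a,b,distinctC,sumD"
    // lines, ordered by key because a TreeMap is used.
    static List<String> aggregate(List<String> rows) {
        Map<String, Acc> byKey = new TreeMap<>();
        for (String row : rows) {
            String[] f = row.split(",");
            String key = f[0] + "," + f[1];
            Set<String> single = new HashSet<>();
            single.add(f[2]);
            // Insert the single-row accumulator, or merge it into the existing one.
            byKey.merge(key, new Acc(single, Integer.parseInt(f[3])), Acc::merge);
        }
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Acc> e : byKey.entrySet()) {
            Acc acc = e.getValue();
            out.add(e.getKey() + "," + acc.distinctC.size() + "," + acc.sumD);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList("1,2005,A,2", "1,2005,A,3", "1,2005,B,4", "2,2005,A,4");
        // Prints:
        // 1,2005,2,9
        // 2,2005,1,4
        aggregate(rows).forEach(System.out::println);
    }
}
```

Because merging two accumulators is just a set union plus an addition, the order in which rows or partial results are combined does not matter, which is what reduceByKey needs. The distinct count is taken only at the end, from the size of the merged set.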