1. 算子
package com.test; import java.util.Arrays;
import java.util.List; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction; import scala.Tuple2; public class Test {
private static final int Function2 = 0; public static void main(String[] args) {
SparkConf sparkConf = new SparkConf()
.setAppName("Test")
.setMaster("local");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
JavaRDD linesRdd = sc.textFile("/home/test/a.txt");
/**
* FlatMapFunction 中的2个String 分别代表输入参数类型和输出参数类型
*/
JavaRDD<String> wordsRDD = linesRdd.flatMap(new FlatMapFunction<String, String>(){
private static final long serialVersionUID = 1L; @Override
public Iterable<String> call(String line) throws Exception {
/**
* 参数 line 就代表 linesRDD中的每一条记录
*/
List<String> list = Arrays.asList(line.split(" "));
return list;
}
}); /**
* 要将每一个单词计数为1
* wordsRDD 是一个非 K V 格式的Rdd,
* 在java api 中要返回一个K V 格式的rdd, 必须使用 mapToPair 方法
* return 结果就是一个 K V 格式
*/
JavaPairRDD<String, Integer> pairRDD = wordsRDD.mapToPair(new PairFunction<String, String, Integer>() { private static final long serialVersionUID = 1L; @Override
public Tuple2<String, Integer> call(String word) throws Exception {
return new Tuple2<String, Integer>(word, 1);
}
}); /**
* 使用reduceByKey 进行聚合操作
* 1. 进行 groupByKey 将相同的 key 分割到一个组里去, 然后通过传入的函数对主内的数据进行聚合
* call 方法将会自动将个数循环相加
*/
JavaPairRDD<String, Integer> resultRdd = pairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() { private static final long serialVersionUID = 1L; @Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1+v2;
}
}); /**
* 按照单词出现的次数进行排序, 应为排序需要对 v 值(出现个数)进行排序, 所以需要将 K V, 进行调换, 因为sortByKey只对key能进行排序
* 先使用 mapToPair 来调换位置
* sortByKey 进行排序
* 再使用 mapToPair 来调换位置
*/
resultRdd.mapToPair(new PairFunction<Tuple2<String,Integer>, Integer, String>() { private static final long serialVersionUID = 1L; @Override
public Tuple2<Integer, String> call(Tuple2<String, Integer> tuple) throws Exception { return new Tuple2<Integer, String>(tuple._2,tuple._1);
}
}).sortByKey().mapToPair(new PairFunction<Tuple2<Integer,String>, String, Integer>() { private static final long serialVersionUID = 1L; @Override
public Tuple2<String, Integer> call(Tuple2<Integer, String> tuple) throws Exception {
return new Tuple2<String, Integer>(tuple._2, tuple._1);
}
}).foreach(new VoidFunction<Tuple2<String,Integer>>() { private static final long serialVersionUID = 1L; @Override
public void call(Tuple2<String, Integer> tuple) throws Exception {
System.out.println(tuple);
}
});
}
}
jar包 链接:https://pan.baidu.com/s/1UDp81G8tY7IgwJatlT_1Vg 密码:yj06