Spark的intersection

intersection顾名思义,他是指交叉的。当两个RDD进行intersection后,将保留两者共有的。因此对于RDD1.intersection(RDD2) 和RDD2.intersection(RDD1) 。应该是一致的。

比如对于,List1 = {1,2,3,4,5} 和 List1 = {3,4,5,6,7},对于包含这两个List的RDD来说,他们进行一次intersection应该得到result={3,4,5}

Spark的subtract

subtract则和intersection不同,他是找出两者之间不一致的内容。

比如对于,List1 = {1,2,3,4,5} 和 List1 = {3,4,5,6,7}他们进行一次subtract得到的结果是跟顺序有关的。

list1.subtract(list2)

结果应该为

1 2

而对于

list2.subtract(list1)

结果应该为

6 7

Spark的union

union最好理解,他是把两个RDD进行整合,但不考虑其中重复的情况。比如对于,List1 = {1,2,3,4,5} 和 List1 = {3,4,5,6,7}他们进行一次union得到的结果是跟顺序无关的。结果应该为

result = {1,2,3,4,5,3,4,5,6,7}

Spark的distinct

distinc 是将RDD中重复的内容剔除,注意,这个剔除的过程并不会把重复的元素都去掉,而是重复的元素只保留一份。这当然很好理解,比如result = {1,2,3,4,5,3,4,5,6,7},进行一次distinct,则得到{1,2,3,4,5,6,7}

一个综合的例子

考虑到intersection、subtract、union和distinct比较常用,且在一个案例中能够很好体现其特点。因此我们这次获取的数据集是两个课程,lesson1和lesson2。lesson1中有十位同学,每个同学都有着许多个能力的估值,该估值是一个Int类型数据。lesson2中也是如此。对于这两个数据集我将其分别放在lesson1中和lesson2中。数据集和下面的代码均可以在github上找到并下载。

数据集分析

对于lesson1,里面有很多同学,每个同学又有很多次能力估值。在Spark入门(六)--Spark的combineByKey、sortBykey中已经提到过给每个人的成绩求平均分,因此这里不做这个处理。

这两个数据集我们解决如下的问题:

  • 0、计算lesson1和lesson2中每个同学的能力总估值
  • 1、找出lesson1中所有的同学(不重复)
  • 2、找出lesson2中所有同学(不重复)
  • 3、找出选了两门课程的同学
  • 4、找出只在lesson1而不在lesson2中的同学
  • 5、找出只在lesson2而不在lesson1中的同学

数据的部分内容展示

Spark入门(七)--Spark的intersection、subtract、union和distinc-LMLPHP

对于第0个问题,因为用到的并非本节的内容,因此标注为0。要求每个课程中的每个同学能力的总估值,首先要对数据进行处理,按空格拆分。拆分后的数据应该是(姓名,分数)的元组集合,然后根据姓名对分数进行累加。

  • 第一个问题中找出lesson1中所有同学,只要得到了每个同学能力的总估值,去掉分数,即可知道lesson1中的所有同学。

  • 第二题同理。

  • 第三题要找出选了两门课的同学,则要对两门课所有的同学进行一次整合,然后剔除重复的数据,即先union再distinc

  • 第四题要找到lesson1中而不在lesson二中的同学,则只要对lesson1的同学和lesson2中的同学进行一次substract即可

  • 第五题同理

scala实现


import org.apache.spark.{SparkConf, SparkContext} object SparkIntersectionAndSubtract { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local").setAppName("SparkIntersectionAndSubtract") val sc = new SparkContext(conf) //课程一中的数据
val lesson1Data = sc.textFile("./lesson1").map(line => (line.split(" ")(0),line.split(" ")(1).toInt)) //将课程一中每个人的分数相加
val lesson1Grade = lesson1Data.reduceByKey(_+_) val lesson1Student = lesson1Grade.map(x=>x._1) //课程二中的数据处理
val lesson2Data = sc.textFile("./lesson2").map(line => (line.split(" ")(0),line.split(" ")(1).toInt)) //将课程二中每个人的分数相加
val lesson2Grade = lesson2Data.reduceByKey((x,y)=>x+y) val lesson2Student = lesson2Grade.map(x=>x._1) //在课程一中的人且在课程二中的人的集合
println("Students On Lesson1 And On Lesson2")
lesson1Student.intersection(lesson2Student).foreach(println) //在课程二中的人且在课程一中的人的集合,与上面的结果一致
println("Students On Lesson1 And On Lesson2")
lesson2Student.intersection(lesson1Student).foreach(println) //在课程一中的人但不在课程二中的人的集合
println("Students Only In Lesson1")
val onlyInLesson1 = lesson1Student.subtract(lesson2Student)
onlyInLesson1.foreach(println) //在课程二中的人但不在课程二中的人的集合
println("Students Only In Lesson2")
val onlyInLesson2 = lesson2Student.subtract(lesson1Student)
onlyInLesson2.foreach(println) //只选了一门课的同学
println("Students Only Choose One Lesson")
lesson1Student.union(lesson2Student).foreach(println) //两门课所有学生(不重复)
println("All the students")
lesson1Student.union(lesson2Student).distinct().foreach(print) } }

java实现

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2; public class SparkIntersectionAndSubtractJava { public static void main(String[] args){ SparkConf conf = new SparkConf().setMaster("local").setAppName("SparkIntersectionAndSubtractJava"); JavaSparkContext sc = new JavaSparkContext(conf); //java7实现
intersectionAndSubtractJava(sc); //java8实现
intersectionAndSubtractJava8(sc);
} public static void intersectionAndSubtractJava(JavaSparkContext sc){ JavaRDD<String> lesson1Data = sc.textFile("./lesson1"); JavaRDD<String> lesson2Data = sc.textFile("./lesson2"); JavaPairRDD<String,Integer> lesson1InfoData = lesson1Data.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) throws Exception {
return new Tuple2<>(s.split(" ")[0],Integer.parseInt(s.split(" ")[1]));
}
}); JavaPairRDD<String,Integer> lesson2InfoData = lesson2Data.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) throws Exception {
return new Tuple2<>(s.split(" ")[0],Integer.parseInt(s.split(" ")[1]));
}
}); JavaPairRDD<String,Integer> lesson1Grades = lesson1InfoData.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) throws Exception {
return integer+integer2;
}
}); JavaPairRDD<String,Integer> lesson2Grades = lesson2InfoData.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) throws Exception {
return integer+integer2;
}
}); JavaRDD<String> lesson1Students = lesson1Grades.map(new Function<Tuple2<String, Integer>, String>() {
@Override
public String call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
return stringIntegerTuple2._1;
}
}); JavaRDD<String> lesson2Students = lesson2Grades.map(new Function<Tuple2<String, Integer>, String>() {
@Override
public String call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
return stringIntegerTuple2._1;
}
}); //既在lesson1中又在lesson2中的学生
System.out.println("Students On Lesson1 And On Lesson2");
lesson1Students.intersection(lesson2Students).foreach(new VoidFunction<String>() {
@Override
public void call(String s) throws Exception {
System.out.println(s);
}
}); //既在lesson2中又在lesson1中的学生,与上面的结果一致
System.out.println("Students On Lesson1 And On Lesson2");
lesson2Students.intersection(lesson1Students).foreach(new VoidFunction<String>() {
@Override
public void call(String s) throws Exception {
System.out.println(s);
}
}); //只在lesson1中而不在lesson2中的学生
JavaRDD<String> studensOnlyInLesson1 = lesson1Students.subtract(lesson2Students);
System.out.println("Students Only In Lesson1");
lesson1Students.subtract(lesson2Students).foreach(new VoidFunction<String>() {
@Override
public void call(String s) throws Exception {
System.out.println(s);
}
}); //只在lesson2中而不在lesson1中的学生
JavaRDD<String> studensOnlyInLesson2 = lesson2Students.subtract(lesson1Students);
System.out.println("Students Only In Lesson2");
studensOnlyInLesson2.foreach(new VoidFunction<String>() {
@Override
public void call(String s) throws Exception {
System.out.println(s);
}
}); //只选了一门课的学生
JavaRDD<String> onlyOneLesson = studensOnlyInLesson1.union(studensOnlyInLesson2);
System.out.println("Students Only Choose One Lesson");
onlyOneLesson.foreach(new VoidFunction<String>() {
@Override
public void call(String s) throws Exception {
System.out.println(s);
}
}); System.out.println("All the students");
lesson1Students.union(lesson2Students).distinct().foreach(new VoidFunction<String>() {
@Override
public void call(String s) throws Exception {
System.out.println(s);
}
}); } public static void intersectionAndSubtractJava8(JavaSparkContext sc){ JavaRDD<String> lesson1Data = sc.textFile("./lesson1"); JavaRDD<String> lesson2Data = sc.textFile("./lesson2"); JavaPairRDD<String,Integer> lesson1InfoData =
lesson1Data.mapToPair(line -> new Tuple2<>(line.split(" ")[0],Integer.parseInt(line.split(" ")[1]))); JavaPairRDD<String,Integer> lesson2InfoData =
lesson2Data.mapToPair(line -> new Tuple2<>(line.split(" ")[0],Integer.parseInt(line.split(" ")[1]))); JavaPairRDD<String,Integer> lesson1Grades = lesson1InfoData.reduceByKey((x,y) -> x+y); JavaPairRDD<String,Integer> lesson2Grades = lesson2InfoData.reduceByKey((x,y) -> x+y); JavaRDD<String> studentsInLesson1 = lesson1Grades.map(x->x._1); JavaRDD<String> studentsInLesson2 = lesson2Grades.map(x->x._1); //既在lesson1中又在lesson2中的学生
studentsInLesson1.intersection(studentsInLesson2).foreach(name -> System.out.println(name)); //既在lesson2中又在lesson1中的学生,与上面的结果一致
studentsInLesson1.intersection(studentsInLesson2).foreach(name -> System.out.println(name)); //只在lesson1中的学生
JavaRDD<String> studentsOnlyInLesson1 = studentsInLesson1.subtract(studentsInLesson2);
studentsOnlyInLesson1.foreach(name -> System.out.println(name)); //只在lesson2中的学生
JavaRDD<String> studentsOnlyInLesson2 = studentsInLesson2.subtract(studentsInLesson1);
studentsOnlyInLesson2.foreach(name -> System.out.println(name)); //只选了一门课的学生
JavaRDD<String> studentsOnlyOneLesson = studentsOnlyInLesson1.union(studentsInLesson2);
studentsOnlyOneLesson.foreach(name -> System.out.println(name)); studentsInLesson1.union(studentsInLesson2).distinct().foreach(name -> System.out.println(name)); } }

python实现

from pyspark import SparkConf,SparkContext

conf = SparkConf().setMaster("local").setAppName("SparkCombineByKey")

sc = SparkContext(conf=conf)

#lesson1数据
lesson1Data = sc.textFile("./lesson1").map(lambda x:(x.split(" ")[0],int(x.split(" ")[1]))) #lesson2数据
lesson2Data = sc.textFile("./lesson2").map(lambda x:(x.split(" ")[0],int(x.split(" ")[1]))) #lesson1中每个人的总分
lesson1InfoData = lesson1Data.reduceByKey(lambda x,y:x+y) #lesson2中每个人的总分
lesson2InfoData = lesson2Data.reduceByKey(lambda x,y:x+y) #lesson1中的学生
studentsInLesson1 = lesson1InfoData.map(lambda x:x[0]) #lesson2中的学生
studentsInLesson2 = lesson2InfoData.map(lambda x:x[0]) #在lesson1中且在lesson2中的学生
print("Students On Lesson1 And On Lesson2")
studentsInLesson1.intersection(studentsInLesson2).foreach(print) #在lesson2中且在lesson1中的学生,与上面的结果一致
print("Students On Lesson1 And On Lesson2")
studentsInLesson2.intersection(studentsInLesson1).foreach(print) #只在lesson1中的学生
print("Students Only In Lesson1")
studensOnlyInLesson1 = studentsInLesson1.subtract(studentsInLesson2)
studensOnlyInLesson1.foreach(print) #只在lesson2中的学生
print("Students Only In Lesson2")
studensOnlyInLesson2 = studentsInLesson2.subtract(studentsInLesson1)
studensOnlyInLesson2.foreach(print) #只选了一门课的学生
print("Students Only Choose One Lesson")
studensOnlyInLesson1.union(studensOnlyInLesson2).foreach(print) #两门课所有学生(不重复)
print("All the students")
studentsInLesson1.union(studentsInLesson2).distinct().foreach(print)

运行得到结果

Students On Lesson1 And On Lesson2
Vicky
Amy
Lili
Bob
Coco Students On Lesson1 And On Lesson2
Vicky
Amy
Lili
Coco
Bob Students Only In Lesson1
Bill
David
Mike
Nancy
Lucy Students Only In Lesson2
White
Jimmy
Jason
John
Frank Students Only Choose One Lesson
Bill
David
Mike
Nancy
Lucy
White
Jimmy
Jason
John
Frank All the students
Vicky
Bill
Amy
White
Jimmy
Jason
Lili
David
Bob
Mike
Coco
Nancy
Lucy
John
Frank

Spark入门(七)--Spark的intersection、subtract、union和distinc-LMLPHP

通过上面的例子,非常具体地应用了intersection、subtract、union和distinct来解决具体的问题。并且利用好这几个方法能够很快速地进行一些数据集之间的关系操作。事实上,直接利用这几种方法比我们自己动手实现要好很多,因为spark中对这几种方法进行了优化。

数据集和代码均可以在github上找到并下载

转自:https://juejin.im/post/5c7b92276fb9a049bb7d0d10

05-01 05:14