./bin/hdfs dfs -put /usr/local/spark/mycode/wordcount/word.txt .
我们把本地文件系统中的“/usr/local/spark/mycode/wordcount/word.txt”上传到分布式文件系统HDFS中(放到hadoop用户目录下)
在pyspark窗口中,就可以使用下面任意一条命令完成从HDFS文件系统中加载数据:
>>> lines = sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt") >>> lines = sc.textFile("/user/hadoop/word.txt") >>> lines = sc.textFile("word.txt")
可以调用SparkContext的parallelize方法,在Driver中一个已经存在的集合(数组)上创建
>>> nums = [1,2,3,4,5]
>>> rdd = sc.parallelize(nums)
2.启动spark集群
启动Hadoop集群
cd /usr/local/hadoop/
sbin/start-all.sh
启动Spark的Master节点和所有slaves节点
cd /usr/local/spark/ sbin/start-master.sh sbin/start-slaves.sh
3.Hadoop yarn管理器
4.从文件系统中加载数据创建rdd
(1).下面请切换回pyspark窗口,看一下如何从本地文件系统中加载数据:
>>> lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")
5.rdd操作
- 转换操作
下面列出一些常见的转换操作(Transformation API):
* filter(func):筛选出满足函数func的元素,并返回一个新的数据集
* map(func):将每个元素传递到函数func中,并将结果返回为一个新的数据集
* flatMap(func):与map()相似,但每个输入元素都可以映射到0或多个输出结果
* groupByKey():应用于(K,V)键值对的数据集时,返回一个新的(K, Iterable)形式的数据集
* reduceByKey(func):应用于(K,V)键值对的数据集时,返回一个新的(K, V)形式的数据集,其中的每个值是将每个key传递到函数func中进行聚
- 行动操作
下面列出一些常见的行动操作(Action API):
* count() 返回数据集中的元素个数
* collect() 以数组的形式返回数据集中的所有元素
* first() 返回数据集中的第一个元素
* take(n) 以数组的形式返回数据集中的前n个元素
* reduce(func) 通过函数func(输入两个参数并返回一个值)聚合数据集中的元素
* foreach(func) 将数据集中的每个元素传递到函数func中运行*
6.创建键值对
(1)从文件中加载
>>> lines = sc.textFile("file:///usr/local/spark/mycode/pairrdd/word.txt") >>> pairRDD = lines.flatMap(lambda line : line.split(" ")).map(lambda word : (word,1)) >>> pairRDD.foreach(print) (i,1) (love,1) (hadoop,1) (i,1) (love,1) (Spark,1) (Spark,1) (is,1) (fast,1) (than,1) (hadoop,1)
(2)通过并行集合(列表)创建rdd
>>> list = ["Hadoop","Spark","Hive","Spark"] >>> rdd = sc.parallelize(list) >>> pairRDD = rdd.map(lambda word : (word,1)) >>> pairRDD.foreach(print) (Hadoop,1) (Spark,1) (Hive,1) (Spark,1)
常用的键值对转换操作
reduceByKey(func)
reduceByKey(func)的功能是,使用func函数合并具有相同键的值。比如,reduceByKey((a,b) => a+b),有四个键值对(“spark”,1)、(“spark”,2)、(“hadoop”,3)和(“hadoop”,5),对具有相同key的键值对进行合并后的结果就是:(“spark”,3)、(“hadoop”,8)。可以看出,(a,b) => a+b这个Lamda表达式中,a和b都是指value,比如,对于两个具有相同key的键值对(“spark”,1)、(“spark”,2),a就是1,b就是2。
>>> pairRDD.reduceByKey(lambda a,b : a+b).foreach(print) (Spark,2) (Hive,1) (Hadoop,1)
groupByKey()
groupByKey()的功能是,对具有相同键的值进行分组。比如,对四个键值对(“spark”,1)、(“spark”,2)、(“hadoop”,3)和(“hadoop”,5),采用groupByKey()后得到的结果是:(“spark”,(1,2))和(“hadoop”,(3,5))。
我们对上面第二种方式创建得到的pairRDD进行groupByKey()操作。
Keys()
keys()只会把键值对RDD中的key返回形成一个新的RDD。比如,对四个键值对(“spark”,1)、(“spark”,2)、(“hadoop”,3)和(“hadoop”,5)构成的RDD,采用keys()后得到的结果是一个RDD[Int],内容是{“spark”,”spark”,”hadoop”,”hadoop”}。
values()
values()只会把键值对RDD中的value返回形成一个新的RDD。比如,对四个键值对(“spark”,1)、(“spark”,2)、(“hadoop”,3)和(“hadoop”,5)构成的RDD,采用values()后得到的结果是一个RDD[Int],内容是{1,2,3,5}。
sortByKey()
>>> pairRDD.sortByKey() PythonRDD[30] at RDD at PythonRDD.scala:48 >>> pairRDD.sortByKey().foreach(print) (Hadoop,1) (Hive,1) (Spark,1) (Spark,1)
mapValues(func)
我们经常会遇到一种情形,我们只想对键值对RDD的value部分进行处理,而不是同时对key和value进行处理。对于这种情形,Spark提供了mapValues(func),它的功能是,对键值对RDD中的每个value都应用一个函数,但是,key不会发生变化。比如,对四个键值对(“spark”,1)、(“spark”,2)、(“hadoop”,3)和(“hadoop”,5)构成的pairRDD,如果执行pairRDD.mapValues(lambda x : x+1),就会得到一个新的键值对RDD,它包含下面四个键值对(“spark”,2)、(“spark”,3)、(“hadoop”,4)和(“hadoop”,6)。
join
>>> pairRDD1 = sc.parallelize([('spark',1),('spark',2),('hadoop',3),('hadoop',5)]) >>> pairRDD2 = sc.parallelize([('spark','fast')]) >>> pairRDD1.join(pairRDD2) PythonRDD[49] at RDD at PythonRDD.scala:48 >>> pairRDD1.join(pairRDD2).foreach(print)
综合实例
题目:给定一组键值对(“spark”,2),(“hadoop”,6),(“hadoop”,4),(“spark”,6),键值对的key表示图书名称,value表示某天图书销量,请计算每个键对应的平均值,也就是计算每种图书的每天平均销量。
很显然,对于上面的题目,结果是很显然的,(“spark”,4),(“hadoop”,5)。
下面,我们在pyspark中演示代码执行过程:
>>> rdd = sc.parallelize([("spark",2),("hadoop",6),("hadoop",4),("spark",6)]) >>> rdd.mapValues(lambda x : (x,1)).reduceByKey(lambda x,y : (x[0]+y[0],x[1] + y[1])).mapValues(lambda x : (x[0] / x[1])).collect()
(1)首先构建一个数组,数组里面包含了四个键值对,然后,调用parallelize()方法生成RDD,从执行结果反馈信息,可以看出,rdd类型是RDD[(String, Int)]。
(2)针对构建得到的rdd,我们调用mapValues()函数,把rdd中的每个每个键值对(key,value)的value部分进行修改,把value转换成键值对(value,1),其中,数值1表示这个key在rdd中出现了1次,为什么要记录出现次数呢?因为,我们最终要计算每个key对应的平均值,所以,必须记住这个key出现了几次,最后用value的总和除以key的出现次数,就是这个key对应的平均值。比如,键值对(“spark”,2)经过mapValues()函数处理后,就变成了(“spark”,(2,1)),其中,数值1表示“spark”这个键的1次出现。下面就是rdd.mapValues()操作在spark-shell中的执行演示:
scala> rdd.mapValues(x => (x,1)).collect()
res23: Array[(String, (Int, Int))] = Array((spark,(2,1)), (hadoop,(6,1)), (hadoop,(4,1)), (spark,(6,1)))
上面语句中,collect()是一个行动操作,功能是以数组的形式返回数据集中的所有元素,当我们要实时查看一个RDD中的元素内容时,就可以调用collect()函数。
(3)然后,再对上一步得到的RDD调用reduceByKey()函数,在spark-shell中演示如下:这里,必须要十分准确地理解reduceByKey()函数的功能。可以参考上面我们对该函数的介绍,reduceByKey(func)的功能是使用func函数合并具有相同键的值。这里的func函数就是Lamda表达式 x,y : (x[0]+y[0],x[1] + y[1]),这个表达式中,x和y都是value,而且是具有相同key的两个键值对所对应的value,比如,在这个例子中, (“hadoop”,(6,1))和(“hadoop”,(4,1))这两个键值对具有相同的key,所以,对于函数中的输入参数(x,y)而言,x就是(6,1),序列从0开始计算,x[0]表示这个键值对中的第1个元素6,x[1]表示这个键值对中的第二个元素1,y就是(4,1),y[0]表示这个键值对中的第1个元素4,y[1]表示这个键值对中的第二个元素1,所以,函数体(x[0]+y[0],x[1] + y[2]),相当于生成一个新的键值对(key,value),其中,key是x[0]+y[0],也就是6+4=10,value是x[1] + y[1],也就是1+1=2,因此,函数体(x[0]+y[0],x[1] + y[1])执行后得到的value是(10,2),但是,要注意,这个(10,2)是reduceByKey()函数执行后,”hadoop”这个key对应的value,也就是,实际上reduceByKey()函数执行后,会生成一个键值对(“hadoop”,(10,2)),其中,10表示hadoop书籍的总销量,2表示两天。同理,reduceByKey()函数执行后会生成另外一个键值对(“spark”,(8,2))。
(4)最后,就可以求出最终结果。我们可以对上面得到的两个键值对(“hadoop”,(10,2))和(“spark”,(8,2))所构成的RDD执行mapValues()操作,得到每种书的每天平均销量。当第一个键值对(“hadoop”,(10,2))输入给mapValues(x => (x[0] / x[1]))操作时,key是”hadoop”,保持不变,value是(10,2),会被赋值给Lamda表达式x => (x[0] / x[1]中的x,因此,x的值就是(10,2),x[0]就是10,表示hadoop书总销量是10,x[1]就是2,表示2天,因此,hadoop书籍的每天平均销量就是x[0] / x[1],也就是5。mapValues()输出的一个键值对就是(“hadoop”,5)。同理,当把(“spark”,(8,2))输入给mapValues()时,会计算得到另外一个键值对(“spark”,4)。
共享变量
广播变量
广播变量(broadcast variables)允许程序开发人员在每个机器上缓存一个只读的变量,而不是为机器上的每个任务都生成一个副本。通过这种方式,就可以非常高效地给每个节点(机器)提供一个大的输入数据集的副本。Spark的“动作”操作会跨越多个阶段(stage),对于每个阶段内的所有任务所需要的公共数据,Spark都会自动进行广播。通过广播方式进行传播的变量,会经过序列化,然后在被任务使用时再进行反序列化。这就意味着,显式地创建广播变量只有在下面的情形中是有用的:当跨越多个阶段的那些任务需要相同的数据,或者当以反序列化方式对数据进行缓存是非常重要的。
>>> broadcastVar = sc.broadcast([1, 2, 3]) >>> broadcastVar.value [1,2,3]
累加器
累加器是仅仅被相关操作累加的变量,通常可以被用来实现计数器(counter)和求和(sum)。Spark原生地支持数值型(numeric)的累加器,程序开发人员可以编写对新类型的支持。如果创建累加器时指定了名字,则可以在Spark UI界面看到,这有利于理解每个执行阶段的进程。
一个数值型的累加器,可以通过调用SparkContext.accumulator()来创建。运行在集群中的任务,就可以使用add方法来把数值累加到累加器上,但是,这些任务只能做累加操作,不能读取累加器的值,只有任务控制节点(Driver Program)可以使用value方法来读取累加器的值。
>>> accum = sc.accumulator(0) >>> sc.parallelize([1, 2, 3, 4]).foreach(lambda x : accum.add(x)) >>> accum.value 10