1.代码

 object LogPVAndUV{
def main(args:Array[String]):Unit={
val conf=new SparkConf()
.setMaster("local[*]")
.setAppName("PVAndUV")
val sc=SparkContext.getOrCreate(conf)
val logPath="/user/beifeng/spark/logs/page_views.data"
val logRDD=sc.textFile(logPath)
val filterRDD=logRDD.filter(_.length>0)
//转换
val mapRDD=filterRDD.map(line=>{
val arr=line.split("\t")
if(arr.length==7){
val date=arr(0).trim
val url=arr(1)
val uuid=arr(2)
(date.subString(0,Math.min(10.date.length)).trim,url,uuid)
}else{
(null,null,null)
}
}).filter(tuple=>tuple._1!=null&&tuple._1.length>0)
//PV计算
val pvRDD=mapRDD
.filter(tuple=>tuple._2.length>0)
.map(tuple=>(tuple._1,1))
.reduceByKey(_+_)
//UV计算
val uvRDD=mapRDD
.filter(tuple=>tuple._3.length>0)
.map(tuple=>(tuple._1,tuple._3))
.distinct
.reduceByKey(_+_)
//合并
val pvAndUv=pvRDD.join(uvRDD).map{
case (date,(pv,uv))=>{
(date,pv,uv)
}
}
//输出
pvAndUv.saveAsTextFile("/user/beifeng/spark/output/"+System.currentTimeMillis())
sc.stop()
}
}

2.PS

  rdd.foreachPartition(iter=>{

    //

  })

  对iter迭代器中的数据进行输出,iter表示的是一个分区的所有数据,这里的迭代器和groupbyKey中的实现方式不同,不会产生OOM

  主要用于将数据输出到非HDFS的存储系统中,不如MYSQL,Redis

05-11 15:54