package URL1 import org.apache.spark.Partitioner import scala.collection.mutable class MyPartitioner(val num:Array[String]) extends Partitioner{
val parMap=new mutable.HashMap[String,Int]()
var count=
for(i<-num){
parMap.put(i,count)
count +=
} //分区数目
override def numPartitions: Int = num.length //分区的规则
//def getPartition(key: Any): Int:这个函数需要对输入的key做计算,然后返回该key的分区ID,范围一定是0到numPartitions-1
override def getPartition(key: Any): Int = {
// 将对象转换为指定类型;
val tople=key.asInstanceOf[(String,String)]
val subject=tople._1
this.parMap(subject) }
}
package URL1 class Orders extends Ordering[((String,String),Int)]{
override def compare(x: ((String, String), Int), y: ((String, String), Int)): Int = {
x._2-y._2
}
}
package URL1 import java.net.URL import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext} import scala.collection.mutable object teacher {
def main(args: Array[String]): Unit = {
val cof=new SparkConf()
.setAppName(this.getClass.getSimpleName)
.setMaster("local[*]")
val sc=new SparkContext(cof) val lines=sc.textFile("E:\\teacher.log") val result1:RDD[((String,String),Int)]=lines.map( tp=>{
val teacher=tp.split("/").last
val host=new URL(tp).getHost
val subject=host.substring(,host.indexOf("."))
((subject,teacher),)
}) //科目
val subject=result1.map(tp=>tp._1._1).distinct().collect() //分区
val partitions=new MyPartitioner(subject) //业务逻辑
//1.全局TOPN
// val result2=result1.reduceByKey(partitions,_+_).sortBy(-_._2).take(2).foreach(println) //1.全局TOPN
val result3=result1.foreachPartition(tp=>{
val treeSet=new mutable.TreeSet[((String,String),Int)]()(new Orders) tp.foreach(tp=>{
treeSet.add(tp)
if(treeSet.size>){
treeSet.dropRight()
}
}) treeSet.foreach(println)
}) sc.stop()
}
}
teacher.log
http://bigdata.baidu.cn/zhangsan
http://bigdata.baidu.cn/zhangsan
http://bigdata.baidu.cn/lisi
http://bigdata.baidu.cn/lisi
http://bigdata.baidu.cn/lisi
http://bigdata.baidu.cn/lisi
http://bigdata.baidu.cn/lisi
http://bigdata.baidu.cn/wangwu
http://bigdata.baidu.cn/wangwu
http://javaee.baidu.cn/xiaoxu
http://javaee.baidu.cn/xiaoxu
http://javaee.baidu.cn/laoyang
http://javaee.baidu.cn/laoyang
http://javaee.baidu.cn/laoyang
http://bigdata.baidu.cn/lisi
http://bigdata.baidu.cn/lisi
http://bigdata.baidu.cn/lisi
http://bigdata.baidu.cn/lisi
http://bigdata.baidu.cn/lisi
http://bigdata.baidu.cn/wangwu
http://bigdata.baidu.cn/wangwu
http://javaee.baidu.cn/xiaoxu
http://javaee.baidu.cn/xiaoxu
http://javaee.baidu.cn/laoyang
http://javaee.baidu.cn/laoyang
http://javaee.baidu.cn/laoyang
http://bigdata.baidu.cn/lisi
http://bigdata.baidu.cn/lisi
http://bigdata.baidu.cn/lisi
http://bigdata.baidu.cn/lisi
http://bigdata.baidu.cn/lisi
http://bigdata.baidu.cn/wangwu
http://bigdata.baidu.cn/wangwu
http://javaee.baidu.cn/xiaoxu
http://javaee.baidu.cn/xiaoxu
http://javaee.baidu.cn/laoyang
http://javaee.baidu.cn/laoyang
http://javaee.baidu.cn/laoyang
http://php.baidu.cn/laoli
http://php.baidu.cn/laoliu
http://php.baidu.cn/laoli
http://php.baidu.cn/laoli