PageRank(PR)算法
- 用于评估网页链接的质量和数量,以确定该网页的重要性和权威性的相对分数,范围为0到10
- 从本质上讲,PageRank是找出图中顶点(网页链接)的重要性
- GraphX提供了PageRank API用于计算图的PageRank
使用方法: graph.pageRank(0.0001).vertices => (id,rank)
案例分析1:
找出用户社交网络中最重要的用户
import org.apache.spark.graphx.{Edge, Graph} import org.apache.spark.rdd.RDD import org.apache.spark.sql.SparkSession object PageRankTest01 { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().appName("app").master("local[*]").getOrCreate() val tweeters = Array((1L, ("Alice", 28)), (2L, ("Bob", 27)), (3L, ("Charlie", 65)), (4L, ("David", 42)), (5L, ("Ed", 55)), (6L, ("Fran", 50))) val vertexRDD: RDD[(Long, (String, Int))] = spark.sparkContext.parallelize(tweeters) val followRelations = Array( Edge[Int](2L, 1L, 7), Edge[Int](2L, 4L, 2), Edge[Int](3L, 2L, 4), Edge[Int](3L, 6L, 3), Edge[Int](4L, 1L, 1), Edge[Int](5L, 2L, 2), Edge[Int](5L, 3L, 8), Edge[Int](5L, 6L, 3)) val edgeRDD = spark.sparkContext.parallelize(followRelations) val graph: Graph[(String, Int), Int] = Graph(vertexRDD, edgeRDD) val ranks = graph.pageRank(0.0001) ranks.vertices.sortBy(_._2, false).collect.foreach(println) println("===============indegrees==============") // indegrees graph.inDegrees.foreach(println(_)) println("===============outDegrees==============") graph.outDegrees.foreach(println(_)) } }
案例分析2:
找出各个用户的重要性
name.txt
1,Mike 2,Nancy 3,Selina 4,Tom
test02.txt
1 2 2 2 3 2 4 2 3 1 2 2 3 1 2 2 1 4
import org.apache.spark.graphx.GraphLoader import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql.SparkSession object PageRankTest02 { def main(args: Array[String]): Unit = { // val conf = new SparkConf().setAppName("app").setMaster("local[2]") // val sc = SparkContext.getOrCreate(conf) val spark = SparkSession.builder().appName("app").master("local[*]").getOrCreate() val sc = spark.sparkContext // 1. 另一种构建Graph的方法: GraphLoader.edgeListFile // 适用于不知道点和边的属性(默认为1),只有边 - 边的情况 // 格式必须是 1空格2 val graph = GraphLoader.edgeListFile(sc, "src/main/resources/graphx01/test02.txt").cache() // graph.edges.collect().foreach(println(_)) // graph.vertices.collect().foreach(println(_)) // 2. 根据边边关系,可做pagerank: (id,rankRate) val rankVertices = graph.pageRank(0.001).vertices // 3. verticeRDD val verticeRDD = sc.textFile("src/main/resources/graphx01/name.txt").map(x=>{ val arr = x.split(",") // (id,NAME) (arr(0).toLong,arr(1)) }) // VD顶点的属性 = 1[INT] // (VertexId, VD, U) => VD // joinVertices方法只能改顶点的属性值,不能修改顶点属性类型 // graph.joinVertices(verticeRDD)((id,old,newval)=>(2)).vertices.foreach(println(_)) // 4.使用RDD的join => (K,(V,W)) // Rankgraph.vertices:(1,rate) VRDD:(1,Mike) => (Mike,rate) rankVertices.join(verticeRDD).map(f = { case (id, (rate, name)) => (name, rate) }) .foreach(println) } }