一.简介

  参考博客:https://www.cnblogs.com/yszd/p/10186556.html

二.代码实现

 package graphx

 import org.apache.log4j.{Level, Logger}
import org.apache.spark.graphx.util.GraphGenerators
import org.apache.spark.sql.SparkSession /**
* Created by Administrator on 2019/10/22.
*/
// Demo of the GraphX `aggregateMessages` operator: on a randomly generated graph whose
// vertex attribute is the vertex id (as a Double), compute for each vertex the average
// attribute of the in-neighbours whose attribute is larger than its own.
object AggregateMessage {

  // Raise the log level of every "org.*" logger to WARN so the printed results stay readable.
  Logger.getLogger("org").setLevel(Level.WARN)

  /**
   * Builds a local Spark session, generates a random graph, aggregates messages along
   * its edges and prints a sample of the vertices and of the aggregated averages.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    // Spark entry point: local mode with 2 worker threads.
    val spark = SparkSession.builder()
      .appName("AggregateMessage")
      .master("local[2]")
      .getOrCreate()

    try {
      val sc = spark.sparkContext

      // Random log-normal graph with 100 vertices. The remaining generator parameters are
      // left at their defaults (per the GraphX API docs: mu = 4.0 and sigma = 1.3 for the
      // out-degree distribution, partition count taken from the SparkContext).
      // Each vertex attribute is then replaced by the vertex id converted to Double.
      val graph = GraphGenerators
        .logNormalGraph(sc, numVertices = 100)
        .mapVertices((id, _) => id.toDouble)
      graph.vertices.take(5).foreach(println)

      // sendMsg runs on every edge triplet: when the source attribute is larger than the
      // destination attribute, send (1, srcAttr) to the destination vertex.
      // mergeMsg sums the counters and the attributes of all messages reaching a vertex.
      val olderFollowers = graph.aggregateMessages[(Int, Double)](
        triplet => {
          if (triplet.srcAttr > triplet.dstAttr) {
            // Pass the message as an explicit tuple instead of relying on auto-tupling.
            triplet.sendToDst((1, triplet.srcAttr))
          }
        },
        (a, b) => (a._1 + b._1, a._2 + b._2)
      )

      // Average = summed attribute / number of messages. Every message carries a count
      // of 1, so any vertex present in the result has count >= 1.
      val avgAgeOfOlderFollowers = olderFollowers.mapValues(
        (id, value) => value match { case (count, totalAge) => totalAge / count }
      )

      // Print a sample of the result; take(5) avoids collecting the whole RDD on the driver.
      avgAgeOfOlderFollowers.take(5).foreach(println)
    } finally {
      // Always release the Spark context, even when the job above fails.
      spark.stop()
    }
  }
}

三.结果

  随机生成的顶点数据:

    Spark GraphX图计算核心算子实战【AggregateMessage】-LMLPHP

  聚合结果:

    Spark GraphX图计算核心算子实战【AggregateMessage】-LMLPHP

05-11 22:36