可以在here中找到一些上下文,这个想法是我根据从Hive表上的请求中收集的元组创建了一个图。这些对应于国家之间的贸易关系。
以这种方式构建图形后,顶点不会被标记。我想研究学位的分布并获得联系最紧密的国家/地区的名称。我尝试了2种选择:

  • 首先:我试图在函数的内部使用idMapbis函数映射具有顶点的字符串名称的顶点索引,该函数正在收集和打印十个最高连接度。
  • 第二:我试图将标签添加到图形本身的顶点。

  • 在两种情况下,我都会收到以下错误:任务无法序列化
    全局代码:
    import org.apache.spark.SparkContext
    import org.apache.spark.graphx._
    import org.apache.spark.rdd.RDD
    
    val sqlContext= new org.apache.spark.sql.hive.HiveContext(sc)
    val data = sqlContext.sql("select year, trade_flow, reporter_iso, partner_iso, sum(trade_value_us) from comtrade.annual_hs where length(commodity_code)='2' and not partner_iso='WLD' group by year, trade_flow, reporter_iso, partner_iso").collect()
    val data_2010 = data.filter(line => line(0)==2010)
    val couples = data_2010.map(line=>(line(2),line(3))) //pays->pays
    
    夫妻看起来像这样:Array [(Any,Any)] = Array((MWI,MOZ),(WSM,AUS),(MDA,CRI),(KNA,HTI),(PER,ERI),(SWE,CUB ),...
    val idMap = sc.broadcast(couples
    .flatMap{case (x: String, y: String) => Seq(x, y)}
    .distinct
    .zipWithIndex
    .map{case (k, v) => (k, v.toLong)}
    .toMap)
    
    val edges: RDD[(VertexId, VertexId)] = sc.parallelize(couples
    .map{case (x: String, y: String) => (idMap.value(x), idMap.value(y))})
    
    val graph = Graph.fromEdgeTuples(edges, 1)
    
    以这种方式构建,例如,顶点看起来像(68,1)
    val degrees: VertexRDD[Int] = graph.degrees.cache()
    
    //Most connected vertices
    def topNamesAndDegrees(degrees: VertexRDD[Int], graph: Graph[Int, Int]): Array[(Int, Int)] = {
    val namesAndDegrees = degrees.innerJoin(graph.vertices) {
     (id, degree, k) => (id.toInt, degree)}
    val ord = Ordering.by[(Int, Int), Int](_._2)
    namesAndDegrees.map(_._2).top(10)(ord)}
    topNamesAndDegrees(degrees, graph).foreach(println)
    
    我们得到:(79,1016),(64,912),(55,889)...
    检索名称的第一个选择:
    val idMapbis = sc.parallelize(couples
    .flatMap{case (x: String, y: String) => Seq(x, y)}
    .distinct
    .zipWithIndex
    .map{case (k, v) => (v,k)}
    .toMap)
    
    def topNamesAndDegrees(degrees: VertexRDD[Int], graph: Graph[Int, Int]):  Array[(String, Int)] = {
    val namesAndDegrees = degrees.innerJoin(graph.vertices) {
     (id, degree, name) => (idMapbis.value(id.toInt), degree)}
    val ord = Ordering.by[(String, Int), Int](_._2)
    namesAndDegrees.map(_._2).top(10)(ord)}
    topNamesAndDegrees(degrees, graph).foreach(println)
    
    该任务不可序列化,但idMapbis函数正在运行,因为idMapbis.value(graph.vertices.take(1)(0)._ 1.toInt)没有错误
    选项2:
    graph.vertices.map{case (k, v) => (k,idMapbis.value(k.toInt))}
    
    该任务无法再次序列化(在上下文中,此处是修改topNamesAndDegrees的方式以获得此选项中连接最多的顶点的名称)
    def topNamesAndDegrees(degrees: VertexRDD[Int], graph: Graph[Int, Int]): Array[(String, Int)] = {
    val namesAndDegrees = degrees.innerJoin(graph.vertices) {
     (id, degree, name) => (name, degree)}
    val ord = Ordering.by[(String, Int), Int](_._2)
    namesAndDegrees.map(_._2).top(10)(ord)}
    topNamesAndDegrees(degrees, graph).foreach(println)
    
    我有兴趣了解如何改进此选项之一,如果有人看到如何改进。

    最佳答案

    您尝试的问题是idMapbisRDD。由于我们已经知道您的数据适合内存,因此您可以像以前一样简单地使用广播变量:

    val idMapRev = sc.broadcast(idMap.value.map{case (k, v) => (v, k)}.toMap)
    graph.mapVertices{case (id, _) => idMapRev.value(id)}
    

    或者,您可以从一开始就使用正确的标签:
    val countries: RDD[(VertexId, String)] = sc
      .parallelize(idMap.value.map(_.swap).toSeq)
    
    val relationships: RDD[Edge[Int]] = sc.parallelize(couples
     .map{case (x: String, y: String) => Edge(idMap.value(x), idMap.value(y), 1)}
    )
    
    val graph = Graph(countries, relationships)
    

    第二种方法具有一个重要的优势-如果图很大,则可以相对轻松地用联接替换广播变量。

    10-06 14:59