Spark数据挖掘-深入GraphX(1)

1 网络数据集

当图被用来描述系统中的组件之间的交互关系的时候,图可以被用来表示任何系统。图原理提供了通用的语言和一系列工具来表示和分析复杂的系统。简单的说:图由一系列顶点和边组成,每条边连接两个顶点表示这两个顶点之间的某种关系。下面介绍一下本文将会演示的几个有趣的图将要用到的数据:

邮件交流网络图https://snap.stanford.edu/data/email-Enron.htmlemail-Enron.txt.gzEnron公司158名雇员的电子邮件往来数据构成一个邮件交流网络有向图
食品品味网络图http://yongyeol.com/2011/12/15/paper-flavor-network.htmlingr_comp.zip通过三个食品网站获取得到的每个食品组成成分和每个成分对应的化学合成物构成一个网络
个人社交网络图http://snap.stanford.edu/data/egonets-Gplus.htmlgplus.tar.gz数据中的用户圈子组成一个个人社交网络,数据集还包括个人属性信息

2 GraphX 图形创建方式

在GraphX里面有四种创建一个属性图的方法。每种构建图的方法对数据都有一定的格式要求。下面一一分析。

2.1 利用 Object Graph 的工厂方法创建

Object Graph 是 Class Graph 的伴生对象,它定义了创建 Graph 对象的 apply 方法定义如下:

def apply[VD, ED](
  vertices: RDD[(VertexId, VD)],
  edges: RDD[Edge[ED]],
  defaultVertexAttr: VD = null
  ): Graph[VD, ED]

此方法通过传入顶点:RDD[(VertexId,VD)]和边:RDD[Edge[ED]] 就可以创建一个图。注意参数:defaultVertexAttr 是用来设置那些边中的顶点不在传入的顶点集合当中的顶点的默认属性,所以这个值的类型必须是和传入顶点的属性的类型一样。

2.2 利用 edgeListFile 创建

一个非常常见的场景是:你数据集里的数据表示的是顶点与顶点的关系即只表示边。这种情况下Graphx提供了GraphLoader.edgeListFile函数来自动生成图,函数的定义如下:

def edgeListFile(
  sc: SparkContext,
  path: String,
  canonicalOrientation: Boolean = false,
  numEdgePartitions: Int = -1)
  : Graph[Int, Int]

sc、path 这两个参数不用多说,需要注意的参数解析如下:

  • path 指向包含边的文件或文件夹 要求:文件每一行用两个按照多个空格分割的正整数表示的边,如: scrId dstId,Spark 读取的时候会忽略# 开头的行
  • canonicalOrientation 表示图是否有方向 如果值为true,那么只会加载 srcId > dstId 的边,否则全部加载
  • 加载完所有边之后,自动按照边生成顶点,默认的每个顶点的属性是1
  • numEdgePartitions 边分区个数默认是按照文件分区来划分的,也可以指定

下面看一下关键源码:

val edges = lines.mapPartitionsWithIndex { (pid, iter) =>
  val builder = new EdgePartitionBuilder[Int, Int]
  iter.foreach { line =>
    if (!line.isEmpty && line(0) != '#') {
      val lineArray = line.split("\\s+")
      if (lineArray.length < 2) {
        throw new IllegalArgumentException("Invalid line: " + line)
      }
      val srcId = lineArray(0).toLong
      val dstId = lineArray(1).toLong
      if (canonicalOrientation && srcId > dstId) {
        builder.add(dstId, srcId, 1)
      } else {
        builder.add(srcId, dstId, 1)
      }
    }
  }
}

2.3 利用 fromEdges 创建

这个方法可以理解为edgeListFile方法内部就是调用这个方法。原理就是只根据边: RDD[Edge[ED]] 来生成图,顶点就是由所有构成边的顶点组成,顶点的默认属性用户可以指定,定义如下:

def fromEdges[VD: ClassTag, ED: ClassTag](
    edges: RDD[Edge[ED]],
    defaultValue: VD): Graph[VD, ED]

2.4 利用 fromEdgeTuples 创建

这个方法也可以理解为edgeListFile方法内部就是调用这个方法。原理就是只根据边: RDD[(VertexId, VertexId)] 来生成图,连边的属性都不知道,默认边的属性当然可以设置,顶点就是由所有构成边的顶点组成,顶点的默认属性用户可以指定,定义如下:

def fromEdgeTuples[VD](
  rawEdges: RDD[(VertexId, VertexId)],
  defaultValue: VD,
  uniqueEdges: Option[PartitionStrategy] = None)
  : Graph[VD, Int]

其实后面三种方式都是不明确指定顶点,而是通过边来推导出顶点,这非常适合无属性图,比较常用的是第一种和第二种方式。当然也可以自己实现第三种方式的文件读取方式,比如文件中不止两列,还有属性列等等,非常简单。

3 GraphX 图形创建实战

3.1 创建一个双向图

先拿上面数据列表中的第一份数据,数据解压之后的文件名为:Email-Enron.txt,前面十条示例数据如下:

# Directed graph (each unordered pair of nodes is saved once): Email-Enron.txt
# Enron email network (edge indicated that email was exchanged, undirected edges)
# Nodes: 36692 Edges: 367662
# FromNodeId	ToNodeId
0	1
1	0
1	2
1	3

可以发现这个数据集合非常适合上面edgeListFile方法创建图形,代码如下:

val emailGraph = GraphLoader.edgeListFile(sc, projectDir + "Email-Enron.txt")

查看一下图中前面5个顶点和边

emailGraph.vertices.take(5).foreach(println)
(19021,1)
(28730,1)
(23776,1)
(34207,1)
(31037,1)
emailGraph.edges.take(5).foreach(println)
Edge(0,1,1)
Edge(1,0,1)
Edge(1,2,1)
Edge(1,3,1)
Edge(1,4,1)

查看一下是否是双向图(任何两个点只要有连接必须是来回指向),这里只是查看顶点ID为19021的点:

emailGraph.edges.filter(_.srcId == 19021).map(_.dstId).collect().foreach(println)
696
4232
6811
8315
26007
emailGraph.edges.filter(_.dstId == 19021).map(_.srcId).collect().foreach(println)
696
4232
6811
8315
26007

3.2 创建一个二分图

什么是二分图?简单来说:二分图指的是图的顶点分为两个集合,其中任意集合内部顶点不可能有边关联,关联的边顶点一定分布在两个不同的集合之中。详细原理见Wiki百科
本文第二个数据集食物成分和化合物的关系图就是二分图。将下载的数据解压,先来看一下压缩包中每个原始文件前十条数据:

  • 文件1:ingr_info.tsv 从文件名可以知道它是按照制表符分割的文件 表示的是食物原料的信息
下面三列分别表示:原料ID	原料名字	分类
# id	ingredient name	category
0	magnolia_tripetala	flower
1	calyptranthes_parriculata	plant
2	chamaecyparis_pisifera_oil	plant derivative
3	mackerel	fish/seafood
4	mimusops_elengi_flower	flower
5	hyssop	herb
6	buchu	plant
7	black_pepper	spice
8	eryngium_poterium_oil	plant derivative
9	peanut_butter	plant derivative
  • 文件2:comp_info.tsv 这个表示化合物的基础信息
下面三列分别表示:化合物ID	化合物名字	CAS编号
# id	Compound name	CAS number
0	jasmone	488-10-8
1	5-methylhexanoic_acid	628-46-6
2	l-glutamine	56-85-9
3	1-methyl-3-methoxy-4-isopropylbenzene	1076-56-8
4	methyl-3-phenylpropionate	103-25-3
5	3-mercapto-2-methylpentan-1-ol_(racemic)	227456-27-1
6	ethyl-3-hydroxybutyrate	5405-41-4
7	cyclohexyl_butyrate	1551-44-6
8	methyl_dihydrojasmonate	24851-98-7
9	methyl_2-methylthiobutyrate	42075-45-6
  • 文件3:ingr_comp.tsv 这个记录的是 ingredient 和 compound 对应关系
# ingredient id	compound id
1392	906
1259	861
1079	673
22	906
103	906
1005	906
1005	278
1005	171

有了数据之后,如果你盲目的使用第三个文件直接按照上面的第一种方式建图的话,那么就会大错特错。因为第一列的ID和第二列的ID不是表示同一个事物,但是它们有交叉的数值。一个简单的办法就是第二列的值转化为第一列最大值+1之后再加上自身的数值,这样保证两个集合的ID没有交叉。请看下面的代码:

package clebeg.spark.graph

import org.apache.spark.graphx.{EdgeTriplet, VertexId, Edge, Graph}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


//定义下面的类将 ingredient 和 compount 统一表示 注意父类一定要可以序列化
class FoodNode(val name: String) extends Serializable
case class Ingredient(override val name: String, val cat: String) extends FoodNode(name)
case class Compound(override val name: String, val cas: String) extends FoodNode(name)
/**
  * Created by clebegxie on 2015/11/25.
  */
object Graph1Food {
  val projectDir = "your_data_dir/"
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("SparkInAction").setMaster("local[4]")
    val sc = new SparkContext(conf)
    val ingredients: RDD[(VertexId, FoodNode)] = sc.textFile(projectDir + "ingr_info.tsv").filter {
      !_.startsWith("#")
    }.map {
      line =>
        val array = line.split("\t")
        (array(0).toLong, Ingredient(array(1), array(2)))
    }
    //获取得到最大的 ingredient 的ID 并且加1
    val maxIngrId = ingredients.keys.max() + 1
    val compounds: RDD[(VertexId, FoodNode)] = sc.textFile(projectDir + "comp_info.tsv").filter {
      !_.startsWith("#")
    }.map {
      line =>
        val array = line.split("\t")
        (maxIngrId + array(0).toLong, Compound(array(1), array(2)))
    }
    //根据文件 ingr_comp.csv 生成边,注意第二列的所有顶点都要加上 maxIngrId
    val links = sc.textFile(projectDir + "ingr_comp.tsv").filter {
      !_.startsWith("#")
    }.map {
      line =>
        val array = line.split("\t")
        Edge(array(0).toLong, maxIngrId + array(1).toLong, 1)
    }
    //将两个顶点合并
    val vertices = ingredients ++ compounds
    val foodNetWork = Graph(vertices, links)
    //foodNetWork.vertices.take(10).foreach(println)
    //访问一下这个网络前面5条triplet的对应关系
    foodNetWork.triplets.take(5).foreach(showTriplet _ andThen println _)
  }

  def showTriplet(t: EdgeTriplet[FoodNode, Int]): String =
    "The ingredient " ++ t.srcAttr.name ++ " contains " ++ t.dstAttr.name
}

运行结果为:

The ingredient calyptranthes_parriculata contains citral_(neral)
The ingredient chamaecyparis_pisifera_oil contains undecanoic_acid
The ingredient hyssop contains myrtenyl_acetate
The ingredient hyssop contains 4-(2,6,6-trimethyl-cyclohexa-1,3-dienyl)but-2-en-4-one
The ingredient buchu contains menthol

3.3 创建一个人与人之间相似性权重图

数据集是使用上面介绍的Google+提供的个人关系数据,解压之后有792个文件,每一个文件名去掉后缀代表的是网络ID,每个网络ID有6个文件,所以这里有132个个人关系网络。下面以ID为100129275726588145876的网络说明一下每个文件的含义:

  • .edges 记录的是边,即ID对应的用户之间有关联,示例数据为:
116374117927631468606 101765416973555767821
112188647432305746617 107727150903234299458
116719211656774388392 100432456209427807893
117421021456205115327 101096322838605097368
116407635616074189669 113556266482860931616
105706178492556563330 111169963967137030210
107527001343993112621 110877363259509543172
105513412023818293063 115710735637044108808
108736646334864181044 112393248315358692010
108683283643126638695 107111579950257773726
  • .feat 记录的是每个用户ID对应的特征,每个维度上面都是取值为 0 1,示例数据为:
#注意这里只是一行数据
114985346359714431656 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 1 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 1 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 1 1 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
  • .featnames 记录的是上面feat每个维度对应的含义(注意:上面之所以每个维度取值都是 0 1, 是因为这里的特征都是分类变量,并且做了 1 of n 编码),示例数据为:
//从下面的gender可以看出,做了 1 of n 编码
0 gender:1
1 gender:2
2 gender:3
3 institution:
4 institution:AMC Theatres
5 institution:AOL
6 institution:AT&T
7 institution:Aardvark
8 institution:Accenture
9 institution:Adobe Systems

下面之间给出建图代码,代码意图都有注释:

val projectDir = "your_data_dir/"
val id = "100129275726588145876" //只建立这个ID对应的社交关系图
type Feature = breeze.linalg.SparseVector[Int]
def main(args: Array[String]) {
  val conf = new SparkConf().setAppName("SparkInAction").setMaster("local[4]")
  val sc = new SparkContext(conf)
  //通过 .feat 文件读取每个顶点的属性向量
  val featureMap = Source.fromFile(projectDir + id + ".feat").getLines().
  map {
    line =>
      val row = line.split(" ")
      //注意:ID 不能之间当作 Long 型的时候 常常用 hashcode 代替
      val key = abs(row.head.hashCode.toLong)
      val feat = SparseVector(row.tail.map(_.toInt))
      (key, feat)
  }.toMap

  //通过 .edges 文件得到两个用户之间的关系 并且计算他们相同特征的个数
  val edges = sc.textFile(projectDir + id + ".edges").map {
    line =>
      val row = line.split(" ")
      val srcId = abs(row(0).hashCode.toLong)
      val dstId = abs(row(1).hashCode.toLong)
      val srcFeat = featureMap(srcId)
      val dstFeat = featureMap(dstId)
      val numCommonFeats: Int = srcFeat dot dstFeat
      Edge(srcId, dstId, numCommonFeats)
  }

  //利用 fromEdges 建立图
  val egoNetwork = Graph.fromEdges(edges, 1)

  //查看一下具有3个相同特征的用户对
  print(egoNetwork.edges.filter(_.attr == 3).count())
}

这里需要注意下面两个地方:

  • Graphx对应的顶点必须为 Long 型,对于不符合的类型,通常取 HashCode
  • fromEdges 建图只需要已知边即可

个人微信公众号

欢迎关注本人微信公众号,会定时发送关于大数据、机器学习、Java、Linux 等技术的学习文章,而且是一个系列一个系列的发布,无任何广告,纯属个人兴趣。
Spark数据挖掘-深入GraphX(1)-LMLPHP

08-30 18:20