问题描述
我是Python的新手,并且坚持使用关系数据集构建层次结构.
如果有人对如何进行此操作有想法,那将是极大的帮助.
I am new to Python and stuck with building a hierarchy out of a relational dataset.
It would be of immense help if someone has an idea on how to proceed with this.
我有一个与之相关的数据集
I have a relational data-set with data like
_currentnode, childnode_
root, child1
child1, leaf2
child1, child3
child1, leaf4
child3, leaf5
child3, leaf6
等等.我正在寻找一些python或pyspark代码以
建立如下的层次结构数据框
so-on. I am looking for some python or pyspark code to
build a hierarchy dataframe like below
_level1, level2, level3, level4_
root, child1, leaf2, null
root, child1, child3, leaf5
root, child1, child3, leaf6
root, child1, leaf4, null
数据是字母数字,是一个巨大的数据集[〜5000万条记录].
同样,层次结构的根是已知的,可以在代码中进行硬连线.
因此,在上面的示例中,层次结构的根是根".
The data is alpha-numerics and is a huge dataset[~50mil records].
Also, the root of the hierarchy is known and can be hardwired in the code.
So in the example, above, the root of the hierarchy is 'root'.
推荐答案
带有Pyspark的最短路径
输入数据可以解释为具有 currentnode
和 childnode
之间的连接的图形.然后问题是根节点和所有叶节点之间的最短路径是什么,并称为单源最短路径.
Shortest Path with Pyspark
The input data can be interpreted as a graph with the connections between currentnode
and childnode
. Then the question is what is the shortest path between the root node and all leaf nodes and is called single source shortest path.
Spark具有 Graphx 来处理并行计算图.不幸的是,GraphX不提供Python API(可在此处找到更多详细信息).具有Python支持的图形库是 GraphFrames .GraphFrames使用GraphX的一部分.
Spark has Graphx to handle parallel computations of graphs. Unfortunately, GraphX does not provide a Python API (more details can be found here). A graph library with Python support is GraphFrames. GraphFrames uses parts of GraphX.
GraphX和GraphFrames都为sssp提供了解决方案.再次遗憾的是,这两种实现都只返回最短路径的长度,而不返回路径本身( GraphX 和 GraphFrames ).但是此答案提供了GraphX和Scala算法的实现,该算法也返回路径.这三种解决方案都使用 Pregel .
Both GraphX and GraphFrames provide an solution for sssp. Unfortunately again, both implementations return only the length of the shortest paths, not the paths themselves (GraphX and GraphFrames). But this answer provides an implementation of the algorithm for GraphX and Scala that also returns the paths. All three solutions use Pregel.
将上述答案翻译为GraphFrames/Python:
Translating the aforementioned answer to GraphFrames/Python:
为所有节点提供唯一的ID,并更改列名称,以使其适合描述的名称此处
Provide unique IDs for all nodes and change the column names so that they fit to the names described here
import pyspark.sql.functions as F
df = ...
vertices = df.select("currentnode").withColumnRenamed("currentnode", "node").union(df.select("childnode")).distinct().withColumn("id", F.monotonically_increasing_id()).cache()
edges = df.join(vertices, df.currentnode == vertices.node).drop(F.col("node")).withColumnRenamed("id", "src")\
.join(vertices, df.childnode== vertices.node).drop(F.col("node")).withColumnRenamed("id", "dst").cache()
Nodes Edges
+------+------------+ +-----------+---------+------------+------------+
| node| id| |currentnode|childnode| src| dst|
+------+------------+ +-----------+---------+------------+------------+
| leaf2| 17179869184| | child1| leaf4| 25769803776|249108103168|
|child1| 25769803776| | child1| child3| 25769803776| 68719476736|
|child3| 68719476736| | child1| leaf2| 25769803776| 17179869184|
| leaf6|103079215104| | child3| leaf6| 68719476736|103079215104|
| root|171798691840| | child3| leaf5| 68719476736|214748364800|
| leaf5|214748364800| | root| child1|171798691840| 25769803776|
| leaf4|249108103168| +-----------+---------+------------+------------+
+------+------------+
2.创建GraphFrame
from graphframes import GraphFrame
graph = GraphFrame(vertices, edges)
3.创建将构成Pregel算法单个部分的UDF
消息类型:来自pyspark.sql.types import的3. Create UDFs that will form the single parts of the Pregel algorithm
The message type:from pyspark.sql.types import *
vertColSchema = StructType()\
.add("dist", DoubleType())\
.add("node", StringType())\
.add("path", ArrayType(StringType(), True))
顶点程序:
def vertexProgram(vd, msg):
if msg == None or vd.__getitem__(0) < msg.__getitem__(0):
return (vd.__getitem__(0), vd.__getitem__(1), vd.__getitem__(2))
else:
return (msg.__getitem__(0), vd.__getitem__(1), msg.__getitem__(2))
vertexProgramUdf = F.udf(vertexProgram, vertColSchema)
外发消息:
def sendMsgToDst(src, dst):
srcDist = src.__getitem__(0)
dstDist = dst.__getitem__(0)
if srcDist < (dstDist - 1):
return (srcDist + 1, src.__getitem__(1), src.__getitem__(2) + [dst.__getitem__(1)])
else:
return None
sendMsgToDstUdf = F.udf(sendMsgToDst, vertColSchema)
消息聚合:
def aggMsgs(agg):
shortest_dist = sorted(agg, key=lambda tup: tup[1])[0]
return (shortest_dist.__getitem__(0), shortest_dist.__getitem__(1), shortest_dist.__getitem__(2))
aggMsgsUdf = F.udf(aggMsgs, vertColSchema)
4.合并零件
从graphframes.lib中的4. Combine the parts
from graphframes.lib import Pregel
result = graph.pregel.withVertexColumn(colName = "vertCol", \
initialExpr = F.when(F.col("node")==(F.lit("root")), F.struct(F.lit(0.0), F.col("node"), F.array(F.col("node")))) \
.otherwise(F.struct(F.lit(float("inf")), F.col("node"), F.array(F.lit("")))).cast(vertColSchema), \
updateAfterAggMsgsExpr = vertexProgramUdf(F.col("vertCol"), Pregel.msg())) \
.sendMsgToDst(sendMsgToDstUdf(F.col("src.vertCol"), Pregel.dst("vertCol"))) \
.aggMsgs(aggMsgsUdf(F.collect_list(Pregel.msg()))) \
.setMaxIter(10) \
.setCheckpointInterval(2) \
.run()
result.select("vertCol.path").show(truncate=False)
备注:
-
maxIter
应该设置为至少与最长路径一样大的值.如果该值较高,结果将保持不变,但计算时间会变长.如果值太小,结果中将缺少较长的路径.当前版本的GraphFrames(0.8.0)不支持在不再发送新消息时停止循环. -
checkpointInterval
的值应小于maxIter
.实际值取决于数据和可用的硬件.当发生OutOfMemory异常或Spark会话挂起一段时间时,该值可能会减小.
maxIter
should be set to a value at least as large as the longest path. If the value is higher, the result will stay unchanged, but the computation time becomes longer. If the value is too small, the longer paths will be missing in the result. The current version of GraphFrames (0.8.0) does not support stopping the loop when no more new messages are sent.checkpointInterval
should be set to a value smaller thanmaxIter
. The actual value depends on the data and the available hardware. When OutOfMemory exception occur or the Spark session hangs for some time, the value could be reduced.
最终结果是包含内容的常规数据框
The final result is a regular dataframe with the content
+-----------------------------+
|path |
+-----------------------------+
|[root, child1] |
|[root, child1, leaf4] |
|[root, child1, child3] |
|[root] |
|[root, child1, child3, leaf6]|
|[root, child1, child3, leaf5]|
|[root, child1, leaf2] |
+-----------------------------+
如有必要,可以在此处过滤掉非叶子节点.
If necessary the non-leaf nodes could be filtered out here.
这篇关于使用Pyspark从关系数据集构建层次结构的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!