This article looks at the question of which kind of variable to use for incrementing node labels in a community detection algorithm on Spark. The question and a worked solution follow.

Problem Description

I am working on a community detection algorithm that uses the concept of propagating labels to nodes. I am having trouble choosing the right kind of variable for the Label_counter.

There is an algorithm called LPA (label propagation algorithm) that propagates labels to nodes through iterations. Think of labels as node properties. The initial label of each node is its node ID, and in each iteration a node updates its label to the most frequent label among its neighbors. The algorithm I am working on is similar to LPA. At first, every node's initial label is 0; then nodes acquire new labels. When a node updates and gets a new label, Label_counter should, under certain conditions, be incremented by one so that this value can be used as the label for other nodes, e.g. label = 1, label = 2, and so on. For example, take the Zachary karate club dataset: it has 34 nodes and 2 communities. The initial state looks like this:

 (1,0)
 (2,0)
   .
   .
   .
 (34,0)

The first number is the node ID and the second is the label. As nodes get new labels, Label_counter is incremented; other nodes then get new labels in the following iterations, and Label_counter is incremented again.

 (1,1)
 (2,1)
 (3,1)
   .
   .
   .
 (33,3)
 (34,3)

Nodes with the same label belong to the same community.
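
Roughly, the single-machine version of this counter logic would look something like this (sketch only; the trigger condition is just a placeholder):

// sketch only: a sequential version of the Label_counter idea
val nodes: Seq[(Long, Int)] = (1L to 34L).map(id => (id, 0))
def condition(node: Long): Boolean = node % 10 == 1 // placeholder trigger
var labelCounter = 0
val labelled = nodes.map { case (node, _) =>
  if (condition(node)) labelCounter += 1 // a new community label starts here
  (node, labelCounter)                   // each node takes the counter's current value
}
// labelled: (1,1), (2,1), ..., (10,1), (11,2), ..., (34,4)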

The problem I have is this: the nodes in the RDD and the variables are distributed across the machines (each machine has its own copy of each variable), and the executors have no connection to each other. So if one executor updates Label_counter, the other executors are not informed of its new value, and nodes may end up with wrong labels. Is it correct to use an Accumulator as the label counter in this case, since accumulators are shared variables across machines, or is there some other way to handle this problem?

Solution

In Spark it is always complicated to compute index-like values, because they depend on data that does not all live in the same partition. (Note that an accumulator will not help here: tasks can add to an accumulator, but they cannot read its value during the job, so it cannot serve as a shared counter.) I can propose the following idea:

  1. Count, for each partition, the number of times the condition is met.
  2. Compute the cumulative sum of those counts, so that we know the initial increment of each partition.
  3. Increment the values in each partition starting from that initial increment.

Here is what the code could look like. Let me start by setting up a few things.

// Let's define some condition
def condition(node: Long) = node % 10 == 1

// step 0, generate the data
import spark.implicits._ // for the 'id column syntax
val rdd = spark.range(34)
    .select('id + 1).repartition(10).rdd
    .map(r => (r.getAs[Long](0), 0))
    .sortBy(_._1)
    .cache() // cache: both passes below must see the same partition layout
rdd.collect
Array[(Long, Int)] = Array((1,0), (2,0), (3,0), (4,0), (5,0), (6,0), (7,0), (8,0),
 (9,0), (10,0), (11,0), (12,0), (13,0), (14,0), (15,0), (16,0), (17,0), (18,0),
 (19,0), (20,0), (21,0), (22,0), (23,0), (24,0), (25,0), (26,0), (27,0), (28,0),
 (29,0), (30,0), (31,0), (32,0), (33,0), (34,0))
 

Then the core of the solution:

// steps 1 and 2
val partIncrInit = rdd
    // for each partition, count how many times we need to increment
    .mapPartitionsWithIndex { case (i, p) =>
        Iterator(i -> p.map(_._1).count(condition))
    }
    .collect.sorted     // sort by partition index
    .map(_._2)          // we don't need the index anymore
    .scanLeft(0)(_ + _) // cumulative sum: partIncrInit(i) = increments before partition i

// step 3, increment each partition based on its initial increment
val result = rdd
    .mapPartitionsWithIndex{ case (i, p) =>
        var incr = 0
        p.map{ case (node, value) =>
            if(condition(node)) incr += 1
            (node, partIncrInit(i) + value + incr)
        }
    }
result.collect

Array[(Long, Int)] = Array((1,1), (2,1), (3,1), (4,1), (5,1), (6,1), (7,1), (8,1),
 (9,1), (10,1), (11,2), (12,2), (13,2), (14,2), (15,2), (16,2), (17,2), (18,2),
 (19,2), (20,2), (21,3), (22,3), (23,3), (24,3), (25,3), (26,3), (27,3), (28,3),
 (29,3), (30,3), (31,4), (32,4), (33,4), (34,4))
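
To see what the scanLeft is doing: it turns the per-partition counts into starting offsets, so each partition knows how many increments happened in all the partitions before it. A tiny standalone illustration (the counts here are made up for demonstration):

// hypothetical per-partition counts from step 1, for illustration only
val counts = Array(1, 0, 2, 1)
val offsets = counts.scanLeft(0)(_ + _)
// offsets: Array(0, 1, 1, 3, 4) -- partition i starts from offsets(i)

This count-then-offset pattern is essentially the same trick Spark itself uses for RDD.zipWithIndex, which also triggers an extra job to compute per-partition offsets when the RDD has more than one partition.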
 


That concludes this article on choosing a variable to increment node labels in a community detection algorithm; hopefully the solution above is helpful.
