我正在尝试在giraph中实现Spinner图分区算法。
在第一步中,我的程序将边缘添加到给定的输入图中,使其变为无向图,并且每个顶点都选择一个随机分区。 (此分区整数存储在VertexValue中)在初始化步骤结束时,每个顶点都会向所有出站边缘发送一条消息,该消息具有顶点ID(LongWritable)和该顶点选择的分区。

这一切都很好。现在,在我遇到麻烦的步骤中,每个顶点都会迭代接收到的消息,并将接收到的分区保存在相应边的EdgeValue中。
(VertexValueV中的Vertex<I,V,E>EdgeValueE中的Edge<I,E>)

这是我的代码的重要部分:

包装类:

public class EdgeValue implements Writable {
private int weight;
private int partition;
// Getters and setters for weight and partition
    public EdgeValue() {
    this.weight = -2;
    this.partition = -1;
}
// Constructors taking 1 and 2 ints and setting weight/partition to the given value

@Override
public void readFields(DataInput in) throws IOException {
    this.weight = in.readInt();
    this.partition = in.readInt();
}

@Override
public void write(DataOutput out) throws IOException {
    out.writeInt(this.weight);
    out.writeInt(this.partition);
}
}

public class SpinnerMessage implements Writable, Configurable {
private long senderId;
private int updatePartition;
public SpinnerMessage() {
    this.senderId = -1;
    this.updatePartition = -1;
}
// Constructors taking int and/or LongWritable and setting the fields
// Getters and setters for senderId and updatePartition

@Override
public void readFields(DataInput in) throws IOException {
    this.senderId = in.readLong();
    this.updatePartition = in.readInt();
}

@Override
public void write(DataOutput out) throws IOException {
    out.writeLong(this.senderId);
    out.writeInt(this.updatePartition);
}
}

上一步中的compute方法(ran是Random对象):
public void compute(Vertex<LongWritable, VertexValue, EdgeValue> vertex, Iterable<LongWritable> messages) {
    int initialPartition = this.ran.nextInt(GlobalInformation.numberOfPartitions);
    vertex.getValue().setPartition(initialPartition);
    sendMessageToAllEdges(vertex, new SpinnerMessage(vertex.getId(),initialPartition));
}

发生错误的步骤中的compute方法:
public void compute(Vertex<LongWritable, VertexValue, EdgeValue> vertex,Iterable<SpinnerMessage> messages) throws IOException {
for (SpinnerMessage m : messages) {
    vertex.getEdgeValue(new LongWritable(m.getSenderWritable().get())).setPartition(m.getUpdatePartition());
}
// ... some other code, e.g. initializing the amountOfNeighbors array.
// Here I get an ArrayIndexOutOfBoundsException since the partition is -1:
for (Edge<LongWritable, EdgeValue> edge : vertex.getEdges()) {
    EdgeValue curValue = edge.getValue();
    amountOfNeighbors[curValue.getPartition()] += curValue.getWeight();
}

但是,当我用例如
for(Edge<LongWritable, EdgeValue> e : vertex.getEdges())

或通过
vertex.getEdgeValue(someVertex)

然后返回的EdgeValue具有权重-2和分区-1(标准构造函数的默认值)

我的想法可能会导致该错误:
  • getEdgeValue(new LongWritable(someLong))可能不起作用,因为它是与另一个具有相同值的new LongWritable(someLong)不同的对象。但是,我已经在giraph代码中看到了此用法,因此这似乎没有问题,只有LongWritable内部存储的长字符串似乎很重要。
  • (最有可能导致)Hadoop序列化和反序列化以某种方式更改了我的EdgeValue对象。由于Hadoop适用于非常大的图形,因此它们可能不适合RAM。为此,VertexValueEdgeValue必须实现Writable。但是,在在线检查了一些giraph代码之后,我以对我来说似乎正确的方式实现了read()write()(以相同的顺序编写和读取重要字段)。 (由于第二次调用返回的EdgeValue具有标准构造函数的字段值,因此我认为这与问题有某种联系)

    我也阅读了一些文档:



    但是,这不适用于我,因为我只有一个EdgeValue变量,对吗?

    预先感谢所有花时间帮助我的人。 (我正在使用hadoop 1.2.1和giraph 1.2.0)

    最佳答案

    在查看了更多giraph代码示例之后,我找到了解决方案:Vertex.getEdgeValue()方法基本上创建了EdgeValue的副本
    顶点如果更改了它返回的对象,则不会写入这些更改
    回到磁盘。要在EdgeValueVertexValue中保存信息,您必须使用setVertexValue()setEdgeValue()

  • 10-01 07:55