我想创建自定义的累加器,使用它们时我不会感到安全,因为我现在只能在本地对其进行测试。

我的问题是:

创建累加器时,不变性是“必须”还是“应该”?

尽管我现在找不到链接/引用,但我已经读到,累加器只允许不可变的对象。
但是,在spark的api(1.6)中,AccumulableParam和AccumulatorParam的addInPlace方法具有以下描述:
“将两个累加的值合并在一起。允许修改并返回第一个值以提高效率(避免分配对象)。”

哪一个是正确的?如果允许可变对象如何使用它们安全地创建累加器?

假设我有一个带有一个字段的可变类,并且让该字段为整数数组。当我们有一个可变的类时,如何覆盖addInPlace方法?

我应该写(Option1):

public MyClass addInPlace(MyClass c1, MyClass c2){
c1.update(c2); //Where int array of c1 is updated(let's say we add two arrays) and c1 itself is returned.
return c1;
}

还是我应该写(Option2):
public MyClass addInPlace(MyClass c1, MyClass c2){
return update2(c1,c2); //Where a new MyClass object is returned with an array(created by adding arrays of c1 and c2)
}

Option2似乎更安全,但需要额外分配。但是,以上来自API的引言说允许修改以避免分配。

另外,如果我有一个对象数组(比方说MyClass2),而不是整数数组,我应该克隆对象还是使用对象本身。
假设我要为MyClass2的PriorityQueue创建一个累加器(也许我应该为此问题输入另一个条目?)。

我将不胜感激关于累加器/火花的任何答案和高级参考/文档,尤其是在Java中。

编辑:

我感谢zero323的回答。

我希望我能找到令我困惑的链接,但是现在情况已经很清楚了。
但是,我还有两个问题。

1)我遇到了以下累加器实现,以跟踪在日志文件中看到的浏览器类型的次数。您可能会从(https://brosinski.com/post/extending-spark-accumulators/)中看到详细信息。

这是实现:
public class MapAccumulator implements AccumulatorParam<Map<String, Long>>, Serializable {

@Override
public Map<String, Long> addAccumulator(Map<String, Long> t1, Map<String, Long> t2) {
    return mergeMap(t1, t2);
}

@Override
public Map<String, Long> addInPlace(Map<String, Long> r1, Map<String, Long> r2) {
    return mergeMap(r1, r2);

}

@Override
public Map<String, Long> zero(final Map<String, Long> initialValue) {
    return new HashMap<>();
}

private Map<String, Long> mergeMap( Map<String, Long> map1, Map<String, Long> map2) {
    Map<String, Long> result = new HashMap<>(map1);
    map2.forEach((k, v) -> result.merge(k, v, (a, b) -> a + b));
    return result;
}

}

我的问题是:

为什么我们没有
map2.forEach((k, v) -> map1.merge(k, v, (a, b) -> a + b));

另外,假设我想
Map<Integer, ArrayList<MyClass>> or ArrayList<ArrayList<MyClass>>

我可以有类似(Option1)的东西吗?
public ArrayList<ArrayList<MyClass>> addInPlace(ArrayList<ArrayList<MyClass>> a1, ArrayList<ArrayList<MyClass>> a2) {
//For now, assume that a1 and a2 have the same size
for(int i=0;i<a2.size();i++){
    a1.get(i).addAll(a2.get(i))
}
return a1;
}

还是我应该写(Option2):
public ArrayList<ArrayList<MyClass>> addInPlace(ArrayList<ArrayList<MyClass>> a1, ArrayList<ArrayList<MyClass>> a2) {
//For now, assume that a1 and a2 have the same size
ArrayList<ArrayList<MyClass>> result= new ArrayList<ArrayList<MyClass>>();
for(int i=0;i<a1.size();i++){
    result.add(new ArrayList<MyClass>());
    result.get(i).addAll(a1.get(i));
    result.get(i).addAll(a2.get(i));
}
return result;
}

那么就蓄能器安全性而言,两种选择之间有区别吗?

2)说累加器不是线程安全的,您的意思是rdd元素可以多次更新累加器吗?还是您是指过程中使用的对象可以被另一个线程从代码中的其他位置更改?

还是仅当将累加器运送到驱动程序时才出现问题,如链接zero323 shared(https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/Accumulable.scala#L43)中所写:

“如果此[[Accumulable]]是内部的。内部[[Accumulable]]将通过心跳报告给驱动程序。对于内部[[Accumulable]],R必须是线程安全的,以便可以正确地报告它们。”

对于长期加入,我深表歉意,但希望对社区也有所帮助。

最佳答案

创建自定义累加器时是否需要不变性?不它不是。您已经发现 AccumulableParam.addAccumulator AccumulableParam.addInPlace 都明确允许修改第一个参数。如果您深入研究,您会发现在 AccumulatorSuite 中使用以下参数对这种情况进行了实际测试:

new AccumulableParam[mutable.Set[A], A] {
  def addInPlace(t1: mutable.Set[A], t2: mutable.Set[A]) : mutable.Set[A] = {
    t1 ++= t2
    t1
  }
  def addAccumulator(t1: mutable.Set[A], t2: A) : mutable.Set[A] = {
    t1 += t2
    t1
  }
  def zero(t: mutable.Set[A]) : mutable.Set[A] = {
    new mutable.HashSet[A]()
  }
}

直观地讲,由于每个任务都有其自己的累加器并按顺序在分区上运行,因此,在任何情况下可变性都不会成为问题。

但是,as stated somewhere else累积对象不是线程安全的。因此,您可能应该忘记将累加器与分区级别的并行处理结合在一起。

关于java - 对于定制累加器,不变性是“必须”还是“应该”?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/36188617/

10-09 03:13