我有一个异步任务的编译时directed acyclic graph。 DAG显示了任务之间的依赖关系:通过分析,可以了解哪些任务可以并行运行(在单独的线程中)以及哪些任务需要等待其他任务完成才能开始(依赖关系)。
我想使用boost::future
和.then(...)
,when_all(...)
延续帮助器函数从DAG生成回调链。生成的结果将是一个函数,该函数在被调用时将启动回调链并执行DAG所描述的任务,并以并行方式运行尽可能多的任务。
但是,我遇到了麻烦,无法找到一种适用于所有情况的通用算法。
我画了几张图,使问题更容易理解。这是一个图例,它将向您显示图纸中的符号的含义:
让我们从一个简单的线性DAG开始:
此依赖关系图包含三个任务(A
,B
和C
)。 C
取决于B
。 B
取决于A
。这里没有并行性的可能性-生成算法将构建类似于以下内容的东西:
boost::future<void> A, B, C, end;
A.then([]
{
B.then([]
{
C.get();
end.get();
});
});
(请注意,所有代码示例都不是100%有效的-我忽略了移动语义,转发和lambda捕获。)
解决线性DAG的方法有很多:通过从头开始或从头开始,构建正确的回调链很简单。
引入forks and joins后,事情开始变得更加复杂。
这是一个带有fork/join的DAG:
很难想到与该DAG相匹配的回调链。如果我尝试从头开始倒退,那么我的理由如下:
end
取决于B
和D
。 (加入)D
取决于C
。 B
和C
取决于A
。 ( fork )可能的链看起来像这样:
boost::future<void> A, B, C, D, end;
A.then([]
{
boost::when_all(B, C.then([]
{
D.get();
}))
.then([]
{
end.get();
});
});
我发现手工编写此链条很困难,而且我对它的正确性也感到怀疑。我想不出一种通用的方法来实现可以生成此算法的算法-由于
when_all
需要将其参数移入其中,因此也存在其他困难。让我们来看最后一个甚至更复杂的示例:
在这里,我们要尽可能地利用并行性。考虑任务
E
:E
可以与[B, C, D]
中的任何一个并行运行。这是一个可能的回调链:
boost::future<void> A, B, C, D, E, F, end;
A.then([]
{
boost::when_all(boost::when_all(B, C).then([]
{
D.get();
}),
E)
.then([]
{
F.then([]
{
end.get();
});
});
});
我试图通过几种方式提出一种通用算法:
.then(...)
延续来构建链。这不适用于联接,因为目标联接任务将被重复多次。 when_all(...)
延续生成链。对于派生,此操作将失败,因为创建派生的节点将重复多次。 显然,“广度优先遍历”方法在这里效果不佳。从我手写的代码示例来看,该算法似乎需要了解派生和联接,并且需要能够正确混合
.then(...)
和when_all(...)
延续。这是我最后的问题:
future
的回调链,其中每个任务在回调链中仅出现一次? 编辑1:
Here's an additional approach我正在尝试探索。
这个想法是从DAG生成
([dependencies...] -> [dependents...])
映射数据结构,并从该映射生成回调链。如果为
len(dependencies...) > 1
,则value
是一个联接节点。如果是
len(dependents...) > 1
,则key
是一个fork节点。映射中的所有键值对都可以表示为
when_all(keys...).then(values...)
延续。困难的部分是弄清楚节点“扩展”(考虑类似于解析器的东西)的正确顺序,以及如何将派生/连接延续连接在一起。
考虑以下由图像4生成的 map 。
depenendencies | dependents
----------------|-------------
[F] : [end]
[D, E] : [F]
[B, C] : [D]
[A] : [E, C, B]
[begin] : [A]
通过应用某种类似于解析器的缩减/传递,我们可以获得“干净”的回调链:
// First pass:
// Convert everything to `when_all(...).then(...)` notation
when_all(F).then(end)
when_all(D, E).then(F)
when_all(B, C).then(D)
when_all(A).then(E, C, B)
when_all(begin).then(A)
// Second pass:
// Solve linear (trivial) transformations
when_all(D, E).then(
when_all(F).then(end)
)
when_all(B, C).then(D)
when_all(
when_all(begin).then(A)
).then(E, C, B)
// Third pass:
// Solve fork/join transformations
when_all(
when_all(begin).then(A)
).then(
when_all(
E,
when_all(B, C).then(D)
).then(
when_all(F).then(end)
)
)
第三遍是最重要的一遍,但实际上却很难为其设计算法。
注意如何在
[B, C]
列表中找到[E, C, B]
,以及如何在[D, E]
依赖项列表中将D
解释为when_all(B, C).then(D)
的结果,并与E
链接在when_all(E, when_all(B, C).then(D))
中。也许整个问题可以简化为:
给定一个由
[dependencies...] -> [dependents...]
键值对组成的映射,如何实现将这些对转换为when_all(...)
/.then(...)
延续链的算法? 编辑2:
这是我为上述方法想到的pseudocode。它对于我尝试过的DAG似乎可行,但是我需要花更多的时间在它上面,并在精神上用其他更棘手的DAG配置对其进行测试。
最佳答案
如果可能出现多余的依赖关系,请先删除它们(例如https://mathematica.stackexchange.com/questions/33638/remove-redundant-dependencies-from-a-directed-acyclic-graph)。
然后执行以下图形转换(在合并的节点中构建子表达式),直到进入单个节点为止(类似于计算电阻器网络的方式):*
:额外的传入或传出依赖性,取决于放置位置(...)
:在单个节点中的表达式
Java代码,包括用于更复杂的示例的设置:
public class DirectedGraph {
/** Set of all nodes in the graph */
static Set<Node> allNodes = new LinkedHashSet<>();
static class Node {
/** Set of all preceeding nodes */
Set<Node> prev = new LinkedHashSet<>();
/** Set of all following nodes */
Set<Node> next = new LinkedHashSet<>();
String value;
Node(String value) {
this.value = value;
allNodes.add(this);
}
void addPrev(Node other) {
prev.add(other);
other.next.add(this);
}
/** Returns one of the next nodes */
Node anyNext() {
return next.iterator().next();
}
/** Merges this node with other, then removes other */
void merge(Node other) {
prev.addAll(other.prev);
next.addAll(other.next);
for (Node on: other.next) {
on.prev.remove(other);
on.prev.add(this);
}
for (Node op: other.prev) {
op.next.remove(other);
op.next.add(this);
}
prev.remove(this);
next.remove(this);
allNodes.remove(other);
}
public String toString() {
return value;
}
}
/**
* Merges sequential or parallel nodes following the given node.
* Returns true if any node was merged.
*/
public static boolean processNode(Node node) {
// Check if we are the start of a sequence. Merge if so.
if (node.next.size() == 1 && node.anyNext().prev.size() == 1) {
Node then = node.anyNext();
node.value += " then " + then.value;
node.merge(then);
return true;
}
// See if any of the next nodes has a parallel node with
// the same one level indirect target.
for (Node next : node.next) {
// Nodes must have only one in and out connection to be merged.
if (next.prev.size() == 1 && next.next.size() == 1) {
// Collect all parallel nodes with only one in and out connection
// and the same target; the same source is implied by iterating over
// node.next again.
Node target = next.anyNext().next();
Set<Node> parallel = new LinkedHashSet<Node>();
for (Node other: node.next) {
if (other != next && other.prev.size() == 1
&& other.next.size() == 1 && other.anyNext() == target) {
parallel.add(other);
}
}
// If we have found any "parallel" nodes, merge them
if (parallel.size() > 0) {
StringBuilder sb = new StringBuilder("allNodes(");
sb.append(next.value);
for (Node other: parallel) {
sb.append(", ").append(other.value);
next.merge(other);
}
sb.append(")");
next.value = sb.toString();
return true;
}
}
}
return false;
}
public static void main(String[] args) {
Node a = new Node("A");
Node b = new Node("B");
Node c = new Node("C");
Node d = new Node("D");
Node e = new Node("E");
Node f = new Node("F");
f.addPrev(d);
f.addPrev(e);
e.addPrev(a);
d.addPrev(b);
d.addPrev(c);
b.addPrev(a);
c.addPrev(a);
boolean anyChange;
do {
anyChange = false;
for (Node node: allNodes) {
if (processNode(node)) {
anyChange = true;
// We need to leave the inner loop here because changes
// invalidate the for iteration.
break;
}
}
// We are done if we can't find any node to merge.
} while (anyChange);
System.out.println(allNodes.toString());
}
}
输出:
A then all(E, all(B, C) then D) then F
关于c++ - 从编译时依赖图(DAG)构建异步 `future`回调链,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/35778864/