问题描述
假设我有两个可能无限的流:
Suppose I have two possibly infinite streams:
s1 = a..b..c..d..e...
s2 = 1.2.3.4.5.6.7...
我想要合并流,然后用慢速异步操作映射合并流(例如在Bacon中使用 fromPromise
和 flatMapConcat
)。
I want to merge the streams and then map merged stream with slowish asynchronous operation (e.g. in Bacon with fromPromise
and flatMapConcat
).
我可以将它们与合并
组合:
me = a12b3.c45d6.7e...
然后映射
s1 = a..b..c..d..e...
s2 = 1.2.3.4.5.6.7...
me = a12b3.c45d6.7e...
mm = a..1..2..b..3..c..4..5..
正如您所见 greedier s2
从长远来看,溪流会获得优势。 这是不受欢迎的行为。
As you see greedier s2
streams gets advantage in the long run. This is undesired behaviour.
合并行为不是好吧,因为我想要某种背压来进行更多交错,公平,循环合并。 期望行为的几个例子:
The merge behaviour is not ok, as I want to have some kind of backpressure to have more interleaved, "fair", "round-robin" merge. Few examples of desired behaviour:
s1 = a.....b..............c...
s2 = ..1.2.3..................
mm = a...1...b...2...3....c...
s1 = a.........b..........c...
s2 = ..1.2.3..................
mm = a...1...2...b...3....c...
单向认为这是 s1
和 s2
向工作人员发送任务,当时只能处理一项任务。使用 merge
和 flatMapConcat
我会得到一个贪婪的任务管理器,但我想要更公平的。
One way to think this is that s1
and s2
send tasks to the worker which can handle only one task at the time. With merge
and flatMapConcat
I'll get a greedy task manager, but I want more fair one.
我想找到一个简单而优雅的解决方案。如果它对于任意数量的流很容易通用会很好:
I'd like to find a simple and elegant solution. Would be nice if it is easily generalisable for arbitrary amount of streams:
// roundRobinPromiseMap(streams: [Stream a], f: a -> Promise b): Stream b
var mm = roundRobinPromiseMap([s1, s2], slowAsyncFunc);
使用RxJS或其他Rx库的解决方案也可以。
Solution using RxJS or other Rx library is fine too.
I不想要:
function roundRobinPromiseMap(streams, f) {
return Bacon.zipAsArray.apply(null, streams)
.flatMap(Bacon.fromArray)
.flatMapConcat(function (x) {
return Bacon.fromPromise(f(x));
});
}
比较示例大理石图:
s1 = a.....b..............c.......
s2 = ..1.2.3......................
mm = a...1...b...2...3....c....... // wanted
zip = a...1...b...2........c...3... // zipAsArray based
是的我会遇到缓冲问题
...但我也会直截了当地不公平一个:
function greedyPromiseMap(streams, f) {
Bacon.mergeAll(streams).flatMapConcat(function (x) {
return Bacon.fromPromise(f(x));
});
}
大理石图
s1 = a.........b..........c...
s2 = ..1.2.3..................
mm = a...1...2...b...3....c...
merge = a...1...2...3...b....c...
推荐答案
这里的核心挑战是了解如何正式化公平。在这个问题中我已经提到了工人的比喻。事实证明,显而易见的公平标准是选择一个产生比其他事件少的事件的流,或采取更进一步的流:谁生成的流等待的时间更少。
The core challenge here was to understand, how to formalise fairness. In the question I already mentioned worker analogy. Turned out that the obvious fairness criteria is to pick a stream that generated less events than others, or taken even further: whom generated streams waited for less time.
之后它使用指称语义来形式化所需的输出是非常简单的:
After that it was quite trivial to formalise the desired output using denotational semantics:code is on GitHub
我没有时间开发指示性组合子,以包含来自 Bacon.js的 withStateMachine
,所以下一步是直接用 Bacon.js 在JavaScript中重新实现它。整个可运行的解决方案是。
I didn't had time to develop the denotational combinators to include withStateMachine
from Bacon.js, so the next step was to reimplement it in JavaScript with Bacon.js directly. The whole runnable solution is available as a gist.
我们的想法是建立一个状态机
The idea is to make a state machine with
- 每个流成本和队列状态为
- 流和附加反馈流作为输入
当整个系统的输出被反馈时,我们可以将下一个事件出列当前一个flatMapped流结束时。
As output of the whole system is feeded back, we can dequeue the next event when the previous flatMapped stream is ended.
为此,我不得不做一些丑陋的 rec
combinator
For that I had to make a bit ugly rec
combinator
function rec(f) {
var bus = new Bacon.Bus();
var result = f(bus);
bus.plug(result);
return result;
}
它的类型是(EventStream a - > EventStream a) - > EventStream a
- 该类型类似于其他递归组合器,例如 fix
。
It's type is (EventStream a -> EventStream a) -> EventStream a
- the type resembles other recursion combinators, e.g. fix
.
可以使用更好的系统范围行为,如 Bus 打破取消订阅传播。我们必须努力。
It can be made with better system-wide behaviour, as Bus breaks unsubscription propagation. We have to work on that.
第二个辅助函数是 stateMachine
,它接受一系列流和转他们进入单一状态机。基本上它是.withStateMachine∘insetAll∘zipWithIndex
。
The Second helper function is stateMachine
, which takes an array of streams and turns them into single state machine. Essentially it's .withStateMachine ∘ mergeAll ∘ zipWithIndex
.
function stateMachine(inputs, initState, f) {
var mapped = inputs.map(function (input, i) {
return input.map(function (x) {
return [i, x];
})
});
return Bacon.mergeAll(mapped).withStateMachine(initState, function (state, p) {
if (p.hasValue()) {
p = p.value();
return f(state, p[0], p[1]);
} else {
return [state, p];
}
});
}
使用这两个助手我们可以编写一个不那么复杂的公平调度程序:
Using this two helpers we can write a not-so-complex fair scheduler:
function fairScheduler(streams, fn) {
var streamsCount = streams.length;
return rec(function (res) {
return stateMachine(append(streams, res), initialFairState(streamsCount), function (state, i, x) {
// console.log("FAIR: " + JSON.stringify(state), i, x);
// END event
if (i == streamsCount && x.end) {
var additionalCost = new Date().getTime() - x.started;
// add cost to input stream cost center
var updatedState = _.extend({}, state, {
costs: updateArray(
state.costs,
x.idx, function (cost) { return cost + additionalCost; }),
});
if (state.queues.every(function (q) { return q.length === 0; })) {
// if queues are empty, set running: false and don't emit any events
return [_.extend({}, updatedState, { running: false }), []];
} else {
// otherwise pick a stream with
// - non-empty queue
// - minimal cost
var minQueueIdx = _.chain(state.queues)
.map(function (q, i) {
return [q, i];
})
.filter(function (p) {
return p[0].length !== 0;
})
.sortBy(function (p) {
return state.costs[p[1]];
})
.value()[0][1];
// emit an event from that stream
return [
_.extend({}, updatedState, {
queues: updateArray(state.queues, minQueueIdx, function (q) { return q.slice(1); }),
running: true,
}),
[new Bacon.Next({
value: state.queues[minQueueIdx][0],
idx: minQueueIdx,
})],
];
}
} else if (i < streamsCount) {
// event from input stream
if (state.running) {
// if worker is running, just enquee the event
return [
_.extend({}, state, {
queues: updateArray(state.queues, i, function (q) { return q .concat([x]); }),
}),
[],
];
} else {
// if worker isn't running, start it right away
return [
_.extend({}, state, {
running: true,
}),
[new Bacon.Next({ value: x, idx: i})],
]
}
} else {
return [state, []];
}
})
.flatMapConcat(function (x) {
// map passed thru events,
// and append special "end" event
return fn(x).concat(Bacon.once({
end: true,
idx: x.idx,
started: new Date().getTime(),
}));
});
})
.filter(function (x) {
// filter out END events
return !x.end;
})
.map(".value"); // and return only value field
}
要点中的其余代码相当直截了当。
Rest of the code in the gist is quite straight-forward.
这篇关于如何交错流(带背压)的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!