Problem description

I have a large vector of Hyper HTTP request futures and want to resolve them into a vector of results. Since there is a limit on the maximum number of open files, I want to limit concurrency to N futures.
I've experimented with Stream::buffer_unordered, but it seems to execute the futures one by one.
Recommended answer
We've used code like this in a project to avoid opening too many TCP sockets. Those futures have Hyper futures within them, so it seems to be exactly the same case.
// Convert the iterator into a `Stream`. We will process
// `PARALLELISM` futures at the same time, but with no specified
// order.
let all_done = futures::stream::iter(iterator_of_futures.map(Ok))
    .buffer_unordered(PARALLELISM);

// Everything after here is just using the stream in
// some manner, not directly related.
let mut successes = Vec::with_capacity(LIMIT);
let mut failures = Vec::with_capacity(LIMIT);

// Pull values off the stream, dividing them into success and
// failure buckets.
let mut all_done = all_done.into_future();
loop {
    match core.run(all_done) {
        Ok((None, _)) => break,
        Ok((Some(v), next_all_done)) => {
            successes.push(v);
            all_done = next_all_done.into_future();
        }
        Err((v, next_all_done)) => {
            failures.push(v);
            all_done = next_all_done.into_future();
        }
    }
}
This is used in a piece of example code, so the event loop (core) is driven explicitly. Watching the number of file handles used by the program showed that it was capped. Additionally, before this bottleneck was added, we quickly ran out of allowable file handles, whereas afterward we did not.