问题描述
我有一个基于参数生成futures::Stream
的函数.我想多次调用此函数并将流放在一起.麻烦的是,我想将流返回的值作为原始函数的参数反馈回去.
I have a function that generates a futures::Stream
based on an argument. I want to call this function multiple times and flatten the streams together. Complicating matters is the fact that I want to feed the values returned by the stream back as the argument to the original function.
具体来说,我有一个函数可以返回数字流,直到零为止:
Concretely, I have a function that returns a stream of numbers down to zero:
fn numbers_down_to_zero(v: i32) -> impl Stream<Item = i32> {
stream::iter((0..v).rev())
}
我想从5开始调用此函数.还应该为返回的每个奇数调用该函数.呼叫numbers_down_to_zero
的总次数为:
I want to call this function starting at 5. The function should also be called for every odd value that is returned. The total set of calls to numbers_down_to_zero
would be:
numbers_down_to_zero(5);
numbers_down_to_zero(3);
numbers_down_to_zero(1);
numbers_down_to_zero(1);
产生
4
3
2
1
0
2
1
0
0
0
有什么技术可以做到这一点?
What techniques exist to allow this?
推荐答案
通过(ab)使用异步/等待, genawaiter
板条箱如今能够在稳定的Rust中模仿生成器语法.结合futures::pin_mut
在栈上固定值,这是一种无需分配且与任意流兼容的解决方案:
By (ab)using async / await, the genawaiter
crate manages to mimic generator syntax in stable Rust today. Combined with futures::pin_mut
to pin value on the stack, here is a solution both allocation-free and compatible with arbitrary streams:
//# futures = "0.3"
//# genawaiter = { version = "0.2", features = ["futures03"] }
//# tokio = { version = "0.2", features = ["full"] }
use futures::{
pin_mut,
stream::{self, Stream, StreamExt},
};
use genawaiter::{generator_mut, stack::Co};
use std::collections::VecDeque;
async fn g(n: i32, co: Co<'_, i32>) {
let mut seeds = VecDeque::from(vec![n]);
while let Some(seed) = seeds.pop_back() {
let stream = f(seed);
pin_mut!(stream);
while let Some(x) = stream.next().await {
if x % 2 != 0 {
seeds.push_front(x);
}
co.yield_(x).await;
}
}
}
fn f(n: i32) -> impl Stream<Item = i32> {
stream::iter((0..n).rev())
}
#[tokio::main]
async fn main() {
generator_mut!(stream, |co| g(5, co));
stream
.for_each(|v| async move {
println!("v: {}", v);
})
.await;
}
一些缺点:
-
generator_mut
宏内有一个不安全的呼叫 - 接口有点漏水.调用者可以看到一些实现细节.
- there is one unsafe call inside
generator_mut
macro - the interface is a bit leaky. The callers get to see some implementation details.
使用一个堆分配, genawaiter::rc::Gen
可以摆脱所有这些.但是同样,在表上进行分配还有其他选择.
With one heap allocation, genawaiter::rc::Gen
can get rid of all these. But again, with allocation on the table there are other options.
use futures::{
pin_mut,
stream::{Stream, StreamExt},
};
use genawaiter::rc::Gen;
use std::collections::VecDeque;
fn g(n: i32) -> impl Stream<Item = i32> {
Gen::new(|co| async move {
let mut seeds = VecDeque::from(vec![n]);
while let Some(seed) = seeds.pop_back() {
let stream = f(seed);
pin_mut!(stream);
while let Some(x) = stream.next().await {
if x % 2 != 0 {
seeds.push_front(x);
}
co.yield_(x).await;
}
}
})
}
这篇关于如何创建流,其中项基于流先前返回的项?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!