此代码 panic :
extern crate futures;
use futures::Future;
use futures::future;
use futures::sync::oneshot::{channel, Canceled};
use std::thread;
use std::time::Duration;
fn maybe_oneday() -> Box<Future<Item = i32, Error = Canceled>> {
let (s, r) = channel::<i32>();
thread::spawn(move || {
thread::sleep(Duration::from_millis(100));
let _ = s.send(100);
});
return Box::new(r);
}
fn main() {
let foo = maybe_oneday();
let mut wrapper = foo.then(|x| {
match x {
Ok(v) => {
println!("GOT: {:?}", v);
future::ok::<i32, Canceled>(v)
},
Err(y) => {
println!("Err: {:?}", y);
future::err::<i32, Canceled>(y)
}
}
});
// wrapper.wait() <-- Works, but blocks
let _ = wrapper.poll(); // <-- Panics
}
和:
thread 'main' panicked at 'no Task is currently running', /checkout/src/libcore/option.rs:891:5
大概我必须使用某种执行程序来将任务解析委派给它;但是如何?
该文档引用了
my_executor
,但是there appear to be no implementations of this trait和find out more about executors链接损坏了吗?我从哪里可以得到执行人?
最佳答案
通常,tokio
和futures
被设计为异步原语,而不是通用任务系统。
也就是说,如果您有多个任务希望异步分发并“解雇”,请使用thread::spawn
。
如果要在一个线程中运行多个任务,则Future
是用于阻塞该线程直到解决一连串 future 的正确原语。
在这种情况下,我的问题并没有真正的意义,因为我认为Future
应该表示类似于C#中的Task
的东西。也就是说,动态分配给线程池以供稍后执行的任务执行,并且在解决该任务时可能发生链式 Action ;这些任务又可能在不同的线程中执行。
这不是futures
和tokio
支持的模型。
但是,我在这里补充一下,只是为了激怒反对者,这是我提出的实际问题的答案:
答案是tokio实现了许多基本的Executor
,其中包括一个用于任意任务的代码。
请参阅:https://docs.rs/tokio/0.1.1/tokio/executor/current_thread/struct.TaskExecutor.html
具体来说:
https://docs.rs/tokio/0.1.1/tokio/executor/current_thread/index.html
您可以像这样使用它们:
extern crate futures;
extern crate tokio;
use futures::Future;
use futures::future;
use futures::future::Executor;
use tokio::executor::current_thread;
use futures::sync::oneshot::{channel, Canceled};
use tokio::executor::current_thread::task_executor;
use std::thread;
use std::time::Duration;
use std::sync::mpsc::Sender;
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
struct RemoteReactor {
channel: Sender<Box<Future<Item=(), Error=()> + Send + 'static>>
}
impl RemoteReactor {
fn new() -> RemoteReactor {
let (send, recv) = mpsc::channel::<Box<Future<Item=(), Error=()> + Send + 'static>>();
let threadsafe_recv = Arc::new(Mutex::new(recv));
thread::spawn(move || {
let reader = threadsafe_recv.lock().unwrap();
current_thread::run(|_| {
loop {
let future = reader.recv().unwrap();
println!("Got a future!");
task_executor().execute(future).unwrap();
break;
}
});
});
return RemoteReactor {
channel: send
};
}
fn execute(&self, future: Box<Future<Item=(), Error=()> + Send + 'static>) {
self.channel.send(future).unwrap();
}
}
fn maybe_oneday() -> Box<Future<Item=i32, Error=Canceled> + Send + 'static> {
let (s, r) = channel::<i32>();
thread::spawn(move || {
thread::sleep(Duration::from_millis(100));
let _ = s.send(100);
});
return Box::new(r);
}
fn main() {
let foo = maybe_oneday();
let wrapper = Box::new(foo.then(|x| {
match x {
Ok(v) => {
println!("GOT: {:?}", v);
future::ok::<(), ()>(())
}
Err(y) => {
println!("Err: {:?}", y);
future::err::<(), ()>(())
}
}
}));
let reactor = RemoteReactor::new();
reactor.execute(wrapper);
println!("Waiting for future to resolve");
thread::sleep(Duration::from_millis(200));
println!("All futures are probably resolved now");
}
注意由于我不了解的原因,这段代码未在play.rust-lang.org(
error[E0463]: can't find crate for tokio
)上运行,但确实运行了rust 1.24:rustc 1.24.0 (4d90ac38c 2018-02-12)
$ cargo run
Finished dev [unoptimized + debuginfo] target(s) in 0.1 secs
Running `target\debug\hello_future.exe`
Waiting for future to resolve
Got a future!
GOT: 100
All futures are probably resolved now
关于rust - 您如何使用执行器解决 rust 的 future ?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/48940634/