我正在使用websocket
库尝试使用Futures API。我有以下代码:
use futures::future::Future;
use futures::future;
use futures::sink::Sink;
use futures::stream::Stream;
use futures::sync::mpsc::channel;
use futures::sync::mpsc::{Sender, Receiver};
use tokio_core::reactor::Core;
use websocket::{ClientBuilder, OwnedMessage};
pub fn main() {
let mut core = Core::new().unwrap();
let handle = core.handle();
let handle_clone = handle.clone();
let (send, recv): (Sender<String>, Receiver<String>) = channel(100);
let f = ClientBuilder::new("wss://...")
.unwrap()
.async_connect(None, &handle_clone)
.map_err(|e| println!("error: {:?}", e))
.map(|(duplex, _)| duplex.split())
.and_then(move |(sink, stream)| {
// this task consumes the channel, writes messages to the websocket
handle_clone.spawn(future::loop_fn(recv, |recv: Receiver<String>| {
sink.send(OwnedMessage::Close(None))
.and_then(|_| future::ok(future::Loop::Break(())))
.map_err(|_| ())
}));
// the main tasks listens the socket
future::loop_fn(stream, |stream| {
stream
.into_future()
.and_then(|_| future::ok(future::Loop::Break(())))
.map_err(|_| ())
})
});
loop {
core.turn(None)
}
}
连接到服务器后,我想运行“监听器”和“发送器”任务,而又没有一个阻止其他任务。问题是我不能在新任务中使用
sink
,它失败并显示:error[E0507]: cannot move out of captured outer variable in an `FnMut` closure
--> src/slack_conn.rs:29:17
|
25 | .and_then(move |(sink, stream)| {
| ---- captured outer variable
...
29 | sink.send(OwnedMessage::Close(None))
| ^^^^ cannot move out of captured outer variable in an `FnMut` closure
我可以直接使用
duplex
进行发送和接收,但这会导致更严重的错误。关于如何进行这项工作的任何想法?确实,我对任何允许我无阻塞地连接到服务器并产生两个异步任务的
futures
代码感到满意:如果我必须用其他样式编写它会很好。
最佳答案
SplitSink
实现了 Sink
,它定义了send
来取得所有权:
fn send(self, item: Self::SinkItem) -> Send<Self>
where
Self: Sized,
另一方面,
loop_fn
要求能够多次调用该闭包。这两件事从根本上是不兼容的-您如何才能多次调用需要消耗值的东西?这是一个完全未经测试的编译过的代码段-我没有流氓的WebSocket服务器。
#[macro_use]
extern crate quick_error;
extern crate futures;
extern crate tokio_core;
extern crate websocket;
use futures::{Future, Stream, Sink};
use futures::sync::mpsc::channel;
use tokio_core::reactor::Core;
use websocket::ClientBuilder;
pub fn main() {
let mut core = Core::new().unwrap();
let handle = core.handle();
let (send, recv) = channel(100);
let f = ClientBuilder::new("wss://...")
.unwrap()
.async_connect(None, &handle)
.from_err::<Error>()
.map(|(duplex, _)| duplex.split())
.and_then(|(sink, stream)| {
let reader = stream
.for_each(|i| {
println!("Read a {:?}", i);
Ok(())
})
.from_err();
let writer = sink
.sink_from_err()
.send_all(recv.map_err(Error::Receiver))
.map(|_| ());
reader.join(writer)
});
drop(send); // Close the sending channel manually
core.run(f).expect("Unable to run");
}
quick_error! {
#[derive(Debug)]
pub enum Error {
WebSocket(err: websocket::WebSocketError) {
from()
description("websocket error")
display("WebSocket error: {}", err)
cause(err)
}
Receiver(err: ()) {
description("receiver error")
display("Receiver error")
}
}
}
在实现过程中突出的要点是:
Future
Item
和Error
关联的类型是否为“正确”是很棘手的。我最终做了很多“类型断言”({ let x: &Future<Item = (), Error = ()> = &reader; }
)。 关于rust - 将 future 连接分为汇和流,并在两个不同的任务中使用它们,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/46537490/