我正在使用websocket库尝试使用Futures API。我有以下代码:

use futures::future::Future;
use futures::future;
use futures::sink::Sink;
use futures::stream::Stream;
use futures::sync::mpsc::channel;
use futures::sync::mpsc::{Sender, Receiver};
use tokio_core::reactor::Core;

use websocket::{ClientBuilder, OwnedMessage};

pub fn main() {
    let mut core = Core::new().unwrap();
    let handle = core.handle();

    let handle_clone = handle.clone();

    let (send, recv): (Sender<String>, Receiver<String>) = channel(100);

    let f = ClientBuilder::new("wss://...")
        .unwrap()
        .async_connect(None, &handle_clone)
        .map_err(|e| println!("error: {:?}", e))

        .map(|(duplex, _)| duplex.split())
        .and_then(move |(sink, stream)| {

            // this task consumes the channel, writes messages to the websocket
            handle_clone.spawn(future::loop_fn(recv, |recv: Receiver<String>| {
                sink.send(OwnedMessage::Close(None))
                    .and_then(|_| future::ok(future::Loop::Break(())))
                    .map_err(|_| ())
            }));

            // the main tasks listens the socket
            future::loop_fn(stream, |stream| {
                stream
                    .into_future()
                    .and_then(|_| future::ok(future::Loop::Break(())))
                    .map_err(|_| ())
            })
        });

    loop {
        core.turn(None)
    }
}

连接到服务器后,我想运行“监听器”和“发送器”任务,而又没有一个阻止其他任务。问题是我不能在新任务中使用sink,它失败并显示:

error[E0507]: cannot move out of captured outer variable in an `FnMut` closure
  --> src/slack_conn.rs:29:17
   |
25 |         .and_then(move |(sink, stream)| {
   |                          ---- captured outer variable
...
29 |                 sink.send(OwnedMessage::Close(None))
   |                 ^^^^ cannot move out of captured outer variable in an `FnMut` closure

我可以直接使用duplex进行发送和接收,但这会导致更严重的错误。

关于如何进行这项工作的任何想法?确实,我对任何允许我无阻塞地连接到服务器并产生两个异步任务的futures代码感到满意:
  • 从连接中读取并采取一些措施(打印到屏幕等)的一种。
  • 从mpsc channel 读取并写入连接
  • 的一个

    如果我必须用其他样式编写它会很好。

    最佳答案

    SplitSink 实现了 Sink ,它定义了send来取得所有权:

    fn send(self, item: Self::SinkItem) -> Send<Self>
    where
        Self: Sized,
    

    另一方面, loop_fn 要求能够多次调用该闭包。这两件事从根本上是不兼容的-您如何才能多次调用需要消耗值的东西?

    这是一个完全未经测试的编译过的代码段-我没有流氓的WebSocket服务器。
    #[macro_use]
    extern crate quick_error;
    
    extern crate futures;
    extern crate tokio_core;
    extern crate websocket;
    
    use futures::{Future, Stream, Sink};
    use futures::sync::mpsc::channel;
    use tokio_core::reactor::Core;
    
    use websocket::ClientBuilder;
    
    pub fn main() {
        let mut core = Core::new().unwrap();
        let handle = core.handle();
    
        let (send, recv) = channel(100);
    
        let f = ClientBuilder::new("wss://...")
            .unwrap()
            .async_connect(None, &handle)
            .from_err::<Error>()
            .map(|(duplex, _)| duplex.split())
            .and_then(|(sink, stream)| {
                let reader = stream
                    .for_each(|i| {
                        println!("Read a {:?}", i);
                        Ok(())
                    })
                    .from_err();
    
                let writer = sink
                   .sink_from_err()
                   .send_all(recv.map_err(Error::Receiver))
                   .map(|_| ());
    
                reader.join(writer)
            });
    
        drop(send); // Close the sending channel manually
    
        core.run(f).expect("Unable to run");
    }
    
    quick_error! {
        #[derive(Debug)]
        pub enum Error {
            WebSocket(err: websocket::WebSocketError) {
                from()
                description("websocket error")
                display("WebSocket error: {}", err)
                cause(err)
            }
            Receiver(err: ()) {
                description("receiver error")
                display("Receiver error")
            }
        }
    }
    

    在实现过程中突出的要点是:
  • 一切最终都必须变成Future
  • 可以更轻松地定义错误类型并将其转换为
  • 知道ItemError关联的类型是否为“正确”是很棘手的。我最终做了很多“类型断言”({ let x: &Future<Item = (), Error = ()> = &reader; })。
  • 关于rust - 将 future 连接分为汇和流,并在两个不同的任务中使用它们,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/46537490/

    10-11 23:12
    查看更多