本文介绍了类型不匹配可解决将消息从期货通道转发到WebSocket接收器时的错误类型的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我试图用Rust包揽期货,但是我对这段代码感到困惑,该代码应该将到达rx的消息发送到sink:

I'm trying to wrap my head around futures in Rust but I am confused by this code which is supposed to send messages arriving at rx to sink:

extern crate futures;
extern crate tokio_core;
extern crate websocket;

use websocket::message::OwnedMessage;
use websocket::server::InvalidConnection;
use websocket::async::Server;

use tokio_core::reactor::Core;
use futures::{Future, Sink, Stream};
use futures::sync::mpsc;
use std::{thread, time};
use futures::sync::mpsc::Receiver;

fn main() {
    let mut core = Core::new().unwrap();
    let (mut tx, rx) = mpsc::channel(5);
    thread::spawn(|| worker(rx));
    let mut i = 0;
    loop {
        let res = tx.clone().send(OwnedMessage::Text(format!("Test {}", i)));
        core.run(res);
        i += 1;
        let period = time::Duration::from_millis(200);
        thread::sleep(period);
    }
}

fn worker(rx: Receiver<OwnedMessage>) {
    let mut core = Core::new().unwrap();
    let handle = core.handle();
    // bind to the server
    let server = Server::bind("127.0.0.1:9000", &handle).unwrap();
    let f = server.incoming()
            // we don't wanna save the stream if it drops
            .map_err(|InvalidConnection { error, .. }| error)
            .for_each(|(upgrade, addr)| {
                // accept the request to be a ws connection if it does
                let f = upgrade
                    .use_protocol("rust-websocket")
                    .accept()
                    .and_then(|(s, _)| {
                        let (sink, stream) = s.split();
                        rx // using stream (echoing back) works
                            .forward(sink)
                            .map_err(|error| {
                                error
                            })
                            .and_then(|(a, sink)| {
                                sink.send(OwnedMessage::Close(None))
                            })
                    });

                handle.spawn(f.map_err(move |e| println!("Err"))
                    .map(move |_| println!("Done")));
                Ok(())
            });
    core.run(f).expect("somerror");
}

如注释中所述,使用stream作为输入可以正常工作.使用rx时,编译器会抱怨有关错误类型的类型不匹配(我相信):

As noted in the comment, using stream as input works fine. When using rx, the compiler complains about a type mismatch regarding the error types (I believe):

error[E0271]: type mismatch resolving `<futures::stream::SplitSink<websocket::client::async::Framed<tokio_core::net::TcpStream, websocket::async::MessageCodec<websocket::OwnedMessage>>> as futures::Sink>::SinkError == ()`
  --> src/main.rs:47:26
   |
47 |                         .forward(sink)
   |                          ^^^^^^^ expected enum `websocket::WebSocketError`, found ()
   |
   = note: expected type `websocket::WebSocketError`
              found type `()`

error[E0599]: no method named `map_err` found for type `futures::stream::Forward<futures::sync::mpsc::Receiver<websocket::OwnedMessage>, futures::stream::SplitSink<websocket::client::async::Framed<tokio_core::net::TcpStream, websocket::async::MessageCodec<websocket::OwnedMessage>>>>` in the current scope
  --> src/main.rs:48:26
   |
48 |                         .map_err(|error| {
   |                          ^^^^^^^
   |
   = note: the method `map_err` exists but the following trait bounds were not satisfied:
           `futures::stream::Forward<futures::sync::mpsc::Receiver<websocket::OwnedMessage>, futures::stream::SplitSink<websocket::client::async::Framed<tokio_core::net::TcpStream, websocket::async::MessageCodec<websocket::OwnedMessage>>>> : futures::Future`

这些是我的依赖项:

[dependencies]
websocket = "0.20.0"
futures = "0.1"
tokio-core = "0.1"

我在这里想念什么?

推荐答案

error[E0271]: type mismatch resolving
  `<futures::stream::SplitSink<
        websocket::client::async::Framed<
            tokio_core::net::TcpStream,
            websocket::async::MessageCodec<websocket::OwnedMessage>>>
   as futures::Sink>::SinkError == ()`

我们在这里有两种类型:<futures::stream::SplitSink<...> as futures::Sink>::SinkError().这两种类型从何而来?同样,第一个是未解析的关联类型;第二个是未解析的关联类型.也许我们可以解决它以获得更多的见解?让我们一步一步地跟踪它.

We have two types here: <futures::stream::SplitSink<...> as futures::Sink>::SinkError and (). Where do these two types come from? Also, the first one is an unresolved associated type; perhaps we could resolve it to get some more insight? Let's trace it step by step.

首先,我们需要弄清楚为什么编译器首先尝试匹配这两种类型.如果我们查看 forward ,我们将看到约束Self::Error: From<S::SinkError>. Self是我们正在调用forward的流的类型,而S是作为参数传递给forward的接收器的类型.

First, we need to figure out why the compiler is trying to match these two types in the first place. If we look at the signature for forward, we'll see the constraint Self::Error: From<S::SinkError>. Self is the type of the stream we're calling forward on, while S is the type of the sink that's passed as an argument to forward.

我们正在rx上调用forward,其类型为futures::sync::mpsc::Receiver.在 Receiver 的文档页面上,我们可以看到以下内容:

We're calling forward on rx, whose type is futures::sync::mpsc::Receiver. On the documentation page for Receiver, we can see the following:

impl<T> Stream for Receiver<T>
  type Item = T
  type Error = ()

这向我们显示了()的来源.现在让我们来看一下sink参数.

This shows us where the () came from. Let's look at the sink argument now.

sink的类型为futures::stream::SplitSink<websocket::client::async::Framed<tokio_core::net::TcpStream, websocket::async::MessageCodec<websocket::OwnedMessage>>>(我们从错误消息中知道这一点; RLS 也证实了这一点).在SplitSink文档页面上,我们有:

The type of sink is futures::stream::SplitSink<websocket::client::async::Framed<tokio_core::net::TcpStream, websocket::async::MessageCodec<websocket::OwnedMessage>>> (we know this from the error message; the RLS also confirms this). On the documentation page for SplitSink, we have:

impl<S: Sink> Sink for SplitSink<S>
  type SinkItem = S::SinkItem
  type SinkError = S::SinkError

因此,SplitSinkSinkError与其内部接收器的SinkError相同.内部接收器的类型为websocket::client::async::Framed<tokio_core::net::TcpStream, websocket::async::MessageCodec<websocket::OwnedMessage>>. Framed 文档是什么?说吗?

So SplitSink's SinkError is the same as its inner sink's SinkError. The inner sink's type is websocket::client::async::Framed<tokio_core::net::TcpStream, websocket::async::MessageCodec<websocket::OwnedMessage>>. What does the documentation for Framed say?

impl<T, U> Sink for Framed<T, U>
where
    T: AsyncWrite,
    U: Encoder,
    <U as Encoder>::Error: From<Error>,
  type SinkItem = <U as Encoder>::Item
  type SinkError = <U as Encoder>::Error

Framed有两个类型参数,但是我们只需要查看第二个参数(这里是websocket::async::MessageCodec<websocket::OwnedMessage>)即可确定SinkError类型.让我们看一下 MessageCodec 现在. (注意:websocket::codec::ws::MessageCodec再导出作为websocket::async::MessageCodec.)

Framed has two type parameters, but we only need to look at the second one, which is websocket::async::MessageCodec<websocket::OwnedMessage> here, to determine the SinkError type. Let's take a look at MessageCodec now. (Note: websocket::codec::ws::MessageCodec is reexported as websocket::async::MessageCodec.)

impl<M> Decoder for MessageCodec<M>
where
    M: MessageTrait,
  type Item = OwnedMessage
  type Error = WebSocketError

啊哈!接收器产生类型为 WebSocketError 的错误.

Ah ha! The sink produces errors of type WebSocketError.

现在我们已经弄清楚了类型,让我们回到为什么我们首先关心类型的原因.我们试图了解为什么在调用forward时未满足约束Self::Error: From<S::SinkError>的原因.现在我们知道编译器正在尝试解析(): From<WebSocketError>.似乎没有impl From<WebSocketError> for ().让我们验证一下:

Now that we've figured out the types, let's go back to why we cared about the types in the first place. We were trying to understand why the constraint Self::Error: From<S::SinkError> wasn't met on the call to forward. We now know that the compiler is trying to resolve (): From<WebSocketError>. It looks like there's no impl From<WebSocketError> for (). Let's verify this:

extern crate websocket;

fn main() {
    let a = websocket::result::WebSocketError::NoDataAvailable;
    let () = From::from(a);
}

实际上,这无法编译:

error[E0277]: the trait bound `(): std::convert::From<websocket::WebSocketError>` is not satisfied
 --> src/main.rs:5:14
  |
5 |     let () = From::from(a);
  |              ^^^^^^^^^^ the trait `std::convert::From<websocket::WebSocketError>` is not implemented for `()`
  |
  = note: required by `std::convert::From::from`


我们可以通过使用 sink_map_err 更改sink的错误类型.


We can work around the missing implementation by using sink_map_err to change sink's error type.

let (sink, stream) = s.split();
let sink = sink.sink_map_err(|_| ()); // <<<<<
rx
    .forward(sink)
    .and_then(|(a, sink)| {
        sink.send(OwnedMessage::Close(None))
    })

这解决了对forward的调用,但是现在此关闭的结果不与upgrade.use_protocol("rust-websocket").accept()组成,upgrade.use_protocol("rust-websocket").accept()的错误类型仍为WebSocketError.改为更改rx的错误类型更有意义.但是我们如何从不包含任何信息的()构造一个WebSocketError?

This solves the call to forward, but now the result of this closure doesn't compose with upgrade.use_protocol("rust-websocket").accept(), which still has WebSocketError as its error type. It makes more sense to change rx's error type instead. But how do we construct a WebSocketError from a (), which carries no information?

您可能想知道,为什么Receiver使用()作为其错误类型?如果我们查看源代码,我们可以看到实际上poll 从不返回错误.我认为,如果错误类型为!(从不类型)或其他无效类型,则更合适的做法是明确指出错误是不可能的.有一个期货未清要求对期货0.2进行此更改

You might be wondering, why does Receiver use () for its error type? If we look at the source code, we can see that in fact, poll never returns an error. I think it would be more appropriate if the error type was ! (the never type) or some other void type, to clearly indicate that errors are impossible; there's an issue open on futures requesting this change for futures 0.2.

由于不可能发生错误,因此我们无需构造WebSocketError;我们可以例如通过恐慌而发散.

Since errors are impossible, we don't need to construct a WebSocketError; we can just diverge instead, for example by panicking.

fn worker(rx: Receiver<OwnedMessage>) {
    let rx = rx.map_err(|()| panic!("Receiver should never fail!"));
    let mut core = Core::new().unwrap();
    let handle = core.handle();
    // bind to the server
    let server = Server::bind("127.0.0.1:9000", &handle).unwrap();
    let f = server.incoming()
            // we don't wanna save the stream if it drops
            .map_err(|InvalidConnection { error, .. }| error)
            .for_each(|(upgrade, addr)| {
                // accept the request to be a ws connection if it does
                let f = upgrade
                    .use_protocol("rust-websocket")
                    .accept()
                    .and_then(|(s, _)| {
                        let (sink, stream) = s.split();
                        rx
                            .forward(sink)
                            .and_then(|(a, sink)| {
                                sink.send(OwnedMessage::Close(None))
                            })
                    });

                handle.spawn(f.map_err(move |e| println!("Err"))
                    .map(move |_| println!("Done")));
                Ok(())
            });
    core.run(f).expect("somerror");
}

现在,仍然存在错误:

error[E0507]: cannot move out of captured outer variable in an `FnMut` closure
  --> src/main.rs:43:31
   |
30 |     let rx = rx.map_err(|()| panic!("Receiver should never fail!"));
   |         -- captured outer variable
...
43 |                     .and_then(|(s, _)| {
   |                               ^^^^^^^^ cannot move out of captured outer variable in an `FnMut` closure

为什么闭包试图移动rx?因为forward取值self.为什么闭包是FnMut?请注意, Future::and_then 需要FnOnce(将值从捕获的变量移到FnOnce闭包中是有效的),但是 Stream::for_each 需要FnMut.这是有道理的:for_each将为每个传入连接调用一次关闭!

Why is the closure trying to move rx? Because forward takes self by value. Why is the closure an FnMut? Watch out, Future::and_then requires an FnOnce (it's valid to move a value from a captured variable into an FnOnce closure), but Stream::for_each requires an FnMut. This makes sense: for_each will invoke the closure once for each incoming connection!

您使用的频道是多制作人, 单个消费者 (因此名称为 mpsc ),但是您试图在此处拥有多个使用者(每个连接都试图从接收者那里读取数据).我将把它留给您来解决程序中的这个设计问题.请记住,可以有多个并发客户端连接!

The channels you're using are multi-producer, single-consumer (hence the name mpsc), but you're trying to have multiple consumers here (each connection is trying to read from the receiver). I'll leave it to you to fix this design issue in your program. Remember that there can be multiple concurrent client connections!

这篇关于类型不匹配可解决将消息从期货通道转发到WebSocket接收器时的错误类型的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-05 08:51