Problem description
I want to use an async function to parse the inbound stream progressively, but actix-web requires impl Future<Item = HttpResponse, Error = Error> as the return value.
How can I convert the future returned by an async function into what actix-web requires?
I'm using Rust 1.39 nightly and actix-web 1.0.7.
http_srv.rs :
use futures::compat::Stream01CompatExt;
use futures::future::{FutureExt, TryFutureExt};
use futures::stream::TryStreamExt;
use futures01::future::Future;
use futures01::stream::Stream;
use futures01::sync::mpsc; // for `try_next`
use actix_web::*;
use bytes::Bytes;
use futures_timer::Delay;
use std::time::Duration;
fn inbound(
    req: HttpRequest,
    stream: web::Payload,
) -> impl Future<Item = HttpResponse, Error = Error> {
    let fut = async_inbound(&req, &stream);
    fut.unit_error().boxed_local().compat() // <--- compilation error here.
}
async fn async_inbound(req: &HttpRequest, stream: &web::Payload) -> HttpResponse {
    let mut compat_stream = stream.compat();
    loop {
        let result = compat_stream.try_next().await;
        if let Err(e) = result {
            warn!("Failed to read stream from {} : {}", req.path(), e);
            break;
        }
        if let Ok(option) = result {
            match option {
                None => {
                    info!("Request ends");
                    break;
                }
                Some(data) => {
                    println!("{:?}", data);
                }
            }
        }
    }
    HttpResponse::Ok().content_type("text/html").body("RESP")
}
pub fn start(port: u16) {
    info!("Starting HTTP server listening at port {} ...", port);
    let _ = HttpServer::new(|| {
        App::new()
            .wrap(middleware::DefaultHeaders::new().header(http::header::CACHE_CONTROL, "no-cache"))
            .wrap(middleware::Logger::default())
            .service(web::resource("/").route(web::put().to_async(inbound)))
    })
    .bind(format!("0.0.0.0:{}", port))
    .expect(&format!("Unable to bind on port {}", port))
    .run()
    .expect("Failed to start HTTP server");
}
Cargo.toml :
[dependencies]
log = "0.4.8"
env_logger = "0.6.2"
chrono = "0.4.8"
actix = "0.8.3"
bytes = "0.4.12"
actix-utils = "0.4.5"
futures-timer = "0.3"
futures01 = { package = "futures", version = "0.1", optional = false }
[dependencies.actix-web]
version = "1.0.7"
features = ["ssl"]
# https://rust-lang-nursery.github.io/futures-rs/blog/2019/04/18/compatibility-layer.html
# Rust’s futures ecosystem is currently split in two:
# On the one hand we have the vibrant ecosystem built around futures@0.1 with its many libraries working on stable Rust
# and on the other hand there’s std::future ecosystem with support for the ergonomic and powerful async/await language feature.
# To bridge the gap between these two worlds we have introduced a compatibility layer as part of the futures@0.3 extension to std::future.
[dependencies.futures-preview]
version = "0.3.0-alpha.18"
default-features = false
features = ["compat", "async-await", "nightly"]
Compilation error:
error[E0271]: type mismatch resolving `<std::pin::Pin<std::boxed::Box<dyn core::future::future::Future<Output = std::result::Result<actix_http::response::Response, ()>>>> as core::future::future::Future>::Output == std::result::Result<_, actix_http::error::Error>`
--> src/http_server.rs:39:55
|
39 | fn inbound(req: HttpRequest, stream: web::Payload) -> impl Future<Item=HttpResponse, Error=Error> {
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ expected (), found struct `actix_http::error::Error`
|
= note: expected type `std::result::Result<actix_http::response::Response, ()>`
found type `std::result::Result<_, actix_http::error::Error>`
= note: required because of the requirements on the impl of `futures_core::future::TryFuture` for `std::pin::Pin<std::boxed::Box<dyn core::future::future::Future<Output = std::result::Result<actix_http::response::Response, ()>>>>`
= note: the return type of a function must have a statically known size
Recommended answer
std::future -> futures@0.1 conversion steps:
- The future needs to be TryFuture (Output = Result<T, E>)
- The future needs to be Unpin (you can use the boxed combinator)
- Finally, you can call the compat combinator
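The first two steps can be sketched with the standard library alone. This is only an illustration under assumptions: DemoError stands in for actix_web::Error, and handler, boxed_handler, assert_unpin and block_on are hypothetical names that do not come from the question. The third step, calling compat, needs the futures crate with its compat feature and is not shown here.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical error type standing in for actix_web::Error.
#[derive(Debug, PartialEq)]
pub struct DemoError(pub String);

/// Step 1: the output is a Result, which is the shape TryFuture expects.
pub async fn handler(input: u32) -> Result<u32, DemoError> {
    if input == 0 {
        return Err(DemoError("zero is not allowed".to_string()));
    }
    Ok(input * 2)
}

/// Step 2: boxing pins the future on the heap. Pin<Box<_>> is Unpin
/// even when the compiler-generated async state machine is not.
pub fn boxed_handler(input: u32) -> Pin<Box<dyn Future<Output = Result<u32, DemoError>>>> {
    Box::pin(handler(input))
}

/// Compile-time check that a value is Unpin.
pub fn assert_unpin<T: Unpin>(_: &T) {}

/// Waker that does nothing when woken.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Tiny executor: polls with a no-op waker until the future is ready.
/// Only suitable for futures that complete without waiting on external events.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

Calling block_on(boxed_handler(21)) drives the boxed future to completion; in the real handler, compat would take the place of block_on and hand the future to actix-web instead.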
Your inbound function:
fn inbound(
    req: HttpRequest,
    stream: web::Payload,
) -> impl Future<Item = HttpResponse, Error = Error> {
    let fut = async_inbound(&req, &stream);
    fut.unit_error().boxed_local().compat()
}
The inbound function signature is fine, but the conversion isn't.
The future returned by async_inbound isn't a TryFuture (because of -> HttpResponse). You're trying to convert it with the unit_error combinator, but that produces Result<HttpResponse, ()> and you need Result<HttpResponse, Error>. Fixed inbound function:
fn inbound(
    req: HttpRequest,
    stream: web::Payload,
) -> impl Future<Item = HttpResponse, Error = Error> {
    let fut = async_inbound(req, stream);
    fut.boxed_local().compat()
}
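The unit_error mismatch described above can be reproduced without any async machinery. The sketch below is only an illustration: DemoError, like_unit_error, framework_accepts and bridged are hypothetical names, with DemoError standing in for actix_web::Error and framework_accepts standing in for the side that insists on that error type.

```rust
/// Hypothetical error type standing in for actix_web::Error.
#[derive(Debug, PartialEq)]
pub struct DemoError(pub &'static str);

/// Has the same effect on the output type as unit_error:
/// the value is always Ok and the error type is ().
pub fn like_unit_error<T>(value: T) -> Result<T, ()> {
    Ok(value)
}

/// Stand-in for the framework side, which only accepts the real error type.
pub fn framework_accepts(result: Result<&'static str, DemoError>) -> &'static str {
    match result {
        Ok(body) => body,
        Err(DemoError(msg)) => msg,
    }
}

/// Passing like_unit_error("RESP") directly to framework_accepts does not
/// type-check, because Result<_, ()> is not Result<_, DemoError>.
/// The error type has to be changed first, here with map_err.
pub fn bridged() -> &'static str {
    let unit: Result<&'static str, ()> = like_unit_error("RESP");
    framework_accepts(unit.map_err(|()| DemoError("never produced")))
}
```

Returning a Result with the right error type from the start, as the fixed async_inbound does, avoids the extra mapping step altogether.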
Your async_inbound function:
async fn async_inbound(req: &HttpRequest, stream: &web::Payload) -> HttpResponse {
    // ...
}
The first fix here is to replace -> HttpResponse with -> Result<HttpResponse> (with use actix_web::*, Result is actix-web's alias, whose error type defaults to actix_web::Error). Another problem is that you're passing req and stream by reference. Move them instead: there's no need to take references, and the returned future needs to be 'static. Fixed async_inbound function:
async fn async_inbound(req: HttpRequest, stream: web::Payload) -> Result<HttpResponse> {
    let mut compat_stream = stream.compat();
    while let Some(data) = compat_stream.try_next().await? {
        println!("{:?}", data);
    }
    Ok(HttpResponse::Ok().content_type("text/html").body("RESP"))
}
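The while let ... ? loop in the fixed function replaces the manual loop and match from the question. A synchronous sketch of the same control flow, using only the standard library, may make it easier to see what the loop does on each outcome. Chunks, DemoError and total_len are hypothetical names; Chunks only imitates the Ok(Some(_)) / Ok(None) / Err(_) shape that try_next yields and is not the actix-web payload type.

```rust
/// Hypothetical error type standing in for actix_web::Error.
#[derive(Debug, PartialEq)]
pub struct DemoError(pub String);

/// Hypothetical chunk source standing in for the request payload.
pub struct Chunks {
    items: std::vec::IntoIter<Result<Vec<u8>, DemoError>>,
}

impl Chunks {
    pub fn new(items: Vec<Result<Vec<u8>, DemoError>>) -> Self {
        Chunks { items: items.into_iter() }
    }

    /// Same shape as try_next: Ok(Some(chunk)) for data,
    /// Ok(None) at the end of input, Err on failure.
    pub fn try_next(&mut self) -> Result<Option<Vec<u8>>, DemoError> {
        self.items.next().transpose()
    }
}

/// Takes the source by value, like the fixed async_inbound takes its arguments.
/// The ? operator returns early on the first error; Ok(None) ends the loop.
pub fn total_len(mut chunks: Chunks) -> Result<usize, DemoError> {
    let mut total = 0;
    while let Some(chunk) = chunks.try_next()? {
        total += chunk.len();
    }
    Ok(total)
}
```

Unlike the loop in the question, which logged a read error and still answered with a 200 response, this pattern propagates the error to the caller.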