我有一项服务,可以按固定的时间间隔更新缓存数据。每隔N秒,它将使用循环(tokio::run(future_update(http_client.clone())))触发 future ,但不会返回到解决 future 的父函数。循环块和我只有一次迭代。

当我创建一个新的 super HTTP客户端而不是传递克隆的客户端时,一切都将正常运行。它也不工作Arc<Client>

pub fn trigger_cache_reload(http_client: Arc<Client<HttpConnector, Body>>) {
    let load_interval_sec = get_load_interval_sec(conf.load_interval_seconds.clone());

    std::thread::spawn(move || loop {
        let http_client = http_client.clone();

        info!("Woke up");
        tokio::run(pipeline(http_client));
        info!(
            "Pipeline run complete. Huuhh Now I need sleep of {} secs. Sleeping",
            load_interval_sec
        );
        std::thread::sleep(std::time::Duration::from_secs(load_interval_sec));
    });
}

fn pipeline(
    client: Arc<Client<HttpConnector, Body>>,
) -> Box<dyn Future<Item = (), Error = ()> + Send> {
    let res = fetch_message_payload() //return type of this call is Box<dyn Future<Item = (), Error = Error> + Send>
        .map_err(Error::from)
        .and_then(|_| {
            //let client = hyper::Client::builder().max_idle_per_host(1).build_http();
            //if i create new client here every time and use it then all working is fine.
            refresh_cache(client) //return type of this call is Box<dyn Future<Item = (), Error = Error> + Send>
                .map_err(Error::from)
                .and_then(|arg| {
                    debug!("refresh_cache completed");
                    Ok(arg)
                })
        });

    let res = res.or_else(|e| {
        error!("error {:?}", e);
        Ok(())
    });
    Box::new(res)
}

一次调用trigger_cache_reload后,我得到"woke up"日志消息。将来成功完成一段时间后,我还会收到"refresh_cache completed"日志消息。无论是否带有"sleeping",我都不会收到Arc日志消息。

如果我以后每次都创建一个新的客户端,那么我就能获得"sleeping"日志消息。

最佳答案

每次调用tokio::run时,它都会创建一个全新的事件循环和线程池( react 器+执行程序)。这确实不是您想要执行的操作。

super 客户端会将其状态绑定(bind)到先前的事件循环,并且如果在新的事件循环上进行轮询则无法取得进展,因为在run完成后,旧的事件循环将被破坏。这就是为什么一个新客户端可以工作,但是您不能重用旧客户端的原因。

这里有两种解决方案:

  • 如果您的应用程序的其余部分未使用tokio,我将仅使用同步reqwest::Client。如果您不需要大量并发,那么这里的同步解决方案会容易得多。
  • (如果您使用的是tokio),请在另一个Future中与tokio::spawn一起使用tokio_timer::Timeout来运行检查,然后在事件循环上等待指定的时间。

  • 异步/等待示例

    新的异步/等待支持使这样的代码更容易编写。

    该示例当前仅适用于带有nightly和当前tokio-0.3.0-alpha.2 master分支的hyper编译器:
    [dependencies]
    tokio = "0.3.0-alpha.2"
    tokio-timer = "0.3.0-alpha.2"
    hyper = { git = "https://github.com/hyperium/hyper.git" }
    

    use tokio::timer::Interval;
    use hyper::{Client, Uri};
    
    use std::time::Duration;
    
    #[tokio::main]
    async fn main() {
        let client = Client::new();
        let second_interval = 120;
        let mut interval = Interval::new_interval(Duration::from_secs(second_interval));
        let uri = Uri::from_static("http://httpbin.org/ip");
    
        loop {
            let res = Client.get(uri.clone()).await.unwrap();
            // Do what you need to with the response...
            interval.next().await;
        }
    }
    

    关于rust - 为什么即使在 future 结算之后,仍将带有克隆的Hyper Client与Tokio future 一起使用在循环块中?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/57589869/

    10-10 15:38