[Rust Self-Study] 20.3. Final Project: Graceful Shutdown and Cleanup of the Web Server

20.3.0. Recap

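In the previous article we finished building the multithreaded web server, but there is still room for improvement. In this article we will refine the code.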

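Note: this article continues directly from the previous one, 20.2. Final Project: Multithreaded Web Server. If you want to follow the whole process of building the web server from scratch, please read all of the articles in Chapter 20.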

20.3.1. Implementing the Drop Trait for ThreadPool

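Right now, when we stop the server with the not-very-graceful Ctrl + C, which stops the main thread, all the other threads stop immediately as well, even if they are in the middle of serving a request. (Ctrl + C terminates the whole process, so no cleanup code gets a chance to run.)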

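The trait that controls what happens when a value is cleaned up is Drop. We only need to implement its drop method for ThreadPool so that each thread can finish the request it is currently working on before the pool goes away. We also need some way to stop the threads from accepting new requests and get them ready to shut down.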
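To see when drop actually runs, here is a minimal sketch using a hypothetical Noisy type (not part of the server) that records its own drop in a shared log. Rust calls drop automatically when a value goes out of scope and drops locals in reverse order of declaration. For our server this means ThreadPool::drop runs when pool goes out of scope at the end of main.

```rust
use std::cell::RefCell;
use std::rc::Rc;

// Hypothetical type, not part of the server: it records when it is dropped.
struct Noisy {
    name: &'static str,
    log: Rc<RefCell<Vec<String>>>,
}

impl Drop for Noisy {
    // Runs automatically when a Noisy value goes out of scope.
    fn drop(&mut self) {
        self.log.borrow_mut().push(format!("drop {}", self.name));
    }
}

// Creates two values in an inner scope and returns the order they were dropped in.
fn drop_order() -> Vec<String> {
    let log = Rc::new(RefCell::new(Vec::new()));
    {
        let _first = Noisy { name: "first", log: Rc::clone(&log) };
        let _second = Noisy { name: "second", log: Rc::clone(&log) };
        // End of scope: locals are dropped in reverse order of declaration.
    }
    let order = log.borrow().clone();
    order
}

fn main() {
    println!("{:?}", drop_order());
}
```

Note the _first and _second bindings: a name starting with an underscore still keeps the value alive until the end of the scope, whereas binding to a bare _ would drop it immediately.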

Let's implement the Drop trait for ThreadPool:

impl Drop for ThreadPool {
    fn drop(&mut self) {
        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            worker.thread.join().unwrap();
        }
    }
}

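The logic is simply to iterate over every worker and call join on its thread field, which blocks until that thread finishes (see 16.1. Using Multithreading to Run Code Simultaneously for details).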

Let's check it with cargo check:

error[E0507]: cannot move out of `worker.thread` which is behind a mutable reference
    --> src/lib.rs:49:13
     |
49   |             worker.thread.join().unwrap();
     |             ^^^^^^^^^^^^^ ------ `worker.thread` moved due to this method call
     |             |
     |             move occurs because `worker.thread` has type `JoinHandle<()>`, which does not implement the `Copy` trait
     |
note: `JoinHandle::<T>::join` takes ownership of the receiver `self`, which moves `worker.thread`
    --> /Users/stanyin/.rustup/toolchains/stable-aarch64-apple-darwin/lib/rustlib/src/rust/library/std/src/thread/mod.rs:1863:17
     |
1863 |     pub fn join(self) -> Result<T> {
     |                 ^^^^

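The error says we cannot move the thread field out of worker: we only have a mutable reference to each worker, but join takes self, so it needs ownership of the JoinHandle.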

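To get that ownership, we change the type of Worker's thread field by wrapping thread::JoinHandle<()> in an Option<T>. We can then call Option<T>'s take method, which moves the value out of the Some variant and leaves None in its place: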

struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}

Every place that uses the thread field has to be updated for the Option<T>:

impl Worker {  
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {  
        let thread = thread::spawn(move || loop {  
            let job = receiver.lock().unwrap().recv().unwrap();  
            println!("Worker {} got a job; executing.", id);  
            job();  
        });  
  
        Worker {   
            id,   
            thread: Some(thread)   
        }  
    }  
}

In Worker::new, the value of the thread field changes from thread to Some(thread).

impl Drop for ThreadPool {  
    fn drop(&mut self) {  
        for worker in &mut self.workers {  
            println!("Shutting down worker {}", worker.id);  
  
            if let Some(thread) = worker.thread.take() {  
                thread.join().unwrap();  
            }  
        }  
    }  
}

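We use if let to match on the result of worker.thread.take(). take moves the JoinHandle out of the Some variant, giving us ownership instead of a mutable reference, and leaves None behind, so a worker whose thread has already been taken is simply skipped.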
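A minimal sketch of the take-then-join pattern on its own, assuming a simplified Worker whose thread returns its own id (the real Worker's thread returns ()), so we can check that every handle was joined exactly once:

```rust
use std::thread;

// Simplified Worker: each thread returns its own id so we can see it was joined.
struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<usize>>,
}

fn spawn_workers(n: usize) -> Vec<Worker> {
    (0..n)
        .map(|id| Worker { id, thread: Some(thread::spawn(move || id)) })
        .collect()
}

// Joins every worker through `&mut`, the same way ThreadPool::drop does.
fn join_all(workers: &mut [Worker]) -> Vec<usize> {
    let mut joined = Vec::new();
    for worker in workers.iter_mut() {
        println!("Shutting down worker {}", worker.id);
        // take() moves the handle out and leaves None, so join(self) gets ownership.
        if let Some(handle) = worker.thread.take() {
            joined.push(handle.join().unwrap());
        }
    }
    joined
}

fn main() {
    let mut workers = spawn_workers(3);
    println!("{:?}", join_all(&mut workers));
    // A second pass only finds None, so no handle is joined twice.
    println!("{:?}", join_all(&mut workers));
}
```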

20.3.2. Signaling the Threads to Exit

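With these changes the code compiles, but it doesn't do what we want yet. Calling drop does not actually shut the threads down, because each thread is still sitting in its loop waiting for jobs.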

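If we dropped a ThreadPool with this drop implementation, the main thread would block forever waiting for the first thread to finish, since every thread keeps looping in search of jobs and never leaves the loop.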

We need ThreadPool's sender field to be able to represent two states: active, holding the sending end of the channel, and shut down, with no sender:

pub struct ThreadPool {  
    workers: Vec<Worker>,  
    sender: Option<mpsc::Sender<Job>>,  
}

Wrapping it in Option<T> gives us exactly those two states: Some(sender) and None.

Every place that uses the sender field has to be updated:

impl Drop for ThreadPool {  
    fn drop(&mut self) {  
        drop(self.sender.take());  
          
        for worker in &mut self.workers {  
            println!("Shutting down worker {}", worker.id);  
  
            if let Some(thread) = worker.thread.take() {  
                thread.join().unwrap();  
            }  
        }  
    }  
}

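We added drop(self.sender.take()); to drop the sending end explicitly. Since it is the only Sender, this closes the channel. Once that happens, every call to recv that a Worker makes in its infinite loop returns an error. With the Worker code as it stands (recv().unwrap()), that error makes each worker thread panic, so the loops do end, but not gracefully (and the join().unwrap() in drop would then panic too); we fix this below.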
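Here is a small standalone sketch of the channel behavior this relies on, using only std::sync::mpsc with plain i32 messages instead of Job. Messages queued before the last Sender is dropped are still delivered; only after the queue is empty does recv return Err. This suggests that jobs already queued when the pool is dropped are still picked up by the workers before they see the error.

```rust
use std::sync::mpsc;

// What a worker observes after the pool drops the only Sender.
fn drain_after_close() -> (Vec<i32>, bool) {
    let (sender, receiver) = mpsc::channel::<i32>();
    sender.send(1).unwrap();
    sender.send(2).unwrap();
    drop(sender); // plays the role of drop(self.sender.take())

    let mut received = Vec::new();
    // Messages sent before the channel closed are still delivered.
    while let Ok(value) = receiver.recv() {
        received.push(value);
    }
    // The loop ended because recv() returned Err: the channel stays closed.
    let still_closed = receiver.recv().is_err();
    (received, still_closed)
}

fn main() {
    let (received, closed) = drain_after_close();
    println!("received {:?}, closed: {}", received, closed);
}
```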

pub fn new(size: usize) -> ThreadPool {  
    assert!(size > 0);  
    let (sender, receiver) = mpsc::channel();  
    let mut workers = Vec::with_capacity(size);  
  
    let receiver = Arc::new(Mutex::new(receiver));  
    for id in 0..size {  
        workers.push(Worker::new(id, Arc::clone(&receiver)));  
    }  
  
    ThreadPool {  
        workers,  
        sender: Some(sender),  
    }  
}

In new, the sender field of the returned ThreadPool is now wrapped in the Some variant.

pub fn execute<F>(&self, f: F)
where
    F: FnOnce() + Send + 'static,
{
    let job = Box::new(f);

    self.sender.as_ref().unwrap().send(job).unwrap();
}

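as_ref is what avoids the ownership problem. execute only has &self, so self.sender.unwrap() would try to move the Sender out of a shared reference, which is not allowed. as_ref turns &Option<Sender<Job>> into Option<&Sender<Job>>; unwrap then yields a &Sender<Job>, and that is enough, because send only needs &self.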
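A minimal sketch of the as_ref pattern in isolation, assuming a hypothetical Pool that keeps only an Option<Sender<i32>> in place of ThreadPool's Option<Sender<Job>>. The send_two helper is also hypothetical; it closes the channel the same way ThreadPool::drop does.

```rust
use std::sync::mpsc;

// Hypothetical stand-in for ThreadPool: only the Option<Sender> part is kept.
struct Pool {
    sender: Option<mpsc::Sender<i32>>,
}

impl Pool {
    // Only `&self` is available here, just like ThreadPool::execute.
    fn execute(&self, value: i32) {
        // as_ref(): &Option<Sender<i32>> -> Option<&Sender<i32>>.
        // unwrap() then gives &Sender<i32>, and Sender::send only needs &self.
        self.sender.as_ref().unwrap().send(value).unwrap();
    }
}

// Sends two values through the pool, then closes the channel like ThreadPool::drop.
fn send_two() -> Vec<i32> {
    let (sender, receiver) = mpsc::channel();
    let mut pool = Pool { sender: Some(sender) };
    pool.execute(7);
    pool.execute(8);
    drop(pool.sender.take()); // leaves None in pool.sender and closes the channel
    // iter() ends once every Sender is gone, so this does not block forever.
    receiver.iter().collect()
}

fn main() {
    println!("{:?}", send_two());
}
```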

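This is still not graceful: once the channel is closed, every recv call in a Worker's infinite loop returns an error, and unwrapping that error makes the thread panic. It is better for the loop to exit cleanly instead, so Worker::new needs one more change: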

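fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
    let thread = thread::spawn(move || loop {
        // The lock is released at the end of this statement, before the job runs
        let job = receiver.lock().unwrap().recv();

        match job {
            Ok(job) => {
                println!("Worker {} got a job; executing.", id);
                job();
            },
            // The channel has been closed: leave the loop so the thread can finish
            Err(_) => break,
        }
    });

    Worker {
        id,
        thread: Some(thread),
    }
}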

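We removed the final unwrap from the recv() call and handle its result with match instead: on Ok we run the job, and on Err (the channel has been closed) we break out of the loop.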
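The same loop can be pulled out into a hypothetical free function, worker_loop, that runs on the calling thread and counts the jobs it executed, so its exit behavior can be checked directly. One detail worth noting: binding the result with let makes the MutexGuard from lock() a temporary of that statement, so the lock is released before job() runs. Writing while let Ok(job) = receiver.lock().unwrap().recv() instead would keep the guard alive for the whole loop body, and other workers could not receive jobs while one is running.

```rust
use std::sync::{mpsc, Arc, Mutex};

type Job = Box<dyn FnOnce() + Send + 'static>;

// Hypothetical helper: the Worker loop from above, run on the current thread,
// returning how many jobs it executed before the channel closed.
fn worker_loop(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> usize {
    let mut executed = 0;
    loop {
        // The MutexGuard is a temporary of this `let` statement, so the lock
        // is released here, before the job runs.
        let job = receiver.lock().unwrap().recv();
        match job {
            Ok(job) => {
                println!("Worker {id} got a job; executing.");
                job();
                executed += 1;
            }
            Err(_) => break, // channel closed: leave the loop cleanly
        }
    }
    executed
}

// Queues three jobs, closes the channel, then lets the loop drain it.
fn run_three_jobs() -> usize {
    let (sender, receiver) = mpsc::channel::<Job>();
    for n in 0..3 {
        sender.send(Box::new(move || println!("running job {n}"))).unwrap();
    }
    drop(sender);
    worker_loop(0, Arc::new(Mutex::new(receiver)))
}

fn main() {
    println!("executed {} jobs", run_three_jobs());
}
```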

20.3.3. Trying It Out

To test the changes, we modify main.rs so that the server accepts only two requests, using take to limit the iterator to its first two items:

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming().take(2) {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }

    println!("Shutting down.");
}
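Iterator::take(n) yields at most n items and never asks the underlying iterator for more, so the loop above stops after two connections and main moves on to print "Shutting down.", after which pool is dropped. A minimal sketch, with std::iter::from_fn standing in for listener.incoming() (an assumption for illustration: no real sockets involved), that counts how many items were pulled:

```rust
// Hypothetical stand-in for listener.incoming(): an endless iterator that
// counts how many times it has been asked for the next item.
fn take_two() -> (Vec<u32>, u32) {
    let mut pulled = 0;
    let items: Vec<u32> = std::iter::from_fn(|| {
        pulled += 1;
        Some(pulled) // never returns None, like an endless stream of connections
    })
    .take(2)
    .collect();
    (items, pulled)
}

fn main() {
    let (items, pulled) = take_two();
    println!("items {:?}, pulled {} times", items, pulled);
}
```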

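Output:
[Screenshot of the program output]
Your output may differ, because which worker in the pool picks up each job depends on thread scheduling and is not deterministic, but it should look broadly similar.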
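If you want to check the shutdown behavior without a browser or network, here is a sketch that puts the lib.rs types from this article into a single file (with most println! calls removed, a simplification for this sketch) plus a hypothetical run_jobs helper. It submits counter-incrementing jobs and then lets the pool go out of scope. Because ThreadPool::drop closes the channel and joins every worker, and queued jobs are still delivered after the channel closes, the counter should equal the number of submitted jobs once the pool has been dropped.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

type Job = Box<dyn FnOnce() + Send + 'static>;

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: Option<mpsc::Sender<Job>>,
}

impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);
        let (sender, receiver) = mpsc::channel();
        let receiver = Arc::new(Mutex::new(receiver));
        let workers = (0..size)
            .map(|id| Worker::new(id, Arc::clone(&receiver)))
            .collect();
        ThreadPool { workers, sender: Some(sender) }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        self.sender.as_ref().unwrap().send(Box::new(f)).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        // Close the channel first so every worker's recv() eventually returns Err
        drop(self.sender.take());
        // Then wait for each worker thread to finish
        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);
            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}

struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            let job = receiver.lock().unwrap().recv();
            match job {
                Ok(job) => job(),
                Err(_) => break, // channel closed: exit the loop
            }
        });
        Worker { id, thread: Some(thread) }
    }
}

// Hypothetical test helper: runs `jobs` counting jobs on a pool of `threads`
// workers and returns the counter value after the pool has been dropped.
fn run_jobs(threads: usize, jobs: usize) -> usize {
    let counter = Arc::new(AtomicUsize::new(0));
    {
        let pool = ThreadPool::new(threads);
        for _ in 0..jobs {
            let counter = Arc::clone(&counter);
            pool.execute(move || {
                counter.fetch_add(1, Ordering::SeqCst);
            });
        }
        // `pool` goes out of scope here: Drop closes the channel and joins all workers
    }
    counter.load(Ordering::SeqCst)
}

fn main() {
    println!("jobs completed: {}", run_jobs(4, 10));
}
```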

20.3.4. Summary

main.rs:

use std::{  
    io::{prelude::*, BufReader},  
    net::{TcpListener, TcpStream},  
    fs,  
};  
use web_server::ThreadPool;  
  
fn main() {  
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();  
    let pool = ThreadPool::new(4);  
  
    for stream in listener.incoming() {  
        let stream = stream.unwrap();  
  
        pool.execute(|| {  
            handle_connection(stream);  
        })  
    }  
}  
  
fn handle_connection(mut stream: TcpStream) {  
    let buf_reader = BufReader::new(&stream);  
    let request_line = buf_reader.lines().next().unwrap().unwrap();  
  
    let (status_line, filename) = if request_line == "GET / HTTP/1.1" {  
        ("HTTP/1.1 200 OK", "hello.html")  
    } else {  
        ("HTTP/1.1 404 NOT FOUND", "404.html")  
    };  
  
    let contents = fs::read_to_string(filename).unwrap();  
    let length = contents.len();  
  
    let response =  
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");  
  
    stream.write_all(response.as_bytes()).unwrap();  
}

lib.rs:

use std::{  
    sync::{mpsc, Arc, Mutex},  
    thread,  
};  
  
pub struct ThreadPool {  
    workers: Vec<Worker>,  
    sender: Option<mpsc::Sender<Job>>,  
}  
  
impl Drop for ThreadPool {  
    fn drop(&mut self) {  
        drop(self.sender.take());  
  
        for worker in &mut self.workers {  
            println!("Shutting down worker {}", worker.id);  
  
            if let Some(thread) = worker.thread.take() {  
                thread.join().unwrap();  
            }  
        }  
    }  
}  
  
type Job = Box<dyn FnOnce() + Send + 'static>;  
  
impl ThreadPool {  
    /// Create a new ThreadPool.  
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);  
        let (sender, receiver) = mpsc::channel();  
        let mut workers = Vec::with_capacity(size);  
  
        let receiver = Arc::new(Mutex::new(receiver));  
        for id in 0..size {  
            workers.push(Worker::new(id, Arc::clone(&receiver)));  
        }  
  
        ThreadPool {  
            workers,  
            sender: Some(sender),  
        }  
    }  
  
    pub fn execute<F>(&self, f: F)  
    where  
        F: FnOnce() + Send + 'static,  
    {  
        let job = Box::new(f);  
  
        self.sender.as_ref().unwrap().send(job).unwrap();  
    }  
}  
  
struct Worker {  
    id: usize,  
    thread: Option<thread::JoinHandle<()>>,  
}  
  
impl Worker {  
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {  
        let thread = thread::spawn(move || loop {  
            let job = receiver.lock().unwrap().recv();  
  
            match job {  
                Ok(job) => {  
                    println!("Worker {} got a job; executing.", id);  
                    job();  
                },  
                Err(_) => break,  
            }  
        });  
  
        Worker {  
            id,  
            thread: Some(thread),  
        }  
    }  
}

hello.html:

<!DOCTYPE html>  
<html lang="en">  
<head>  
    <meta charset="utf-8">  
    <title>Hello!</title>  
</head>  
<body>  
<h1>Hello!</h1>  
<p>Hi from Rust</p>  
</body>  
</html>

404.html:

<!DOCTYPE html>  
<html lang="en">  
<head>  
  <meta charset="utf-8">  
  <title>Hello!</title>  
</head>  
<body>  
<h1>Oops!</h1>  
<p>Sorry, I don't know what you're asking for.</p>  
</body>  
</html>
02-07 06:44