1. 进程与线程
通常,一个任务就是一个进程(Process),比如打开一个浏览器就是启动一个浏览器进程,打开一个Word就启动了一个Word进程。大多时候一个进程需要同时干很多件事情,比如Word,它可以同时进行打字、拼写检查、打印等事情。在一个进程内部,要同时干多件事,就需要同时运行多个“子任务”,我们把进程内的这些“子任务”称为线程(Thread)。即一个程序至少有一个进程,一个进程至少有一个线程。
我们大学学操作系统的时候,都知道进程是资源分配的基本单位,线程是执行和调度的基本单位,线程本身不拥有资源,资源来自于它的进程。也就是说,进程在执行过程中有自己独立的内存空间,与其他进程相互隔离,因此进程间的通信需要另辟蹊径,比如常见的有管道通信、消息队列、信号量以及套接字等方法,同时,一个进程中有三大块——进程控制块(PCB)、数据段、代码段,这会导致进程间的会产生很大的开销。而线程与线程之间因为共享进程申请的内存区域,它们之间可以相互通信,因为线程的粒度小,使得线程的切换速度比进程快很多,可以极大地提高程序的运行效率,如下,是线程和进程的关系(图片摘自知乎)。
线程分为两种:用户线程和守护线程。守护 线程,是指在程序运行的时候在后台提供一种通用服务的线程,比如垃圾回收线程就是一个很称职的守护者,并且这种线程并不属于程序中不可或缺的部分。守护线程是用来服务用户线程的,一旦用户线程全部运行结束,程序会终止,守护线程也会随之退出。
在进入多线程之前,可以先看看线程的几种状态:
- 1. 新建(NEW):新创建了一个线程对象。
- 2. 就绪(RUNNABLE或READY) :线程正在参与竞争CPU的使用权。
- 3. 运行(RUNNING):线程取到了CPU的使用权,正在执行。
- 4. 阻塞(BLOCKED):阻塞状态是线程因为某种原因放弃CPU使用权,暂时停止运行。直到满足条件(比如超时等待、唤醒)时,该线程重新回到就绪态,参与竞争CPU使用权。
- 5. 等待(WAITING):线程无限等待某个对象的锁,或等待另一个线程结束的状态。
- 6. 计时等待(TIME_WAITING):线程在某一段时间内等待某个对象的“锁”,或者主动休眠,亦或者等待一个线程结束,除非被中断,时间一到,马上回到就绪状态,被中断的方法则抛出异常。
- 7. 终止(Terminated):即线程终止(线程的的代码被执行完毕)和执行过程出现异常或者被外界强制中断。
状态的转换的具体转换如下图所示:
2. Thread类和Runnable接口
通过继承Thread类,实run方法即可实现一个线程类,常用的API如下:
由于java中的类是单继承的,而接口可以多继承。一个类实现多个接口的情况,因为接口只有抽象方法,具体方法只能由实现接口的类实现,在调用的时候始终只会调用实现类的方法(不存在歧义),因此在开发中通常使用Runnable。
public class Thread1 implements Runnable { @Override public void run() { System.out.println("iii"); } public static void main(String[] args) { Thread1 rt = new Thread1(); Thread t = new Thread(rt); t.start(); } }
3. 线程池
当线程的在某一时刻大量的创建与销毁会消耗很多资源,我们可以提前创建好一些线程,将他们集中管理起来,形成一个线程池,需要使用的时候直接拿过来用,使用完后,放回线程池。
Executor框架
在 java.util.cocurrent 包下,通过该框架来控制线程的启动、执行和关闭,可以简化并发编程的操作,由Executors类的五个静态工厂方法创建,其常用方法如下。
3.1 线程池的创建
newFixedThreadPool:创建固定大小的线程池。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。
public class Executors { /* 函数功能:创建一个固定长度的的线程池,用于保存任务的阻塞队列为无限制长度的LinkedBlockingQueue。 线程池中的线程将会一直存在除非线程池shutdown,即线程池中的线程没有受到存活时间的限制。 */ public static ExecutorService newFixedThreadPool(int nThreads) { /* 参数一 核心线程数大小(最小线程数),当线程数 < 参数一 ,会创建线程执行 runnable * 参数二 最大线程数, 当线程数 >= 参数二,会把runnable放入workQueue(参数5)中 * 参数三 保持存活时间,空闲线程能保持的最大时间。 * 参数四 时间单位 * 参数五 保存任务的阻塞队列 *public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueu */ return new ThreadPoolExecutor(nThreads1, nThreads2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } //... } ExecutorService es = Executors.newFixedThreadPool(20); //如果线程池中线程数过大或过小,都会影响性能
newCachedThreadPool:创建一个可缓存空闲线程60秒的线程池。如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。
public class Executors { public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } //... } ExecutorService es = Executors.newCachedThreadPool(); //缺点是在访问量突然很大的时候,会创建大量线程
创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。
ExecutorService es = Executors.newSingleThreadExecutor(); //等同于 ExecutorService es = Executors.newFixedThreadPool(1);
newScheduledThreadPool:创建一个大小无限的线程池。此线程池支持定时以及周期性执行任务的需求。
public static void main(String[] args) { ScheduledExecutorService ses = Executors.newScheduledThreadPool(1); ses.scheduleWithFixedDelay(new Runnable() { @Override public void run() { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(new Date()); } }, 1000/*第一个周期开始的时间*/, 2000/*每个周期间隔的时间*/, TimeUnit.MILLISECONDS); }
newSingleThreadScheduledExecutor:创建一个单线程的线程池。此线程池支持定时以及周期性执行任务的需求。
3.2 线程池中线程的使用
通过Executors类去获得的线程池都实现了ExecutorService这个接口。可以调用execute()或者submit()方法把相应的任务提交到线程池中去。
1. execute(Runnable): 这个方法接收一个Runnable实例,并且异步的执行。
public static void main(String[] args) { ExecutorService es = Executors.newCachedThreadPool(); // Future future = es.execute(new Runnable() { @Override public void run() { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("run the thread."); } }); System.out.println("over"); } /* output: * over * run the thread. */
2. submit(Runnable): submit(Runnable)
和execute(Runnable)
区别是前者可以返回一个Future对象,通过返回的Future对象,我们可以检查提交的任务是否执行完毕。
public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService es = Executors.newCachedThreadPool(); Future future = es.submit(new Runnable() { @Override public void run() { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("run the thread."); } }); future.get(); //future.get()方法会产生阻塞,直到上面的线程完成,即等待一秒钟 System.out.println("over"); } /* output: * run the thread. * over */
3. submit(Callable): submit(Callable)
和submit(Runnable)
类似,也会返回一个Future对象,但是参数Callable类中的call方法可以返回一个值,而Runable不行。
public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService es = Executors.newCachedThreadPool(); Future future = es.submit(new Callable() { @Override public Object call() throws Exception { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return "run the thread."; } }); String rs = (String)future.get(); //future.get()方法会产生阻塞 System.out.println("over and " + rs); } /* output: * over and run the thread. */
4. invokeAny(Collection<? extends Callable<T>> tasks>): 方法输入接受一个Callable集合类型的参数,启动多个线程相互独立的去执行对应线程的任务,一旦有一个线程执行完毕,则返回,同时其他线程终止。
public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService es = Executors.newFixedThreadPool(3); Set<Callable<String>> callables = new HashSet<Callable<String>>(); callables.add(new Callable<String>() { @Override public String call() throws Exception { Thread.sleep(2000); return " first task"; } }); callables.add(new Callable<String>() { @Override public String call() throws Exception { Thread.sleep(1000); return " second task"; } }); callables.add(new Callable<String>() { @Override public String call() throws Exception { Thread.sleep(3000); return " third task"; } }); String rs = es.invokeAny(callables); System.out.println(rs); } /* output * second task */
5. invokeAll(Collection<? extends Callable<T>> tasks>): 该方法则会并行的执行Callable集合类型的所有方法。
public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService es = Executors.newFixedThreadPool(3); Set<Callable<String>> callables = new HashSet<Callable<String>>(); callables.add(new Callable<String>() { @Override public String call() throws Exception { Thread.sleep(2000); return " first task"; } }); callables.add(new Callable<String>() { @Override public String call() throws Exception { Thread.sleep(1000); return " second task"; } }); callables.add(new Callable<String>() { @Override public String call() throws Exception { Thread.sleep(3000); return " third task"; } }); List<Future<String>> list= es.invokeAll(callables); for (Future<String> future:list) { String s = future.get(); System.out.println(s); } }
6. shotdown(): 调用该方法后,关闭线程池,已提交的方法会继续执行,执行结束后,线程池全部关闭,该方法是一个异步方法,一旦调用,立即返回。
7.shotdownNow(): 调用该方法后,关闭线程池,已提交的方法也会被取消,线程池立即全部关闭,该方法是一个异步方法,一旦调用,立即返回。
8. awaitTermination(timeout,unit): 调用该方法阻塞当前线程,使得线程池中的线程执行完毕,最长等待时间为timeout,此方法需要在调用shotdown/shotdownNow后才有效。
public class ThreadSafe implements Runnable { private static int count = 0; @Override public void run() { for (int i = 0; i < 10; i++) { count++; } } public static void main(String[] args) throws InterruptedException { ExecutorService es = Executors.newFixedThreadPool(10); for (int i = 0; i < 20; i++) { es.execute(new ThreadSafe()); } es.shutdown(); //不允许添加线程,异步关闭连接池 es.awaitTermination(10L, TimeUnit.SECONDS); //等待连接池的线程任务完成 System.out.println(count); } } /* output * 200 */
参考文献
庞永华. Java多线程与Socket:实战微服务框架[M].电子工业出版社.2019.3