创建线程池可以分为三种方式:
1. 通过ThreadPoolExecutor的构造方法,创建ThreadPoolExecutor的对象,即一个线程池对象;
此构造方法,一共7个参数,5个必须参数,2个带有默认值的参数;详细后面说;
2. 通过Executors返回的线程池对象;
这种方法创建的常用线程池为4种,还可以创建ForkJoinPool对象;
可以说是封装好的方法,通过Executors的4种常用静态方法,返回4种已经封装好的ThreadPoolExecutor线程池对象;
3. ForkJoinPool并发框架
将一个大任务拆分成多个小任务后,使用fork可以将小任务分发给其他线程同时处理,使用join可以将多个线程处理的结果进行汇总;这实际上就是分治思想。
后面再说此类;
为什么要使用线程池:
使用线程池的好处
降低资源消耗。重复利用已创建线程,降低线程创建与销毁的资源消耗。
提高响应效率。任务到达时,不需等待创建线程就能立即执行。
提高线程可管理性。
防止服务器过载。内存溢出、CPU耗尽。
进入正题:
ThreadPoolExecutor
尽量使用此类创建线程池,而非Executors创建;使用此方法,更明确线程池的运行规则,规避资源耗尽的风险
先说一下线程池的流程:
线程先进入核心池运行;
核心池满了,进队列等待;
队列满了,就创建新线程,直到最大线程数满了,之外的线程就被拒绝rejected;
在最后,会有代码演示,整个线程池的流程,很详细!
看构造器:
// 七参构造器,前五个参数必须 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, //可以不写 RejectedExecutionHandler handler) // 可以不写
ThreadPoolExecutor的四种构造器的各项参数:
corePoolSize:核心池的大小,并非线程的最大数量
maximumPoolSize > corePoolSize
在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中
maximumPoolSize:线程池的最大线程数,表示线程池中最多能创建多少个线程
keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止
默认:只有线程池内线程数大于corePoolSize的线程,keepAliveTime才会对其计时
当一个线程的空闲时间大于keepAliveTime,则会被终止
如果调用了allowCoreThreadTimeOut(boolean),线程池内线程数小于corePoolSize,keepAliveTime也会起作用
unit:参数keepAliveTime的时间单位(七种单位)
TimeUnit.DAYS; //天 TimeUnit.HOURS; //小时 TimeUnit.MINUTES; //分钟 TimeUnit.SECONDS; //秒 TimeUnit.MILLISECONDS; //毫秒 TimeUnit.MICROSECONDS; //微妙 TimeUnit.NANOSECONDS; //纳秒
workQueue:选择一个阻塞队列
LinkedBlockingQueue; // 常用,无界阻塞队列,不传值默认为Integer.MAX_VALUE,容易内存耗尽 SynchronousQueue; ArrayBlockingQueue; PriorityBlockingQueue // 优先队列
threadFactory:线程工厂,主要用来创建线程。如果不传此参数,默认:Executors.defaultThreadFactory()
RejectedExecutionHandler handler:表示当拒绝处理任务时的策略,有以下四种取值:
如果不传此参数,默认:ThreadPoolExecutor.AbortPolicy
// 丢弃任务并抛出RejectedExecutionException异常。 ThreadPoolExecutor.AbortPolicy // 也是丢弃任务,但是不抛出异常。 ThreadPoolExecutor.DiscardPolicy // 丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程) ThreadPoolExecutor.DiscardOldestPolicy // 由调用线程处理该任务 ThreadPoolExecutor.CallerRunsPolicy
可以自行实现implements RejectedExecutionHandler接口,来自定义线程被线程池拒绝之后的操作:后面有演示;
ThreadPoolExecutor的重要方法:
execute(Runnable command)
通过这个方法可以向线程池提交一个任务,交由线程池去执行
此方法在执行的时候,会判断当前线程数是否大于corePoolSize
如果当前线程数大于corePoolSize,并且,当前线程池处于RUNNING状态,则将此任务加入任务缓冲队列
submit()
内部调用execute()方法
这个方法也是用来向线程池提交任务的,但是它和execute()方法不同
它能够返回任务执行的结果,利用了Future来获取任务执行结果
shutdown()
关闭线程池,此时线程池不能够接受新的任务,它会等待所有任务执行完毕
shutdownNow()
关闭线程池,线程池处于STOP状态,此时线程池不能接受新的任务,并且会去尝试终止正在执行的任务
演示线程池流程:
public class Task implements Runnable { private String name; public Task(String name) { this.name = name; } @Override public void run() { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } @Override public String toString() { return this.name; } } /** * 重写RejectedExecutionHandler * 自定义线程池拒绝线程之后的行为 */ public class MyRejectedHandler implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println("Rejected:"+r.toString()); } } /** * ThreadPoolExecutor的使用实例。 * 四个必要参数, * RejectedHandler为自己定义的RejectedExecutionHandler实现类 * 线程流程: * 1.进入核心池,核心池满了,之后线程,进入队列 * 2.队列满了,继续创建线程,直到最大线程数 * 3.最大线程数已满,拒绝后续线程 */ public class ThreadPoolDemo { public static void main(String[] args) { /** * 设置线程池参数: * 核心线程:2 * 最大线程:3 * 阻塞队列大小:5 * 拒绝策略:自定义 */ ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 2, 3, 2000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(5), new MyRejectedHandler()); // 启动一个主线程,内部启动10个子线程添加进线程池 Runnable runTask = () -> { for (int i = 0; i < 10; i++) { String name = "Task_" + i; Task task = new Task(name); try { /** * 每次添加线程到线程池,都打印线程池的内部情况 */ threadPoolExecutor.execute(task); System.out.println("PoolSize: " + threadPoolExecutor.getPoolSize() + ",Queue" + threadPoolExecutor.getQueue()); System.out.println(); } catch (Exception e) { System.out.println("Refused:" + name); } } try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } }; Thread thread = new Thread(runTask); thread.start(); } }
运行结果:
PoolSize: 1,Queue[] PoolSize: 2,Queue[] // 到这里,核心池满了,之后线程,进入队列 PoolSize: 2,Queue[Task_2] PoolSize: 2,Queue[Task_2, Task_3] PoolSize: 2,Queue[Task_2, Task_3, Task_4] PoolSize: 2,Queue[Task_2, Task_3, Task_4, Task_5] PoolSize: 2,Queue[Task_2, Task_3, Task_4, Task_5, Task_6] // 队列满了,继续创建线程到线程池,这一个多余的线程会在等待,并倒计时keepAliveTime PoolSize: 3,Queue[Task_2, Task_3, Task_4, Task_5, Task_6] // 最大线程数已满,拒绝后续线程 Rejected:Task_8 PoolSize: 3,Queue[Task_2, Task_3, Task_4, Task_5, Task_6] Rejected:Task_9 PoolSize: 3,Queue[Task_2, Task_3, Task_4, Task_5, Task_6]
执行execute()方法和submit()方法的区别
(1)execute() 方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功与否;
(2)submit()方法用于提交需要返回值的任务。线程池会返回一个future类型的对象,通过这个future对象可以判断任务是否执行成功,并且可以通过future的get()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用 get(long timeout,TimeUnit unit)
方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。