一、基本概念与关系
程序
程序是含有指令和数据的文件,静态地存储在磁盘等存储设备上。它是软件的实体,但未被激活。
进程
进程是程序的一次执行过程,是系统运行程序的基本单位。当程序被操作系统加载并执行时,就成为一个进程,具有动态性。进程拥有独立的内存空间和系统资源(如CPU时间、内存、I/O设备等),是一个正在执行中的程序实例。进程之间相互独立,不共享内存空间。
线程
线程是进程内部的一个执行单元,是进程中实际运作的小单位,也是CPU调度的基本单位。相比进程,线程更轻量,同类线程共享同一块内存空间和系统资源。线程之间可以共享数据,但也可能相互影响,特别是在同一个进程中。线程的引入提高了程序的并发执行能力。
三者之间的关系
- 程序到进程:程序通过加载执行转变为进程,实现了从静态代码到动态执行实体的转变。
- 进程到线程:一个进程可以包含多个线程,这些线程共享进程的资源,在同一进程环境下并发执行不同的任务,提高了效率和响应速度。
- 线程相比进程,降低了资源开销,提高了通信效率,但同时也可能因为资源共享引发同步问题。
二、线程的基本状态
Java 线程在其生命周期中会经历以下6种基本状态之一,并且随着代码执行和系统调度在这些状态之间转换:
- 新建(New):线程刚被创建,尚未启动。
- 可运行(Runnable):线程可以被调度执行,包括已经获得CPU时间片正在运行的线程和等待CPU分配时间片的线程。
- 阻塞(Blocked):线程等待某个监视器锁(例如进入synchronized代码块时),或者等待I/O操作完成等。
- 等待(Waiting):线程等待其他线程执行特定操作,如调用
Object.wait()
方法,不可被中断直到等待条件满足。 - 超时等待(Timed Waiting):与等待状态相似,但线程在指定时间后会自动醒来,如调用
Thread.sleep(long millis)
方法。 - 终止(Terminated):线程已结束执行,无论是正常结束还是异常结束。
这些状态描述了线程从创建到执行完毕的完整生命周期,理解这些状态对于调试多线程程序和避免死锁等问题至关重要。
Java 线程状态如下图所示:
三、 线程池的原理与创建原因及方式
原理
线程池是一种基于池化思想管理线程的机制,其核心在于重用线程资源,减少创建和销毁线程的开销。线程池的工作流程主要包括以下几个步骤:
- 创建线程池:初始化时,预先创建一定数量的线程并将其置于待命状态,等待任务分配。
- 任务提交:当有新任务到达时,将其加入到任务队列中。
- 任务调度:线程池中的空闲线程会从任务队列中取出任务并执行。若所有线程均在忙状态,且任务队列已满,则根据策略决定是否创建新线程或拒绝任务。
- 任务执行完毕:线程不会立即销毁,而是返回线程池等待下一个任务。
- 线程管理:根据系统负载动态调整线程数量,避免过多线程导致资源耗尽或过少线程影响任务处理速度。
优点
- 资源重用:通过重复利用已创建的线程,减少了线程创建和销毁的开销。
- 提高响应速度:任务到达时无需等待新线程创建即可立即执行。
- 管理灵活性:可根据系统状况动态调整线程数量,有效控制资源使用,防止资源耗尽。
- 简化编程模型:提供统一的接口给开发者提交任务,隐藏了线程管理的复杂细节。
- 提高系统稳定性:通过控制最大线程数,防止了过多线程导致的资源竞争和调度开销,增强了系统的稳定性和可预测性。
创建线程池的方式
-
手动创建:基础方式,使用语言提供的线程创建API(如Java中的
Thread
类)配合同步机制(如队列、锁等)自行实现线程池逻辑。 -
使用标准库:
- Java: 通过
Executors
类提供的工厂方法创建不同类型线程池,如newFixedThreadPool
创建固定大小线程池,newCachedThreadPool
创建可缓存线程池等。 - C++/POSIX: 利用
std::thread
结合std::queue
等容器自建线程池,或使用第三方库如boost::thread_pool
。 - 其他语言:如Python的
concurrent.futures.ThreadPoolExecutor
,Node.js的worker_threads
模块等,都提供了线程池或类似机制的封装。
- Java: 通过
-
第三方库:使用成熟的第三方线程池库,如Java的
Apache Commons ThreadPoolExecutor
,C#的Microsoft TPL(Task Parallel Library)
等,这些库通常提供了更高级的特性,如线程池监控、任务调度策略等。
选择合适的创建方式需根据项目需求、性能要求以及目标平台的支持程度综合考虑。
四、线程池的创建与参数解析
在Java中,通过ThreadPoolExecutor
类来创建自定义线程池,其构造函数提供了高度灵活的配置选项,以便根据具体需求调整线程池的行为。下面是构造函数及其参数的详细介绍:
public ThreadPoolExecutor(
int corePoolSize, // 核心线程数
int maximumPoolSize, // 最大线程数
long keepAliveTime, // 空闲线程存活时间
TimeUnit unit, // 存活时间的单位
BlockingQueue<Runnable> workQueue, // 任务队列
RejectedExecutionHandler handler // 拒绝策略
)
参数解释
-
corePoolSize:核心线程数
线程池中常驻的核心线程数量。即使线程空闲,这部分线程也会保留在线程池中,不会被回收。只有在工作队列满并且当前线程数小于最大线程数时才会创建新的线程。 -
maximumPoolSize:最大线程数
线程池能容纳的最大线程数量。当活动线程达到这个数值后,新来的任务如果不能放入任务队列将被拒绝处理。 -
keepAliveTime:空闲线程存活时间
当线程池中的线程数量超过核心线程数时,多余的空闲线程在等待新任务最长时间达到keepAliveTime后,如果还没有新任务到来,那么这些线程将被终止以节省资源。 -
unit:存活时间的单位
keepAliveTime参数的时间单位,如TimeUnit.SECONDS
秒、TimeUnit.MILLISECONDS
毫秒等。 -
workQueue:任务队列
用于保存等待执行的任务的阻塞队列。不同的队列实现会影响线程池的行为,常见的有ArrayBlockingQueue
(固定大小)、LinkedBlockingQueue
(无界或有限界限)、SynchronousQueue
(直接传递,没有容量)等。 -
handler:拒绝策略
当线程池和任务队列都达到饱和状态时(即无法再接受新任务),如何处理新提交的任务。JDK内置了几种拒绝策略:AbortPolicy
:默认策略,丢弃任务并抛出RejectedExecutionException
异常。CallerRunsPolicy
:调用者运行策略,直接在调用者线程中执行被拒绝的任务。DiscardPolicy
:静默丢弃任务,不抛出任何异常。DiscardOldestPolicy
:丢弃队列中最旧的任务,并尝试重新提交当前任务。
通过调整这些参数,可以创建符合特定应用场景需求的线程池,以达到资源最优利用和任务高效执行的目的。
五、 线程池原理
1. 固定大小线程池(FixedThreadPool)
特点:
参数分析:
-
corePoolSize:等于最大线程数,一旦创建就不会改变,保证线程数量恒定。
-
workQueue:默认为LinkedBlockingQueue,队列容量无上限。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class FixedThreadPoolDemo {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(3);
for (int i = 0; i < 10; i++) {
final int taskId = i;
executor.execute(() -> {
System.out.println("Task ID : " + taskId + ", Thread Name : " + Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
executor.shutdown();
}
}
2. 可缓存线程池(CachedThreadPool)
特点:
参数分析:
- corePoolSize=0,初始时无核心线程。
- maximumPoolSize=Integer.MAX_VALUE,理论上允许无限增长。
- keepAliveTime=1分钟,空闲线程等待新任务的最长时间。
- workQueue=SynchronousQueue,直接传递,不保留任务。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CachedThreadPoolDemo {
public static void main(String[] args) {
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
final int taskId = i;
executor.execute(() -> {
System.out.println("Task ID : " + taskId + ", Thread Name : " + Thread.currentThread().getName());
});
}
executor.shutdown();
}
}
3. 定时任务线程池(ScheduledThreadPool)
特点:
参数分析:
- corePoolSize:线程池的基本大小,即使没有任务执行,线程也会保持存活。
- 支持schedule系列方法设定任务的执行时间或周期。
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ScheduledThreadPoolDemo {
public static void main(String[] args) {
ScheduledExecutorService executor = Executors.newScheduledThreadPool(3);
for (int i = 0; i < 3; i++) {
final int taskId = i;
executor.scheduleAtFixedRate(() -> {
System.out.println("Task ID : " + taskId + ", Thread Name : " + Thread.currentThread().getName());
}, 0, 1, TimeUnit.SECONDS);
}
}
}
六、常用的线程池
提交一个任务到线程池中,线程池的处理流程如下:
- 1、判断线程池里的核心线程是否都在执行任务,如果不是(核心线程空闲或者还有核心线程没有被创
建)则创建一个新的工作线程来执行任务。如果核心线程都在执行任务,则进入下个流程。 - 2、线程池判断工作队列是否已满,如果工作队列没有满,则将新提交的任务存储在这个工作队列里。如
果工作队列满了,则进入下个流程。 - 3、判断线程池里的线程是否都处于工作状态,如果没有,则创建一个新的工作线程来执行任务。如果已经满了,则交给饱和策略来处理这个任务。
1、ThreadPoolExecutor的execute()方法
public void execute(Runnable command) {
if (command == null) {
throw new NullPointerException();
}
// 如果线程数大于等于核心线程数或者线程创建失败,将任务加入队列
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
// 线程池处于运行状态并且加入队列成功
if (runState == RUNNING && workQueue.offer(command)) {
if (runState != RUNNING || poolSize == 0) {
ensureQueuedTaskHandled(command);
}
}
// 线程池不处于运行状态或者加入队列失败,则创建线程(创建的是非核心线程)
else if (!addIfUnderMaximumPoolSize(command)) {
// 创建线程失败,则采取阻塞处理的方式
reject(command); // is shutdown or saturated
}
}
}
2、创建线程的方法:addIfUnderCorePoolSize(command)
private boolean addIfUnderCorePoolSize(Runnable firstTask) {
Thread t = null;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (poolSize < corePoolSize && runState == RUNNING) {
t = addThread(firstTask);
}
} finally {
mainLock.unlock();
}
if (t == null) {
return false;
}
t.start();
return true;
}
我们重点来看第7行:
private Thread addThread(Runnable firstTask) {
Worker w = new Worker(firstTask);
Thread t = threadFactory.newThread(w);
if (t != null) {
w.thread = t;
workers.add(w);
int nt = ++poolSize;
if (nt > largestPoolSize) {
largestPoolSize = nt;
}
}
return t;
}
public void run() {
try {
Runnable task = firstTask;
firstTask = null;
while (task != null || (task = getTask()) != null) {
runTask(task);
task = null;
}
} finally {
workerDone(this);
}
}
3、创建一个线程
public class ThreadPoolTest implements Runnable {
@Override
public void run() {
try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
4、线程池循环运行11个线程:
public static void main(String[] args) {
LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(5);
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
5, // 核心线程数
10, // 最大线程数
60, // 空闲线程存活时间
TimeUnit.SECONDS, // 时间单位
queue // 任务队列
);
for (int i = 0; i < 11; i++) {
threadPool.execute(
new Thread(new ThreadPoolTest(), "Thread" + i)
);
System.out.println("活跃的线程数: " + threadPool.getPoolSize());
if (queue.size() > 0) {
System.out.println("----------------队列阻塞的线程数" + queue.size());
}
}
threadPool.shutdown();
}
执行结果:
活跃的线程数: 1
活跃的线程数: 2
活跃的线程数: 3
活跃的线程数: 4
活跃的线程数: 5
活跃的线程数: 5
----------------队列阻塞的线程数1
活跃的线程数: 5
----------------队列阻塞的线程数2
活跃的线程数: 5
----------------队列阻塞的线程数3
活跃的线程数: 5
----------------队列阻塞的线程数4
活跃的线程数: 5
----------------队列阻塞的线程数5
活跃的线程数: 6
----------------队列阻塞的线程数5
活跃的线程数: 7
----------------队列阻塞的线程数5
活跃的线程数: 8
----------------队列阻塞的线程数5
活跃的线程数: 9
----------------队列阻塞的线程数5
活跃的线程数: 10
----------------队列阻塞的线程数5
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task
Thread[Thread15,5,main] rejected from
java.util.concurrent.ThreadPoolExecutor@232204a1[Running, pool size = 10, active
threads = 10, queued tasks = 5, completed tasks = 0]
at
java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPool
Executor.java:2047)
at
java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at
java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at test.ThreadTest.main(ThreadTest.java:17)
分析结果
观察与分析
-
线程池配置概览:创建的线程池具体配置为:核心线程数量为5个;最大线程总数为10个;关联的工作队列容量为5个任务。
-
工作队列监控:通过queue.size()方法实时监测工作队列中的任务数量,帮助理解线程池的工作状态。
-
运行机制解析:
- 初始阶段,随着任务的提交,线程池会创建核心线程直至达到5个。
- 当核心线程满载后,新任务被加入到工作队列中,直至队列也达到其容量上限5个。
- 队列饱和后,线程池会继续创建非核心线程,直至线程总数达到最大限制10个。
- 若线程数和队列均达到上限,此时线程池进入饱和状态,需依据饱和策略处理后续提交的任务。
饱和策略(RejectedExecutionHandler)调整
当线程池和队列均达到饱和,即无法接纳新任务时,JDK提供了四种预设的饱和策略处理新提交的任务:
- AbortPolicy(默认):直接抛出RejectedExecutionException异常。
- CallerRunsPolicy:将任务回退给调用线程直接执行。
- DiscardOldestPolicy:丢弃队列中最旧的任务,尝试重新提交当前任务。
- DiscardPolicy:直接丢弃新提交的任务,不抛出异常。
修改示例:现在,我们将上述示例中的饱和策略改为DiscardPolicy,即丢弃新提交的任务而不抛出异常。
public static void main(String[] args) {
// ... 线程池初始化代码保持不变 ...
// 设置饱和策略为DiscardPolicy
threadPool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
// 任务提交逻辑保持不变 ...
threadPool.shutdown();
}
基于之前的讨论,让我们整合并扩展这部分内容,包括修改线程池的饱和策略为DiscardPolicy
并进行相应排版:
观察与分析
-
线程池配置概览:创建的线程池具体配置为:核心线程数量为5个;最大线程总数为10个;关联的工作队列容量为5个任务。
-
工作队列监控:通过
queue.size()
方法实时监测工作队列中的任务数量,帮助理解线程池的工作状态。 -
运行机制解析:
- 初始阶段,随着任务的提交,线程池会创建核心线程直至达到5个。
- 当核心线程满载后,新任务被加入到工作队列中,直至队列也达到其容量上限5个。
- 队列饱和后,线程池会继续创建非核心线程,直至线程总数达到最大限制10个。
- 若线程数和队列均达到上限,此时线程池进入饱和状态,需依据饱和策略处理后续提交的任务。
饱和策略(RejectedExecutionHandler
)调整
当线程池和队列均达到饱和,即无法接纳新任务时,JDK提供了四种预设的饱和策略处理新提交的任务:
- AbortPolicy(默认):直接抛出
RejectedExecutionException
异常。 - CallerRunsPolicy:将任务回退给调用线程直接执行。
- DiscardOldestPolicy:丢弃队列中最旧的任务,尝试重新提交当前任务。
- DiscardPolicy:直接丢弃新提交的任务,不抛出异常。
修改示例:现在,我们将上述示例中的饱和策略改为DiscardPolicy
,即丢弃新提交的任务而不抛出异常。
public static void main(String[] args) {
// ... 线程池初始化代码保持不变 ...
// 设置饱和策略为DiscardPolicy
threadPool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
// 任务提交逻辑保持不变 ...
threadPool.shutdown();
}
通过这样的调整,当线程池和队列达到饱和状态后,任何新提交的任务将被默默地丢弃,这种方式适用于那些可以安全丢弃的任务场景,避免了因任务拒绝而引发的异常中断,提高了程序的健壮性。