前言
本章节主要分享下,多线程并发在电商系统下的应用。主要从以下几个方面深入:线程相关的基础理论和工具、多线程程序下的性能调优和电商场景下多线程的使用。
多线程J·U·C
线程池
概念
回顾线程创建的方式
- 继承Thread
- 实现Runnable
- 使用FutureTask
线程状态
NEW:刚刚创建,没做任何操作
RUNNABLE:调用run,可以执行,但不代表一定在执行(RUNNING,READY)
WATING:使用了waite(),join()等方法
TIMED_WATING:使用了sleep(long),wait(long),join(long)等方法
BLOCKED:抢不到锁
TERMINATED:终止
线程池基本概念
根据上面的状态,普通线程执行完,就会进入TERMINA TED销毁掉,而线程池就是创建一个缓冲池存放线程,执行结束以后,该线程并不会死亡,而是再次返回线程池中成为空闲状态,等候下次任务来临,这使得线程池比手动创建线程有着更多的优势:
- 降低系统资源消耗,通过重用已存在的线程,降低线程创建和销毁造成的消耗;
- 提高系统响应速度,当有任务到达时,通过复用已存在的线程,无需等待新线程的创建便能立即执行;
- 方便线程并发数的管控。因为线程若是无限制的创建,可能会导致内存占用过多而产生OOM
- 节省cpu切换线程的时间成本(需要保持当前执行线程的现场,并恢复要执行线程的现场)。
- 提供更强大的功能,延时定时线程池。(Timer vs ScheduledThreadPoolExecutor)
常用线程池类结构
说明:
- 最常用的是ThreadPoolExecutor
- 调度用的ScheduledThreadPoolExecutor
- Executors是工具类,协助创建线程池
工作机制
在线程池的编程模式下,任务是提交给整个线程池,而不是直接提交给某个线程,线程池在拿到任务后,就在内部寻找是否有空闲的线程,如果有,则将任务交给某个空闲的线程。一个线程同时只能执行一个任务,但可以同时向一个线程池提交多个任务。
线程池状态
RUNNING:初始化状态是RUNNING。线程池被一旦被创建,就处于RUNNING状态,并且线程池中的任务数为0。RUNNING状态下,能够接收新任务,以及对已添加的任务进行处理。
SHUTDOWN:SHUTDOWN状态时,不接收新任务,但能处理已添加的任务。调用线程池的shutdown()接口时,线程池由RUNNING -> SHUTDOWN。
//shutdown后不接受新任务,但是task1,仍然可以执行完成 ExecutorService poolExecutor = Executors.newFixedThreadPool(5); poolExecutor.execute(new Runnable() { public void run() { try { Thread.sleep(1000); System.out.println("finish task 1"); } catch (InterruptedException e) { e.printStackTrace(); } } }); poolExecutor.shutdown(); poolExecutor.execute(new Runnable() { public void run() { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }); System.out.println("ok");
STOP:不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。调用线程池的shutdownNow()接口时,线程池由(RUNNING 或SHUTDOWN ) -> STOP
//改为shutdownNow后,任务立马终止,sleep被打断,新任务无法提交,task1停止 poolExecutor.shutdownNow();
TIDYING:所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING。线程池变为TIDYING状态时,会执行钩子函数terminated(),可以通过重载terminated()函数来实现自定义行为
//自定义类,重写terminated方法 public class MyExecutorService extends ThreadPoolExecutor { public MyExecutorService(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } @Override protected void terminated() { super.terminated(); System.out.println("treminated"); } //调用 shutdownNow, ternimated方法被调用打印 public static void main(String[] args) throws InterruptedException { MyExecutorService service = new MyExecutorService(1,2,10000,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(5)); service.shutdownNow(); } }
TERMINA TED:线程池处在TIDYING状态时,执行完terminated()之后,就会由TIDYING ->TERMINA TED
结构说明
任务提交流程
- 添加任务,如果线程池中的线程数没有达到coreSize,会创建线程执行任务
- 当达到coreSize,把任务放workQueue中
- 当queue满了,未达maxsize创建心线程
- 线程数也达到maxsize,再添加任务会执行reject策略
- 任务执行完毕,超过keepactivetime,释放超时的非核心线程,最终恢复到coresize大小
源码剖析
execute方法
//任务提交阶段
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//判断当前workers中的线程数量有没有超过核心线程数
if (workerCountOf(c) < corePoolSize) {
//如果没有则创建核心线程数(参数true指的就是核心线程)
if (addWorker(command, true))
return;
c = ctl.get();
}
//如果超过核心线程数了 先校验线程池是否正常运行后向阻塞队列workQueue末尾添加任务
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//再次检查线程池运行状态,若不在运行则移除该任务并且执行拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
//若果没有线程在执行
else if (workerCountOf(recheck) == 0)
//则创建一个空的worker 该worker从队列中获取任务执行
addWorker(null, false);
}
//否则直接添加非核心线程执行任务 若非核心线程也添加失败 则执行拒绝策略
else if (!addWorker(command, false))
reject(command);
}
线程创建:addWorker()方法
//addWorker通过cas保证了并发安全性
private boolean addWorker(Runnable firstTask, boolean core) {
//第一部分 计数判断,不符合返回false
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
//判断线程数,最大29位(CAPACITY=29位二进制),所以设置线程池的线程数不是任意大的
if (wc >= CAPACITY ||
//判断工作中的核心线程是否大于设置的核心线程或者设置的最大线程数
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//通过cas新增 若添加失败会一直重试 若成功则跳过结束retry
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
//再次判断运行状态 若运行状态改变则继续重试
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
//第二部分:创建新的work放入works(一个hashSet)
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//将task任务封装在新建的work中
w = new Worker(firstTask);
//获取正在执行该任务的线程
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//将work加入到workers中
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//上述work添加成功了,就开始执行任务操作了
t.start();
workerStarted = true;
}
}
} finally {
//如果上述添加任务失败了,会执行移除该任务操作
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
获取任务getTask()方法
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
//这里判断是否要做超时处理,这里决定了当前线程是否要被释放
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//检查当前worker中线程数量是否超过max 并且上次循环poll等待超时了,则将队列数量进行原子性减
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//线程可以被释放,那就是poll,释放时间就是keepAliveTime
//否则,线程不会被释放,take一直阻塞在这里,直至新任务继续工作
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
//到这里说明可被释放的线程等待超时,已经销毁,设置该标记,下次循环将线程数减少
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
注意点
线程池是如何保证不被销毁的
当队列中没有任务时,核心线程会一直阻塞获取任务的方法,直至获取到任务再次执行
线程池中的线程会处于什么状态
WAITING , TIMED_WAITING ,RUNNABLE
核心线程与非核心线程有本质区别吗?
答案:没有。被销毁的线程和创建的先后无关。即便是第一个被创建的核心线程,仍然有可能被销毁
验证:看源码,每个works在runWork的时候去getTask,在getTask内部,并没有针对性的区分当前work是否是核心线程或者类似的标记。只要判断works数量超出core,就会调用poll(),否则take()
锁
锁的分类
1)乐观锁/悲观锁
乐观锁顾名思义,很乐观的认为每次读取数据的时候总是认为没人动过,所以不去加锁。但是在更新的时候回去对比一下原来的值,看有没有被别人更改过。适用于读多写少的场景。mysql中类比version号更新java中的atomic包属于乐观锁实现,即CAS(下节会详细介绍)
悲观锁在每次读取数据的时候都认为其他人会修改数据,所以读取数据的时候也加锁,这样别人想拿的时候就会阻塞,直到这个线程释放锁,这就影响了并发性能。适合写操作比较多的场景。mysql中类比for update。synchronized实现就是悲观锁(1.6之后优化为锁升级机制),悲观锁书写不当很容易影响性能。
2)独享锁/共享锁
很好理解,独享锁是指该锁一次只能被一个线程所持有,而共享锁是指该锁可被多个线程所持有。
案例一:ReentrantLock,独享锁
public class PrivateLock {
Lock lock = new ReentrantLock();
long start = System.currentTimeMillis();
void read() {
lock.lock();
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
System.out.println("read time = "+(System.currentTimeMillis() - start));
}
public static void main(String[] args) {
final PrivateLock lock = new PrivateLock();
for (int i = 0; i < 10; i++) {
new Thread(new Runnable() {
public void run() {
lock.read();
}
}).start();
}
}
}
结果分析:每个线程结束的时间点逐个上升,锁被独享,一个用完下一个,依次获取锁
案例二:ReadWriteLock,read共享,write独享
public class SharedLock {
ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
Lock lock = readWriteLock.readLock();
long start = System.currentTimeMillis();
void read() {
lock.lock();
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
System.out.println("end time = "+(System.currentTimeMillis() - start));
}
public static void main(String[] args) {
final SharedLock lock = new SharedLock();
for (int i = 0; i < 10; i++) {
new Thread(new Runnable() {
public void run() {
lock.read();
}
}).start();
}
}
}
结果分析:每个线程独自跑,各在100ms左右,证明是共享的
案例三:同样是上例,换成writeLock
Lock lock = readWriteLock.writeLock();
结果分析:恢复到了1s时长,变为独享
小结:
- 读锁的共享锁可保证并发读是非常高效的,读写,写读 ,写写的过程是互斥的。
- 独享锁与共享锁也是通过AQS来实现的,通过实现不同的方法,来实现独享或者共享。
3)分段锁
从Map一家子说起....
HashMap是线程不安全的,在多线程环境下,使用HashMap进行put操作时,可能会引起死循环,导致CPU利用率接近100%,所以在并发情况下不能使用HashMap。
于是有了HashT able,HashT able是线程安全的。但是HashT able线程安全的策略实在不怎么高明,将get/put所有相关操作都整成了synchronized的。
那有没有办法做到线程安全,又不这么粗暴呢?基于分段锁的ConcurrentHashMap诞生...
ConcurrentHashMap使用Segment(分段锁)技术,将数据分成一段一段的存储,Segment数组的意义就是将一个大的table分割成多个小的table来进行加锁,Segment数组中每一个元素一把锁,每一个Segment元素存储的是HashEntry数组+链表,这个和HashMap的数据存储结构一样。当访问其中一个段数据被某个线程加锁的时候,其他段的数据也能被其他线程访问,这就使得ConcurrentHashMap不仅保证了线程安全,而且提高了性能。
但是这也引来一个负面影响:ConcurrentHashMap 定位一个元素的过程需要进行两次Hash操作,第一次Hash 定位到Segment,第二次Hash 定位到元素所在的链表。所以Hash 的过程比普通的HashMap 要长。
备注:JDK1.8ConcurrentHashMap中抛弃了原有的Segment 分段锁,而采用了 CAS + synchronized来保证并发安全性。
4)可重入锁
可重入锁指的获取到锁后,如果同步块内需要再次获取同一把锁的时候,直接放行,而不是等待。其意义在于防止死锁。前面使用的synchronized 和ReentrantLock 都是可重入锁。
实现原理实现是通过为每个锁关联一个请求计数器和一个占有它的线程。如果同一个线程再次请求这个锁,计数器将递增,线程退出同步块,计数器值将递减。直到计数器为0锁被释放。
场景见于父类和子类的锁的重入(调super方法),以及多个加锁方法的嵌套调用。
案例一:父子可重入
public class ParentLock {
byte[] lock = new byte[0];
public void f1(){
synchronized (lock){
System.out.println("f1 from parent");
}
}
}
public class SonLock extends ParentLock {
public void f1() {
synchronized (super.lock){
super.f1();
System.out.println("f1 from son");
}
}
public static void main(String[] args) {
SonLock lock = new SonLock();
lock.f1();
}
}
案例二:内嵌方法可重入
public class NestedLock {
public synchronized void f1(){
System.out.println("f1");
}
public synchronized void f2(){
f1();
System.out.println("f2");
}
public static void main(String[] args) {
NestedLock lock = new NestedLock();
//可以正常打印 f1,f2
lock.f2();
}
}
5)公平锁/非公平锁
基本概念:
公平锁就是在并发环境中,每个线程在获取锁时会先查看此锁维护的等待队列,如果为空,或者当前线程是等待队列的第一个,就占有锁,否则就会加入到等待队列中,直到按照FIFO的规则从队列中取到自己。
非公平锁与公平锁基本类似,只是在放入队列前先判断当前锁是否被线程持有。如果锁空闲,那么他可以直接抢占,而不需要判断当前队列中是否有等待线程。只有锁被占用的话,才会进入排队。在现实中想象一下游乐场旋转木马插队现象......
优缺点:
公平锁的优点是等待锁的线程不会饿死,进入队列规规矩矩的排队,迟早会轮到。缺点是整体吞吐效率相对非公平锁要低,等待队列中除第一个线程以外的所有线程都会阻塞,CPU唤醒阻塞线程的开销比非公平锁大。
非公平锁的性能要高于公平锁,因为线程有几率不阻塞直接获得锁。ReentrantLock默认使用非公平锁就是基于性能考量。但是非公平锁的缺点是可能引发队列中的线程始终拿不到锁,一直排队被饿死。
编码方式:
很简单,ReentrantLock支持创建公平锁和非公平锁(默认),想要实现公平锁,使用new ReentrantLock(true)。
背后原理:
AQS,后面还会详细讲到。AQS中有一个state标识锁的占用情况,一个队列存储等待线程。
state=0表示锁空闲。如果是公平锁,那就看看队列有没有线程在等,有的话不参与竞争乖乖追加到尾部。如果是非公平锁,那就直接参与竞争,不管队列有没有等待者。
state>0表示有线程占着锁,这时候无论公平与非公平,都直接去排队(想抢也没有)
备注:
因为ReentrantLock是可重入锁,数量表示重入的次数。所以是>0而不是简单的0和1而synchronized只能是非公平锁
6)锁升级
java中每个对象都可作为锁,锁有四种级别,按照量级从轻到重分为:无锁、偏向锁、轻量级锁、重量级锁。
如何理解呢?A占了锁,B就要阻塞等。但是,在操作系统中,阻塞就要存储当前线程状态,唤醒就要再恢复,这个过程是要消耗时间的...
如果A使用锁的时间远远小于B被阻塞和挂起的执行时间,那么我们将B挂起阻塞就相当的不合算。
于是出现自旋:自旋指的是锁已经被其他线程占用时,当前线程不会被挂起,而是在不停的试图获取锁(可以理解为不停的循环),每循环一次表示一次自旋过程。显然这种操作会消耗CPU时间,但是相比线程下文切换时间要少的时候,自旋划算。
而偏向锁、轻量锁、重量锁就是围绕如何使得cpu的占用更划算而展开的。
举个生活的例子,假设公司只有一个会议室(共享资源)
偏向锁:
前期公司只有1个团队,那么什么时候开会都能满足,就不需要询问和查看会议室的占用情况,直接进入使用
状态。会议室门口挂了个牌子写着A使用,A默认不需要预约(ThreadID=A)
轻量级锁:
随着业务发展,扩充为2个团队,B团队肯定不会同意A无法无天,于是当AB同时需要开会时,两者竞争,谁抢
到谁算谁的。偏向锁升级为轻量级锁,但是未抢到者在门口会不停敲门询问(自旋,循环),开完没有?开完
没有?
重量级锁:
后来发现,这种不停敲门的方式很烦,A可能不理不睬,但是B要不停的闹腾。于是锁再次升级。
如果会议室被A占用,那么B团队直接闭嘴,在门口安静的等待(wait进入阻塞),直到A用完后会通知
B(notify)。
注意点:
上面几种锁都是JVM自己内部实现,我们不需要干预,但是可以配置jvm参数开启/关闭自旋锁、偏
向锁。锁可以升级,但是不能反向降级:偏向锁→轻量级锁→重量级锁
无锁争用的时候使用偏向锁,第二个线程到了升级为轻量级锁进行竞争,更多线程时,进入重量级锁阻塞
7)互斥锁/读写锁
典型的互斥锁:synchronized,ReentrantLock,读写锁:ReadWriteLock 前面都用过了互斥锁属于独享锁,读写锁里的写锁属于独享锁,而读锁属于共享锁
案例:互斥锁用不好可能会失效,看一个典型的锁不住现象!
public class ObjectLock {
public static Integer i=0;
public void inc(){
synchronized (this){
int j=i;
try {
Thread.sleep(100);
j++;
} catch (InterruptedException e) {
e.printStackTrace();
}
i=j;
}
}
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 10; i++) {
new Thread(new Runnable() {
public void run() {
//重点!
new ObjectLock().inc();
}
}).start();
}
Thread.sleep(3000);
//理论上10才对。可是....
System.out.println(ObjectLock.i);
}
}
结果分析:每个线程内都是new对象,所以this不是同一把锁,结果锁不住,输出1
1.this,换成static的i 变量试试?
2.换成ObjectLock.class 试试?
3.换成String.class
4.去掉synchronized块,外部方法上加static synchronized
原子操作(atomic)
概念
原子(atom)本意是“不能被进一步分割的最小粒子”,而原子操作(atomic operation)意为"不可被中断的一个或一系列操作" 。类比于数据库事务,redis的multi。
CAS
Compare And Set(或Compare And Swap),翻译过来就是比较并替换,CAS操作包含三个操作数——内存位置(V)、预期原值(A)、新值(B)。从第一视角来看,理解为:我认为位置V 应该是A,如果是A,则将B 放到这个位置;否则,不要更改,只告诉我这个位置现在的值即可。
计数器问题发生归根结底是取值和运算后的赋值中间,发生了插队现象,他们不是原子的操作。前面的计数器使用加锁方式实现了正确计数,下面,基于CAS的原子类上场....
public class AtomicCounter {
private static AtomicInteger i = new AtomicInteger(0);
public int get(){
return i.get();
}
public void inc(){
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
i.incrementAndGet();
}
public static void main(String[] args) throws InterruptedException {
final AtomicCounter counter = new AtomicCounter();
for (int i = 0; i < 10; i++) {
new Thread(new Runnable() {
public void run() {
counter.inc();
}
}).start();
}
Thread.sleep(3000);
//同样可以正确输出10
System.out.println(counter.i.get());
}
}
atomic
上面展示了AtomicInteger,关于atomic包,还有很多其他类型:
基本类型
AtomicBoolean:以原子更新的方式更新boolean;
AtomicInteger:以原子更新的方式更新Integer;
AtomicLong:以原子更新的方式更新Long;
引用类型
AtomicReference : 原子更新引用类型
AtomicReferenceFieldUpdater :原子更新引用类型的字段
AtomicMarkableReference : 原子更新带有标志位的引用类型
数组
AtomicIntegerArray:原子更新整型数组里的元素。
AtomicLongArray:原子更新长整型数组里的元素。
AtomicReferenceArray:原子更新引用类型数组里的元素。
字段
AtomicIntegerFieldUpdater:原子更新整型的字段的更新器。
AtomicLongFieldUpdater:原子更新长整型字段的更新器。
AtomicStampedReference:原子更新带有版本号的引用类型。
注意
使用atomic要注意原子性的边界,把握不好会起不到应有的效果,原子性被破坏。
public class BadAtomic {
AtomicInteger i = new AtomicInteger(0);
static int j=0;
public void badInc(){
int k = i.incrementAndGet();
try {
Thread.currentThread().sleep(new Random().nextInt(100));
} catch (InterruptedException e) {
e.printStackTrace();
}
j=k;
}
public static void main(String[] args) throws InterruptedException {
BadAtomic atomic = new BadAtomic();
for (int i = 0; i < 10; i++) {
new Thread(()->{
atomic.badInc();
}).start();
}
Thread.sleep(3000);
System.out.println(atomic.j);
}
}
结果分析:
每次都不一样,总之不是10
在badInc上加synchronized,问题解决
这章节目前就介绍这么多,后续将扩展更多的多线程相关的类,以及从项目中解读多线程的应用。