synchronized的意义
解决了Java共享内存模型带来的线程安全问题:
如:两个线程对初始值为 0 的静态变量一个做自增,一个做自减,各做 5000 次,结果是 0 吗?(针对这个问题进行分析)
代码展示
public class SyncDemo { private static volatile int counter = 0; //临界资源 public static void increment() { counter++; } //临界区 public static void decrement() { counter--; } //临界区 public static void main(String[] args) throws InterruptedException { Thread t1 = new Thread(() -> { for (int i = 0; i < 50000; i++) { increment(); } }, "t1"); Thread t2 = new Thread(() -> { for (int i = 0; i < 50000; i++) { decrement(); } }, "t2"); t1.start(); t2.start(); t1.join(); t2.join(); //思考: counter=? log.info("counter={}", counter); } }
问题分析
结论:以上的结果可能是正数、负数、零。为什么呢?因为 Java 中对静态变量的自增,自减并不是原子操作。
探究原因
我们可以查看 i++和 i--(i 为静态变量)的 JVM 字节码指令 ( 可以在idea中安装一个jclasslib插件)
i++的JVM 字节码指令
getstatic i // 获取静态变量i的值 iconst_1 // 将int常量1压入操作数栈 iadd // 自增 putstatic i // 将修改后的值存入静态变量i
i--的JVM 字节码指令
getstatic i // 获取静态变量i的值 iconst_1 // 将int常量1压入操作数栈 isub // 自减 putstatic i // 将修改后的值存入静态变量i。
问题深入:
要知道,我们所写的程序由代码组成,而代码会被编译为指令,每一段可能会有多个指令组成
我们写了一个 Java 程序,包含一系列的语句,我们会默认期望这些语句的实际运行顺序和写的代码顺序一致。
但实际上,编译器、JVM 或者 CPU 都有可能出于优化等目的,对于实际指令执行的顺序进行调整,这就是指令重排序。(重排序的好处:提高处理速度)
注意点:不过重排序并不意味着可以任意排序,它需要需要保证重排序后,不改变单线程内的语义,否则如果能任意排序的话,程序早就逻辑混乱了
故,如果是单线程以上 8 行代码是顺序执行(不会交错)没有问题。但多线程下这 8 行代码可能交错运行:
涉及到的概念说明:
临界区( Critical Section)【一段代码块内如果存在对共享资源的多线程读写操作,称这段代码块为临界区,其共享资源为临界资源】
- 一个程序运行多个线程本身是没有问题的
- 问题出在多个线程访问共享资源
- 多个线程读共享资源其实也没有问题
- 在多个线程对共享资源读写操作时发生指令交错,就会出现问题
竞态条件( Race Condition )【多个线程在临界区内执行,由于代码的执行序列不同而导致结果无法预测,称之为发生了竞态条件】
为了避免临界区的竞态条件发生,有多种手段可以达到目的:
- 阻塞式的解决方案:synchronized,Lock
- 非阻塞式的解决方案:原子变量
注意:
虽然 java 中互斥和同步都可以采用 synchronized 关键字来完成,但它们还是有区别的:
- 互斥是保证临界区的竞态条件发生,同一时刻只能有一个线程执行临界区代码
- 同步是由于线程执行的先后、顺序不同、需要一个线程等待其它线程运行到某个点
synchronized的使用
说明
synchronized 同步块是 Java 提供的一种原子性内置锁,,Java 中的每个对象都可以把它当作 一个同步锁来使用,这些 Java 内置的使用者看不到的锁被称为内置锁,也叫作监视器锁。
加锁方式展示
解决之前的共享问题
代码展示
public static synchronized void increment() { counter++; } //临界区 public static synchronized void decrement() { counter--; } //临界区
图解说明
synchronized 实际是用对象锁保证了临界区内代码的原子性:
synchronized底层原理
synchronized说明
- synchronized是JVM内置锁,基于Monitor机制实现,依赖底层操作系统的互斥原语Mutex(互斥量),它是一个重量级锁,性能较低。当然,JVM内置锁在1.5之后版本做了重大的优化,如锁粗化(Lock Coarsening)、锁消除(Lock Elimination)、轻量级锁(Lightweight Locking)、偏向锁(Biased Locking)、自适应自旋(Adaptive Spinning)等技术来减少锁操作的开销,内置锁的并发性能已经基本与Lock持平。
- Java虚拟机通过一个同步结构支持方法和方法中的指令序列的同步:monitor。
- 同步方法是通过方法中的access_flags中设置ACC_SYNCHRONIZED标志来实现;同步代码块是通过monitorenter和monitorexit来实现。两个指令的执行是JVM通过调用操作系统的互斥原语mutex来实现,被阻塞的线程会被挂起、等待重新调度,会导致“用户态和内核态”两个态之间来回切换,对性能有较大影响。
查看synchronized的字节码指令序列
代码展示
private static String lock = ""; public static void increment() { synchronized (lock){ counter++; } } public static synchronized void decrement() { counter--; }
代码说明(在idea中安装一个jclasslib插件,用于分析字节码)
针对decrement()方法分析,图示
说明
图一展示了 i--的JVM 字节码指令,没有明显的加锁现象
图二展示了方法的访问标志(access_flags)为 0x0029,这个是怎么得来的,结合图三的访问标志表可得
public+ static+ synchronized = 0x0001 +0x0008+ 0x0020 = 0x0029
针对increment()方法分析,图示
说明
图一的展示,充分说明了如果synchronized关键字加载方法上面则是通过访问标志来设置锁的,字节码不怎么改动
图二展示了,在方法内部代码加锁在字节码层面上是通过(monitorenter+monitorexit 来进行实现的)
至于为什么会有两个monitorexit ,一个是正常退出,一个是异常退出。与我们代码中加锁解锁的方式很类似(抛出异常,然后最终环节还是要解锁的):
lock.lock(); try{ counter++; }finally { lock.unlock(); }
拓展部分(此外它还有一个手动的API),手动加锁与解锁,不过已经被废弃了【虽然说还可以用,但指不定哪天就没了】
UnsafeUtils.getUnsafe().monitorEnter(lock); try{ counter++; }finally { UnsafeUtils.getUnsafe().monitorExit(lock); }
Java语言的内置管程synchronized详解(具体可看 Monitor(管程/监视器)详解)
Java 参考了 MESA 模型,语言内置的管程(synchronized)对 MESA 模型进行了精简。MESA模型中,条件变量可以有多个,Java 语言内置的管程里只有一个条件变量。模型如下图所示:
问题:synchronized加锁加在对象上,锁对象是如何记录锁状态的?
首先要了解对象的内存布局(可查看 对象的内存布局解析)
其次,锁状态被记录在每个对象的对象头的Mark Word中。
1.Mark Word是如何记录锁状态的
(1)Hotspot通过markOop类型实现Mark Word,具体实现位于markOop.hpp文件中。由于对象需要存储的运行时数据很多,考虑到虚拟机的内存使用,markOop被设计成一个非固定的数据结构,以便在极小的空间存储尽量多的数据,根据对象的状态复用自己的存储空间。
(2)简单点理解就是:MarkWord 结构搞得这么复杂,是因为需要节省内存,让同一个内存区域在不同阶段有不同的用处。
2.Mark Word的结构
(1)32位JVM下的对象结构描述
(2)64位JVM下的对象结构描述
(3)结构说明
- hash: 保存对象的哈希码。运行期间调用System.identityHashCode()来计算,延迟计算,并把结果赋值到这里。
- age: 保存对象的分代年龄。表示对象被GC的次数,当该次数到达阈值的时候,对象就会转移到老年代。
- biased_lock: 偏向锁标识位。由于无锁和偏向锁的锁标识都是 01,没办法区分,这里引入一位的偏向锁标识位。
- lock: 锁状态标识位。区分锁状态,比如11时表示对象待GC回收状态, 只有最后2位锁标识(11)有效。
- JavaThread*: 保存持有偏向锁的线程ID。偏向模式的时候,当某个线程持有对象的时候,对象这里就会被置为该线程的ID。 在后面的操作中,就无需再进行尝试获取锁的动作。这个线程ID并不是JVM分配的线程ID号,和Java Thread中的ID是两个概念。
- epoch: 保存偏向时间戳。偏向锁在CAS锁操作过程中,偏向性标识,表示对象更偏向哪个锁。
3.Mark Word中锁标记枚举
代码展示
enum { locked_value = 0, //00 轻量级锁 unlocked_value = 1, //001 无锁 monitor_value = 2, //10 监视器锁,也叫膨胀锁,也叫重量级锁 marked_value = 3, //11 GC标记 biased_lock_pattern = 5 //101 偏向锁 };
更直观的理解方式:
跟踪锁标记变化(验证理论)
偏向锁
(1)概念说明
1.偏向锁是一种针对加锁操作的优化手段,经过研究发现,在大多数情况下,锁不仅不存在多线程竞争,而且总是由同一线程多次获得,因此为了消除数据在无竞争情况下锁重入(CAS操作)的开销而引入偏向锁。对于没有锁竞争的场合,偏向锁有很好的优化效果。
2.当JVM启用了偏向锁模式(jdk6默认开启),新创建对象的Mark Word中的Thread Id为0,说明此时处于可偏向但未偏向任何线程,也叫做匿名偏向状态(anonymously biased)。
(2)偏向锁延迟偏向
1.偏向锁模式存在偏向锁延迟机制:
HotSpot 虚拟机在启动后有个 4s 的延迟才会对每个新建的对象开启偏向锁模式。
JVM启动时会进行一系列的复杂活动,比如装载配置,系统类初始化等等。
在这个过程中会使用大量synchronized关键字对对象加锁,且这些锁大多数都不是偏向锁。
为了减少初始化时间,JVM默认延时加载偏向锁。
2.偏向锁的JVM指令:
//关闭延迟开启偏向锁 ‐XX:BiasedLockingStartupDelay=0 //禁止偏向锁 ‐XX:‐UseBiasedLocking //启用偏向锁 ‐XX:+UseBiasedLocking
3.验证偏向锁的延迟机制:
代码展示
public class LockEscalationDemo { public static void main(String[] args) throws InterruptedException { log.debug(ClassLayout.parseInstance(new Object()).toPrintable()); //HotSpot 虚拟机在启动后有个 4s 的延迟才会对每个新建的对象开启偏向锁模式 Thread.sleep(5000); Object obj = new Object(); log.debug(ClassLayout.parseInstance(obj).toPrintable()); } }
结果展示
说明:
明显的可以看出,创建的对象在一定时间后会开启由无锁转变为偏向锁的模式。而且这一定的时间可以通过JVM指令设置。
(3)偏向锁状态跟踪
情况1:对创建的Object对象加锁
代码展示
public class ObjectTest { public static void main(String[] args) throws InterruptedException { //jvm延迟偏向 Thread.sleep(5000); Object obj = new Test(); System.out.println(ClassLayout.parseInstance(obj).toPrintable()); new Thread(()->{ synchronized (obj){ System.out.println(Thread.currentThread().getName()+"加锁\n"+ClassLayout.parseInstance(obj).toPrintable()); } System.out.println(Thread.currentThread().getName()+"释放锁\n"+ClassLayout.parseInstance(obj).toPrintable()); },"Thread1").start(); } }
结果展示
情况2:对类对象进行加锁(这种一般指在开启偏向锁模式前,就已经创建的类对象,否则就是第一种情况了,毕竟4秒的延迟)
代码展示(这里面的延迟偏向时间其实不是必要的,只是为了效果,因为类在jvm启动的时候就会去加载产生类对象,故没有偏向)
public class ObjectTest { public static void main(String[] args) throws InterruptedException { //jvm延迟偏向 Thread.sleep(5000); System.out.println(ClassLayout.parseInstance(ObjectTest.class).toPrintable()); new Thread(()->{ synchronized (ObjectTest.class){ System.out.println(Thread.currentThread().getName()+"加锁\n"+ClassLayout.parseInstance(ObjectTest.class).toPrintable()); } System.out.println(Thread.currentThread().getName()+"释放锁\n"+ClassLayout.parseInstance(ObjectTest.class).toPrintable()); },"Thread1").start(); } }
结果展示
(4)偏向锁撤销【偏向锁的撤销要到GC的安全点(因为代码是由指令构成的,所谓安全点就是指某段代码被完整执行,如i++,由四条指令构成,如果只执行了两条,这就是不安全)】
偏向锁撤销之调用对象HashCode
1.造成锁撤销的说明
调用锁对象的obj.hashCode()或System.identityHashCode(obj)方法会导致该对象的偏向锁被撤销。
因为对于一个对象,其HashCode只会生成一次并保存,偏向锁是没有地方保存hashcode的。(可往上看 Mark Word的结构)
- 轻量级锁会在锁记录中记录 hashCode
- 重量级锁会在 Monitor 中记录 hashCode
当对象处于可偏向(也就是线程ID为0)和已偏向的状态下,调用HashCode计算将会使对象再也无法偏向:
- 当对象可偏向时,MarkWord将变成未锁定状态,并只能升级成轻量锁;
- 当对象正处于偏向锁时,调用HashCode将使偏向锁强制升级成重量锁。
2.验证说明
当对象正处于偏向锁时,调用HashCode将使偏向锁强制升级成重量锁:
代码展示
public static void main(String[] args) throws InterruptedException { //jvm延迟偏向 Thread.sleep(5000); Object obj = new Test(); System.out.println(ClassLayout.parseInstance(obj).toPrintable()); new Thread(()->{ synchronized (obj){ System.out.println(Thread.currentThread().getName()+"加锁中,hashCode前\n"+ClassLayout.parseInstance(obj).toPrintable()); obj.hashCode(); System.out.println(Thread.currentThread().getName()+"加锁中,hashCode后\n"+ClassLayout.parseInstance(obj).toPrintable()); } System.out.println(Thread.currentThread().getName()+"释放锁\n"+ClassLayout.parseInstance(obj).toPrintable()); },"Thread1").start(); }
结果展示
当对象可偏向时,MarkWord将变成未锁定状态,并只能升级成轻量锁:
代码展示
public class ObjectTest { public static void main(String[] args) throws InterruptedException { //jvm延迟偏向 Thread.sleep(5000); Object obj = new Test(); System.out.println(ClassLayout.parseInstance(obj).toPrintable()); obj.hashCode(); System.out.println(ClassLayout.parseInstance(obj).toPrintable()); new Thread(()->{ synchronized (obj){ System.out.println(Thread.currentThread().getName()+"加锁\n"+ClassLayout.parseInstance(obj).toPrintable()); } System.out.println(Thread.currentThread().getName()+"释放锁\n"+ClassLayout.parseInstance(obj).toPrintable()); },"Thread1").start(); } }
结果展示
偏向锁撤销之调用wait/notify(这个就自行去验证吧)
结论:偏向锁状态执行obj.notify() 会升级为轻量级锁,调用obj.wait(timeout) 会升级为重量级锁。
轻量级锁
(1)概念说明
倘若偏向锁失败,虚拟机并不会立即升级为重量级锁,它还会尝试使用一种称为轻量级锁的优化手段,此时Mark Word 的结构也变为轻量级锁的结构。轻量级锁所适应的场景是线程交替执行同步块的场合,如果存在同一时间多个线程访问同一把锁的场合,就会导致轻量级锁膨胀为重量级锁。
汇总:
- 偏向锁适用于单个线程,以偏向标记表明已经加锁(本质上没有锁,只是标记),加上记录了偏向的线程(作用是认为该线程下次还会执行加锁中的逻辑,这样下次还加锁的话,就不需要怎么改动。如果下次不是该线程,则会进行重偏向,有种使用内存缓存的思维,觉得这次用了下次还会用,干脆记录下来,下次省事点的感觉)
- 轻量级锁适用于线程交替执行同步块的场合(指竞争不大,如A线程持有偏向标记,B来获取锁,获取不到偏向标记,改用CAS方式加锁【这个过程需要一点点时间,这个时间是为了让A线程能刚好完成任务,那么B线程就能加锁成功获得轻量级锁,否则就会将锁变为重量级锁,让A持有直到释放,然后B再获取】)
- 重量级锁作用于竞争很大的场景,一般是持有对象持有锁时间很长导致其他需要锁的线程被迫要进入等待的场景,或者是很多线程同时抢锁,一部分线程拿不到锁不得不等待的场景。
- 偏向锁适用于单个线程,以偏向标记表明已经加锁(本质上没有锁,只是标记),加上记录了偏向的线程(作用是认为该线程下次还会执行加锁中的逻辑,这样下次还加锁的话,就不需要怎么改动。如果下次不是该线程,则会进行重偏向,有种使用内存缓存的思维,觉得这次用了下次还会用,干脆记录下来,下次省事点的感觉)
(2)疑难问题
1.在锁变化中容易存在哪些误区?(后面有源码分析,大家可以对照一下)
(1)无锁--》偏向锁--》轻量级锁--》重量级锁
其实不存在无锁--》偏向锁 这一步。其次,无锁和偏向锁,都是JVM创建的时候给予打上标记的,故算是并列对等的。
而且无锁和偏向锁升级到重量级,必然会经过轻量级锁的代码流程。
再者偏向锁和轻量级锁都属于用户态的加锁,而重量级锁属于内核态部分,所以才会比较耗性能,也就往往希望加锁能在用户态。(这也是ReentrantLock更加常用的原因)
(2)轻量级锁自旋获取锁失败,会膨胀升级为重量级锁
首先看过源码的都知道,轻量级锁是不存在自旋的,在那段代码里面有的只是进行了一次CAS加锁操作。
其次是,自旋是在生成Monitor对象的过程中,因为这个对象的生成过程较为复杂,所以这个过程一有空闲就会尝试去进行CAS加锁,如果能够成功则加上轻量级锁,Monitor对象也就不必生成了。因为Monitor对象生成后锁对象会被塞入其中,这样锁对象的标记就会变为重量级锁标记了。
(3)重量级锁不存在自旋(如上所说,重量级锁的自旋式在生成Monitor对象的过程中)
2.有时候会发现重偏向的时候,线程ID没有改变的原因?
原因是JVM层面做了优化,将操作系统的线程直接赋予到新的线程上面(避免操作系统先销毁,再去新建)
JVM层面建立的线程是依托于操作系统建立的线程,JVM的线程指向操作系统的线程,去做一些操作,如果JVM层面的线程要销毁了,同时又需要建立一些线程,这时候操作系统不一定会将老旧的线程进行销毁,而是分配给新JVM层面的线程。(详情可查看 深入理解Java线程 )
3.轻量级锁是否可以降级为偏向锁?
不可以,不管轻量级锁还是重量级锁,释放锁后,都是会变成无锁状态,因为偏向状态被撤销了。
锁升级场景
情况1,偏向锁升级为轻量级锁:
代码示例
public static void main(String[] args) throws InterruptedException { //HotSpot 虚拟机在启动后有个 4s 的延迟才会对每个新建的对象开启偏向锁模式 Thread.sleep(5000); Object obj = new Object(); new Thread(new Runnable() { @Override public void run() { log.debug(Thread.currentThread().getName()+"开始执行。。。\n" +ClassLayout.parseInstance(obj).toPrintable()); synchronized (obj){ log.debug(Thread.currentThread().getName()+"获取锁执行中。。。\n" +ClassLayout.parseInstance(obj).toPrintable()); } log.debug(Thread.currentThread().getName()+"释放锁。。。\n" +ClassLayout.parseInstance(obj).toPrintable()); } },"thread1").start(); //控制线程竞争时机 Thread.sleep(1); new Thread(new Runnable() { @Override public void run() { log.debug(Thread.currentThread().getName()+"开始执行。。。\n" +ClassLayout.parseInstance(obj).toPrintable()); synchronized (obj){ log.debug(Thread.currentThread().getName()+"获取锁执行中。。。\n" +ClassLayout.parseInstance(obj).toPrintable()); } log.debug(Thread.currentThread().getName()+"释放锁。。。\n" +ClassLayout.parseInstance(obj).toPrintable()); } },"thread2").start(); }
结果展示
结果说明
线程1明显经历了偏向锁的加锁到释放锁的步骤,而线程2在加锁前,可以看到对象锁还在偏向状态中,此时加锁,偏向锁状态未能重偏向,进入到了CAS的轻量级锁的加锁步骤,而且加锁成功。释放后变回无锁状态。
情况2,轻量级锁升级为重量级锁:
代码示例
public static void main(String[] args) throws InterruptedException { //HotSpot 虚拟机在启动后有个 4s 的延迟才会对每个新建的对象开启偏向锁模式 Thread.sleep(5000); Object obj = new Object(); new Thread(new Runnable() { @Override public void run() { log.debug(Thread.currentThread().getName()+"开始执行。。。\n" +ClassLayout.parseInstance(obj).toPrintable()); synchronized (obj){ log.debug(Thread.currentThread().getName()+"获取锁执行中。。。\n" +ClassLayout.parseInstance(obj).toPrintable()); } log.debug(Thread.currentThread().getName()+"释放锁。。。\n" +ClassLayout.parseInstance(obj).toPrintable()); } },"thread1").start(); //控制线程竞争时机 Thread.sleep(1); new Thread(new Runnable() { @Override public void run() { log.debug(Thread.currentThread().getName()+"开始执行。。。\n" +ClassLayout.parseInstance(obj).toPrintable()); synchronized (obj){ log.debug(Thread.currentThread().getName()+"获取锁执行中。。。\n" +ClassLayout.parseInstance(obj).toPrintable()); } log.debug(Thread.currentThread().getName()+"释放锁。。。\n" +ClassLayout.parseInstance(obj).toPrintable()); } },"thread2").start(); new Thread(new Runnable() { @Override public void run() { log.debug(Thread.currentThread().getName()+"开始执行。。。\n" +ClassLayout.parseInstance(obj).toPrintable()); synchronized (obj){ log.debug(Thread.currentThread().getName()+"获取锁执行中。。。\n" +ClassLayout.parseInstance(obj).toPrintable()); } try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } log.debug(Thread.currentThread().getName()+"释放锁。。。\n" +ClassLayout.parseInstance(obj).toPrintable()); } },"thread3").start(); }
结果说明
其实这一步比较简单就是,当锁对象已经是轻量锁了,这时候再遇到竞争的情况。
情况3,偏向锁升级为重量级锁:
代码示例
public static void main(String[] args) throws InterruptedException { //jvm延迟偏向 Thread.sleep(5000); Object obj = new Test(); new Thread(()->{ System.out.println(Thread.currentThread().getName()+"加锁前\n"+ClassLayout.parseInstance(obj).toPrintable()); synchronized (obj){ System.out.println(Thread.currentThread().getName()+"加锁中1\n"+ClassLayout.parseInstance(obj).toPrintable()); try { Thread.sleep(20000); //模拟业务时间 } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+"加锁中2\n"+ClassLayout.parseInstance(obj).toPrintable()); } System.out.println(Thread.currentThread().getName()+"释放锁\n"+ClassLayout.parseInstance(obj).toPrintable()); },"Thread1").start(); Thread.sleep(8000); //模拟时间间隔 new Thread(()->{ System.out.println(Thread.currentThread().getName()+"加锁前\n"+ClassLayout.parseInstance(obj).toPrintable()); synchronized (obj){ System.out.println(Thread.currentThread().getName()+"加锁中\n"+ClassLayout.parseInstance(obj).toPrintable()); } System.out.println(Thread.currentThread().getName()+"释放锁\n"+ClassLayout.parseInstance(obj).toPrintable()); },"Thread2").start(); }
结果展示
结果说明
线程1,在偏向锁状态中执行业务,线程2进行加锁,此时明显线程2会直接阻塞住(明显是偏向锁加锁不行,轻量级锁加锁也不成功,直接转而到重量级锁步骤,去生成Monitor对象,进入队列中等待),而线程1中的锁会急速膨胀(被塞到Monitor对象里面了,成为重量级锁),然后线程1释放锁后线程2才会获取锁进行下一步。
总结:锁对象状态转换,图示:
锁升级的原理分析(深入hotspot的源码查找)
注:看源码更多的是验证理论的正确性,说会记住多少其实都是假的,只能说大概有印象,更多的是对自己总结的理论具备支持的论据,而不是没有底气的说个大概。
1.下载openjdk的源码包
2.在目录openjdk\hotspot\src\share\vm\runtime下找到synchronizer.cpp文件
代码展示
偏向锁的代码逻辑:
/* 偏向锁的获取由BiasedLocking::revoke_and_rebias方法实现 1、通过markOop mark = obj->mark()获取对象的markOop数据mark,即对象头的Mark Word; 2、判断mark是否为可偏向状态,即mark的偏向锁标志位为 1,锁标志位为 01; 3、判断mark中JavaThread的状态:如果为空,则进入步骤(4); 如果指向当前线程,则执行同步代码块;如果指向其它线程,进入步骤(5); 4、通过CAS原子指令设置mark中JavaThread为当前线程ID,如果执行CAS成功, 则执行同步代码块,否则进入步骤(5); 5、如果执行CAS失败,表示当前存在多个线程竞争锁,当达到全局安全点(safepoint), 获得偏向锁的线程被挂起,撤销偏向锁,并升级为轻量级, 升级完成后被阻塞在安全点的线程继续执行同步代码块; 偏向锁的撤销由BiasedLocking::revoke_at_safepoint方法实现 1、偏向锁的撤销动作必须等待全局安全点; 2、暂停拥有偏向锁的线程,判断锁对象是否处于被锁定状态; 3、撤销偏向锁,恢复到无锁(标志位为 01)或轻量级锁(标志位为 00)的状态 jdk1.6之后默认开启偏向锁 开启偏向锁:-XX:+UseBiasedLocking -XX:BiasedLockingStartupDelay=0(默认有延迟,关闭延迟) 关闭偏向锁 XX:-UseBiasedLocking */ void ObjectSynchronizer::fast_enter(Handle obj, BasicLock* lock, bool attempt_rebias, TRAPS) { //判断是否开启偏向锁 if (UseBiasedLocking) { //判断是否不在全局安全点 if (!SafepointSynchronize::is_at_safepoint()) { //撤销和重偏向 BiasedLocking::Condition cond = BiasedLocking::revoke_and_rebias(obj, attempt_rebias, THREAD); //如果是撤销和重偏向状态直接返回 if (cond == BiasedLocking::BIAS_REVOKED_AND_REBIASED) { return; } } else { assert(!attempt_rebias, "can not rebias toward VM thread"); // 偏向锁的撤销 只有当其它线程尝试竞争偏向锁时,持有偏向锁的线程才会释放锁 BiasedLocking::revoke_at_safepoint(obj); } assert(!obj->mark()->has_bias_pattern(), "biases should be revoked by now"); } // 获取轻量级锁 当关闭偏向锁功能,或多个线程竞争偏向锁导致偏向锁升级为轻量级锁 slow_enter (obj, lock, THREAD) ; }
轻量级锁的代码逻辑:
进入:
void ObjectSynchronizer::slow_enter(Handle obj, BasicLock* lock, TRAPS) { // 获取对象的markOop数据 mark markOop mark = obj->mark(); assert(!mark->has_bias_pattern(), "should not see bias pattern here"); //判断mark是否为无锁状态:mark的偏向锁标志位为 0,锁标志位为 01; if (mark->is_neutral()) { //把mark保存到BasicLock对象的_displaced_header字段 lock->set_displaced_header(mark); //通过CAS尝试将Mark Word更新为指向BasicLock对象的指针,如果更新成功,表示竞争到锁,则执行同步代码 // Atomic::cmpxchg_ptr原子操作保证只有一个线程可以把指向栈帧的指针复制到Mark Word if (mark == (markOop) Atomic::cmpxchg_ptr(lock, obj()->mark_addr(), mark)) { TEVENT (slow_enter: release stacklock) ; return ; } } else //如果当前mark处于加锁状态,且mark中的ptr指针指向当前线程的栈帧,表示为重入操作,不需要竞争锁 if (mark->has_locker() && THREAD->is_lock_owned((address)mark->locker())) { assert(lock != mark->locker(), "must not re-lock the same lock"); assert(lock != (BasicLock*)obj->mark(), "don't relock with same BasicLock"); lock->set_displaced_header(NULL); return; } if (mark->has_monitor() && mark->monitor()->is_entered(THREAD)) { lock->set_displaced_header (NULL) ; return ; } // 这时候需要膨胀为重量级锁,膨胀前,设置Displaced Mark Word为一个特殊值,代表该锁正在用一个重量级锁的monitor lock->set_displaced_header(markOopDesc::unused_mark()); // 锁膨胀的过程,该方法返回一个ObjectMonitor对象,然后调用其enter方法 ObjectSynchronizer::inflate(2, obj())->enter(THREAD); }
释放锁:
// 轻量级锁释放 void ObjectSynchronizer::slow_exit(oop object, BasicLock* lock, TRAPS) { fast_exit (object, lock, THREAD) ; } //轻量级锁的释放 void ObjectSynchronizer::fast_exit(oop object, BasicLock* lock, TRAPS) { assert(!object->mark()->has_bias_pattern(), "should not see bias pattern here"); // 取出栈帧中保存的mark word markOop dhw = lock->displaced_header(); markOop mark ; if (dhw == NULL) { //如果是null,说明是重入的 mark = object->mark() ; assert (!mark->is_neutral(), "invariant") ; if (mark->has_locker() && mark != markOopDesc::INFLATING()) { assert(THREAD->is_lock_owned((address)mark->locker()), "invariant") ; } if (mark->has_monitor()) { ObjectMonitor * m = mark->monitor() ; assert(((oop)(m->object()))->mark() == mark, "invariant") ; assert(m->is_entered(THREAD), "invariant") ; } return ; } //获取当前锁对象的mark word mark = object->mark() ; if (mark == (markOop) lock) { assert (dhw->is_neutral(), "invariant") ; // 通过CAS尝试把dhw替换到当前的Mark Word,如果CAS成功,说明成功的释放了锁 if ((markOop) Atomic::cmpxchg_ptr (dhw, object->mark_addr(), mark) == mark) { TEVENT (fast_exit: release stacklock) ; return; } } // CAS失败,此时有竞争,开始膨胀 ObjectSynchronizer::inflate(THREAD, object)->exit (true, THREAD) ; }
锁膨胀代码逻辑:
//锁膨胀过程 膨胀完成返回monitor时,并不表示该线程竞争到了锁, //真正的锁竞争发生在ObjectMonitor::enter方法中 ObjectMonitor * ATTR ObjectSynchronizer::inflate (Thread * Self, oop object) { assert (Universe::verify_in_progress() || !SafepointSynchronize::is_at_safepoint(), "invariant") ; for (;;) { const markOop mark = object->mark() ; assert (!mark->has_bias_pattern(), "invariant") ; //mark是以下状态中的一种: // * Inflated(重量级锁状态) - 直接返回 // * Stack-locked(轻量级锁状态) - 膨胀 // * INFLATING(膨胀中) - 忙等待直到膨胀完成 // * Neutral(无锁状态) - 膨胀 // * BIASED(偏向锁) - 非法状态,在这里不会出现 // CASE: inflated // 判断当前是否为重量级锁状态,即Mark Word的锁标识位为 10,如果已经是重量级锁状态直接返回 if (mark->has_monitor()) { //获取指向ObjectMonitor的指针,并返回,膨胀过程已经完成 ObjectMonitor * inf = mark->monitor() ; assert (inf->header()->is_neutral(), "invariant"); assert (inf->object() == object, "invariant") ; assert (ObjectSynchronizer::verify_objmon_isinpool(inf), "monitor is invalid"); return inf ; } /* 如果当前锁处于膨胀中,说明该锁正在被其它线程执行膨胀操作, 则当前线程就进行自旋等待锁膨胀完成,这里需要注意一点,虽然是自旋操作, 但不会一直占用cpu资源,每隔一段时间会通过os::NakedYield方法放弃cpu资源, 或通过park方法挂起;如果其他线程完成锁的膨胀操作,则退出自旋并返回 */ if (mark == markOopDesc::INFLATING()) { TEVENT (Inflate: spin while INFLATING) ; // 检查是否处于膨胀中状态(其他线程正在膨胀中), //如果是膨胀中,就调用ReadStableMark方法进行等待, //ReadStableMark方法执行完毕后再通过continue继续检查, //ReadStableMark方法中还会调用os::NakedYield()释放CPU资源 ReadStableMark(object) ; continue ; } /* 如果当前是轻量级锁状态,即锁标识位为 00 ,开始膨胀 1、通过omAlloc方法,获取一个可用的ObjectMonitor monitor,并重置monitor数据; 2、通过CAS尝试将Mark Word设置为markOopDesc:INFLATING,标识当前锁正在膨胀中, 如果CAS失败,说明同一时刻其它线程已经将Mark Word设置为markOopDesc:INFLATING, 当前线程进行自旋等待膨胀完成; 3、如果CAS成功,设置monitor的各个字段:_header、_owner和_object等,并返回 */ if (mark->has_locker()) { // 当前轻量级锁状态,创建ObjectMonitor对象,并初始化 ObjectMonitor * m = omAlloc (Self) ; m->Recycle(); m->_Responsible = NULL ; m->OwnerIsThread = 0 ; m->_recursions = 0 ; m->_SpinDuration = ObjectMonitor::Knob_SpinLimit ; // Consider: maintain by type/class // 设置状态为膨胀中 markOop cmp = (markOop) Atomic::cmpxchg_ptr (markOopDesc::INFLATING(), object->mark_addr(), mark) ; if (cmp != mark) { //CAS失败,说明冲突了,自旋等待 omRelease (Self, m, true) ; //释放monitor continue ; // Interference -- just retry } markOop dmw = mark->displaced_mark_helper() ; assert (dmw->is_neutral(), "invariant") ; //CAS成功,设置ObjectMonitor的_header、_owner和_object等 m->set_header(dmw) ; m->set_owner(mark->locker()); m->set_object(object); // TODO-FIXME: assert BasicLock->dhw != 0. guarantee (object->mark() == markOopDesc::INFLATING(), "invariant") ; // 将锁对象的mark word设置为重量级锁状态 object->release_set_mark(markOopDesc::encode(m)); if (ObjectMonitor::_sync_Inflations != NULL) ObjectMonitor::_sync_Inflations->inc() ; TEVENT(Inflate: overwrite stacklock) ; if (TraceMonitorInflation) { if (object->is_instance()) { ResourceMark rm; tty->print_cr("Inflating object " INTPTR_FORMAT " , mark " INTPTR_FORMAT " , type %s", (void *) object, (intptr_t) object->mark(), object->klass()->external_name()); } } return m ; } // 如果是无锁状态 assert (mark->is_neutral(), "invariant"); // 创建ObjectMonitor对象,并初始化 ObjectMonitor * m = omAlloc (Self) ; // prepare m for installation - set monitor to initial state m->Recycle(); m->set_header(mark); m->set_owner(NULL); m->set_object(object); m->OwnerIsThread = 1 ; m->_recursions = 0 ; m->_Responsible = NULL ; m->_SpinDuration = ObjectMonitor::Knob_SpinLimit ; // consider: keep metastats by type/class // CAS 设置对象头标志为重量级锁 if (Atomic::cmpxchg_ptr (markOopDesc::encode(m), object->mark_addr(), mark) != mark) { // 有竞争CAS失败,释放monitor重试 m->set_object (NULL) ; m->set_owner (NULL) ; m->OwnerIsThread = 0 ; m->Recycle() ; omRelease (Self, m, true) ; m = NULL ; continue ; } if (ObjectMonitor::_sync_Inflations != NULL) ObjectMonitor::_sync_Inflations->inc() ; TEVENT(Inflate: overwrite neutral) ; if (TraceMonitorInflation) { if (object->is_instance()) { ResourceMark rm; tty->print_cr("Inflating object " INTPTR_FORMAT " , mark " INTPTR_FORMAT " , type %s", (void *) object, (intptr_t) object->mark(), object->klass()->external_name()); } } return m ; } }
重量级锁的代码逻辑(在文件objectMonitor.cpp里面):
进入:
// 重量级锁执行逻辑 void ATTR ObjectMonitor::enter(TRAPS) { Thread * const Self = THREAD ; void * cur ; //通过CAS尝试把monitor的_owner字段设置为当前线程,直接获取锁 cur = Atomic::cmpxchg_ptr (Self, &_owner, NULL) ; if (cur == NULL) { // Either ASSERT _recursions == 0 or explicitly set _recursions = 0. assert (_recursions == 0 , "invariant") ; assert (_owner == Self, "invariant") ; // CONSIDER: set or assert OwnerIsThread == 1 return ; } //设置之前的_owner指向当前线程,说明当前线程已经持有锁,此次为重入,_recursions自增 if (cur == Self) { // TODO-FIXME: check for integer overflow! BUGID 6557169. _recursions ++ ; return ; } //当前线程是之前持有轻量级锁的线程。由轻量级锁膨胀且第一次调用enter方法,那cur是指向Lock Record的指针 //设置_recursions为1,_owner为当前线程,该线程成功获得锁并返回 if (Self->is_lock_owned ((address)cur)) { assert (_recursions == 0, "internal state error"); _recursions = 1 ; // Commute owner from a thread-specific on-stack BasicLockObject address to a full-fledged "Thread *". _owner = Self ; OwnerIsThread = 1 ; return ; } // We've encountered genuine contention. assert (Self->_Stalled == 0, "invariant") ; Self->_Stalled = intptr_t(this) ; // 调用系统同步操作之前,先尝试自旋获得锁 if (Knob_SpinEarly && TrySpin (Self) > 0) { assert (_owner == Self , "invariant") ; assert (_recursions == 0 , "invariant") ; assert (((oop)(object()))->mark() == markOopDesc::encode(this), "invariant") ; //自旋的过程中获得了锁,则直接返回 Self->_Stalled = 0 ; return ; } assert (_owner != Self , "invariant") ; assert (_succ != Self , "invariant") ; assert (Self->is_Java_thread() , "invariant") ; JavaThread * jt = (JavaThread *) Self ; assert (!SafepointSynchronize::is_at_safepoint(), "invariant") ; assert (jt->thread_state() != _thread_blocked , "invariant") ; assert (this->object() != NULL , "invariant") ; assert (_count >= 0, "invariant") ; Atomic::inc_ptr(&_count); EventJavaMonitorEnter event; { // Change java thread status to indicate blocked on monitor enter. JavaThreadBlockedOnMonitorEnterState jtbmes(jt, this); DTRACE_MONITOR_PROBE(contended__enter, this, object(), jt); if (JvmtiExport::should_post_monitor_contended_enter()) { JvmtiExport::post_monitor_contended_enter(jt, this); } OSThreadContendState osts(Self->osthread()); ThreadBlockInVM tbivm(jt); Self->set_current_pending_monitor(this); // TODO-FIXME: change the following for(;;) loop to straight-line code. for (;;) { jt->set_suspend_equivalent(); // monitor竞争失败的线程,等待获取锁 // 1. 将当前线程封装为 node 塞到队列 cxq 的队头 // 2. 调用 park 挂起当前线程 // 3. 被唤醒后再次尝试获取锁(在唤醒时候会根据不同的唤醒策略定义 cxq 与 EntryList 的优先级) EnterI (THREAD) ; if (!ExitSuspendEquivalent(jt)) break ; _recursions = 0 ; _succ = NULL ; exit (false, Self) ; jt->java_suspend_self(); } Self->set_current_pending_monitor(NULL); } Atomic::dec_ptr(&_count); assert (_count >= 0, "invariant") ; Self->_Stalled = 0 ; // Must either set _recursions = 0 or ASSERT _recursions == 0. assert (_recursions == 0 , "invariant") ; assert (_owner == Self , "invariant") ; assert (_succ != Self , "invariant") ; assert (((oop)(object()))->mark() == markOopDesc::encode(this), "invariant") ; DTRACE_MONITOR_PROBE(contended__entered, this, object(), jt); if (JvmtiExport::should_post_monitor_contended_entered()) { JvmtiExport::post_monitor_contended_entered(jt, this); } if (event.should_commit()) { event.set_klass(((oop)this->object())->klass()); event.set_previousOwner((TYPE_JAVALANGTHREAD)_previous_owner_tid); event.set_address((TYPE_ADDRESS)(uintptr_t)(this->object_addr())); event.commit(); } if (ObjectMonitor::_sync_ContendedLockAttempts != NULL) { ObjectMonitor::_sync_ContendedLockAttempts->inc() ; } }
释放锁(这部分代码其实有助于作证java的管程模型):
//重量级锁解锁逻辑 void ATTR ObjectMonitor::exit(bool not_suspended, TRAPS) { Thread * Self = THREAD ; // 如果_owner不是当前线程 if (THREAD != _owner) { // 当前线程是之前持有轻量级锁的线程。由轻量级锁膨胀后还没调用过enter方法,_owner会是指向Lock Record的指针。 if (THREAD->is_lock_owned((address) _owner)) { // 如果owner位于当前线程调用栈帧,说明该锁是轻量级锁膨胀来的 assert (_recursions == 0, "invariant") ; //修改owner属性 _owner = THREAD ; _recursions = 0 ; OwnerIsThread = 1 ; } else { // 其他线程占用该锁,直接返回 TEVENT (Exit - Throw IMSX) ; assert(false, "Non-balanced monitor enter/exit!"); if (false) { THROW(vmSymbols::java_lang_IllegalMonitorStateException()); } return; } } // 如果重入计数器不为0.减1后返回 if (_recursions != 0) { _recursions--; // this is simple recursive enter TEVENT (Inflated exit - recursive) ; return ; } // _Responsible设置为null if ((SyncFlags & 4) == 0) { _Responsible = NULL ; } #if INCLUDE_TRACE if (not_suspended && Tracing::is_event_enabled(TraceJavaMonitorEnterEvent)) { _previous_owner_tid = SharedRuntime::get_java_tid(Self); } #endif for (;;) { assert (THREAD == _owner, "invariant") ; // Knob_ExitPolicy默认为0 if (Knob_ExitPolicy == 0) { // 将_owner属性置为NULL,释放锁,如果某个线程正在自旋抢占该锁,则会抢占成功, //这种策略会优先保证通过自旋抢占锁的线程获取锁,而其他处于等待队列中的线程则靠后 OrderAccess::release_store_ptr (&_owner, NULL) ; // drop the lock //让修改立即生效 OrderAccess::storeload() ; // See if we need to wake a successor // 如果_EntryList或者cxq都是空的,则直接返回 if ((intptr_t(_EntryList)|intptr_t(_cxq)) == 0 || _succ != NULL) { TEVENT (Inflated exit - simple egress) ; return ; } TEVENT (Inflated exit - complex egress) ; // 如果_EntryList或者cxq不是空的,则通过CAS设置owner属性为当前线程,尝试抢占锁 if (Atomic::cmpxchg_ptr (THREAD, &_owner, NULL) != NULL) { //抢占失败则返回,等占用该锁的线程释放后再处理队列中的等待线程 return ; } TEVENT (Exit - Reacquired) ; } else { if ((intptr_t(_EntryList)|intptr_t(_cxq)) == 0 || _succ != NULL) { OrderAccess::release_store_ptr (&_owner, NULL) ; // drop the lock OrderAccess::storeload() ; // Ratify the previously observed values. if (_cxq == NULL || _succ != NULL) { TEVENT (Inflated exit - simple egress) ; return ; } //有可能cxq插入了一个新节点,导致上面的if不成立,需要重新获取锁 if (Atomic::cmpxchg_ptr (THREAD, &_owner, NULL) != NULL) { TEVENT (Inflated exit - reacquired succeeded) ; return ; } TEVENT (Inflated exit - reacquired failed) ; } else { //如果_EntryList或者cxq不是空的则不释放锁,避免二次抢占锁,即优先处理等待队列中的线程 TEVENT (Inflated exit - complex egress) ; } } guarantee (_owner == THREAD, "invariant") ; ObjectWaiter * w = NULL ; //根据QMode的不同会有不同的唤醒策略,默认为0 int QMode = Knob_QMode ; // 从cxq或EntryList中获取头节点, // 通过ObjectMonitor::ExitEpilog方法唤醒该节点封装的线程, // 唤醒操作最终由unpark完成 if (QMode == 2 && _cxq != NULL) { // QMode == 2 : cxq has precedence over EntryList. // Try to directly wake a successor from the cxq. // If successful, the successor will need to unlink itself from cxq. // QMode == 2并且cxq队列不为空 : cxq中的线程有更高优先级,直接唤醒cxq的队首线程 w = _cxq ; assert (w != NULL, "invariant") ; assert (w->TState == ObjectWaiter::TS_CXQ, "Invariant") ; //通过unpark唤醒cxq对应的线程,唤醒后会将cxq从队列中移除 ExitEpilog (Self, w) ; return ; } if (QMode == 3 && _cxq != NULL) { // 把_cxq队首元素放入_EntryList的尾部 w = _cxq ; for (;;) { assert (w != NULL, "Invariant") ; //将_cxq置为NULL,如果失败则更新w,重新尝试直到成功为止 //置为NULL后,如果有新的节点插入进来就形成了一个新的cxq队列 ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ; if (u == w) break ; w = u ; } assert (w != NULL , "invariant") ; ObjectWaiter * q = NULL ; ObjectWaiter * p ; //遍历cxq中的所有节点,将其置为TS_ENTER for (p = w ; p != NULL ; p = p->_next) { guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ; p->TState = ObjectWaiter::TS_ENTER ; p->_prev = q ; q = p ; } ObjectWaiter * Tail ; //遍历_EntryList找到末尾元素,将w插入到后面 for (Tail = _EntryList ; Tail != NULL && Tail->_next != NULL ; Tail = Tail->_next) ; if (Tail == NULL) { _EntryList = w ; } else { Tail->_next = w ; //将w插入_EntryList队列尾部 w->_prev = Tail ; } } if (QMode == 4 && _cxq != NULL) { // 把_cxq队首元素放入_EntryList的头部 w = _cxq ; for (;;) { assert (w != NULL, "Invariant") ; //将_cxq置为NULL,如果失败则更新w,重新尝试直到成功为止 ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ; if (u == w) break ; w = u ; } assert (w != NULL , "invariant") ; ObjectWaiter * q = NULL ; ObjectWaiter * p ; //遍历cxq中的所有节点,将其置为TS_ENTER for (p = w ; p != NULL ; p = p->_next) { guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ; p->TState = ObjectWaiter::TS_ENTER ; p->_prev = q ; q = p ; } // Prepend the RATs to the EntryList //插入到_EntryList的头部 if (_EntryList != NULL) { q->_next = _EntryList ; _EntryList->_prev = q ; } _EntryList = w ; // Fall thru into code that tries to wake a successor from EntryList } w = _EntryList ; // _EntryList不为空,直接从_EntryList中唤醒线程 if (w != NULL) { assert (w->TState == ObjectWaiter::TS_ENTER, "invariant") ; ////通过unpark唤醒w对应的线程,唤醒后会该线程会负责将w从EntryList链表中移除 ExitEpilog (Self, w) ; return ; } //如果_EntryList为空 w = _cxq ; if (w == NULL) continue ;//如果_cxq和_EntryList队列都为空,自旋 for (;;) { assert (w != NULL, "Invariant") ; //自旋再获得cxq首结点 ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ; if (u == w) break ; w = u ; } TEVENT (Inflated exit - drain cxq into EntryList) ; assert (w != NULL , "invariant") ; assert (_EntryList == NULL , "invariant") ; // cxq不为空,_EntryList为空的情况 if (QMode == 1) { //遍历cxq中的元素将其加入到_EntryList中,注意顺序跟cxq中是反的 ObjectWaiter * s = NULL ; ObjectWaiter * t = w ; ObjectWaiter * u = NULL ; while (t != NULL) { guarantee (t->TState == ObjectWaiter::TS_CXQ, "invariant") ; t->TState = ObjectWaiter::TS_ENTER ; u = t->_next ; t->_prev = u ; t->_next = s ; s = t; t = u ; } _EntryList = s ; //将_cxq中的元素转移到_EntryList,并反转顺序 assert (s != NULL, "invariant") ; } else { // QMode == 0 or QMode == 2 // 将_cxq中的元素转移到_EntryList _EntryList = w ; ObjectWaiter * q = NULL ; ObjectWaiter * p ; for (p = w ; p != NULL ; p = p->_next) { guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ; p->TState = ObjectWaiter::TS_ENTER ; p->_prev = q ; q = p ; } } if (_succ != NULL) continue; w = _EntryList ; if (w != NULL) { guarantee (w->TState == ObjectWaiter::TS_ENTER, "invariant") ; // 唤醒_EntryList队首元素 ExitEpilog (Self, w) ; return ; } } }
3.进行图解
Synchronized重量级锁加锁解锁执行逻辑:
Synchronized轻量级锁源码分析:
synchronized锁优化
偏向锁批量重偏向&批量撤销
概念说明
从偏向锁的加锁解锁过程中可看出,当只有一个线程反复进入同步块时,偏向锁带来的性能开销基本可以忽略,但是当有其他线程尝试获得锁时,就需要等到safe point时,再将偏向锁撤销为无锁状态或升级为轻量级,会消耗一定的性能,所以在多线程竞争频繁的情况下,偏向锁不仅不能提高性能,还会导致性能下降。于是,就有了批量重偏向与批量撤销的机制。
JVM的默认参数值:设置JVM参数-XX:+PrintFlagsFinal,在项目启动时即可输出JVM的默认参数值
intx BiasedLockingBulkRebiasThreshold = 20 //默认偏向锁批量重偏向阈值 intx BiasedLockingBulkRevokeThreshold = 40 //默认偏向锁批量撤销阈值
我们可以通过-XX:BiasedLockingBulkRebiasThreshold 和 -XX:BiasedLockingBulkRevokeThreshold 来手动设置阈值
原理
以class为单位,为每个class维护一个偏向锁撤销计数器,每一次该class的对象发生偏向撤销操作时,该计数器+1,当这个值达到重偏向阈值(默认20)时,JVM就认为该class的偏向锁有问题,因此会进行批量重偏向。
每个class对象会有一个对应的epoch字段,每个处于偏向锁状态对象的Mark Word中也有该字段,其初始值为创建该对象时class中的epoch的值。每次发生批量重偏向时,就将该值+1,同时遍历JVM中所有线程的栈,找到该class所有正处于加锁状态的偏向锁,将其epoch字段改为新值。下次获得锁时,发现当前对象的epoch值和class的epoch不相等,那就算当前已经偏向了其他线程,也不会执行撤销操作,而是直接通过CAS操作将其Mark Word的Thread Id 改成当前线程Id。
当达到重偏向阈值(默认20)后,假设该class计数器继续增长,当其达到批量撤销的阈值后(默认40),JVM就认为该class的使用场景存在多线程竞争,会标记该class为不可偏向,之后,对于该class的锁,直接走轻量级锁的逻辑。(注意:时间-XX:BiasedLockingDecayTime=25000ms范围内没有达到40次,撤销次数清为0,重新计时)
原理验证
验证代码展示
public class BiasedLockingTest { public static void main(String[] args) throws InterruptedException { //延时产生可偏向对象 Thread.sleep(5000); // 创建一个list,来存放锁对象 List<Object> list = new ArrayList<>(); // 线程1 new Thread(() -> { for (int i = 0; i < 50; i++) { // 新建锁对象 Object lock = new Object(); synchronized (lock) { list.add(lock); } } try { //为了防止JVM线程复用,在创建完对象后,保持线程thead1状态为存活 Thread.sleep(100000); } catch (InterruptedException e) { e.printStackTrace(); } }, "thead1").start(); //睡眠3s钟保证线程thead1创建对象完成 Thread.sleep(3000); log.debug("打印thead1,list中第20个对象的对象头:"); log.debug((ClassLayout.parseInstance(list.get(19)).toPrintable())); // 线程2 new Thread(() -> { for (int i = 0; i < 40; i++) { Object obj = list.get(i); synchronized (obj) { if(i>=15&&i<=21||i>=38){ log.debug("thread2-第" + (i + 1) + "次加锁执行中\t"+ ClassLayout.parseInstance(obj).toPrintable()); } } if(i==17||i==19){ log.debug("thread2-第" + (i + 1) + "次释放锁\t"+ ClassLayout.parseInstance(obj).toPrintable()); } } try { Thread.sleep(100000); } catch (InterruptedException e) { e.printStackTrace(); } }, "thead2").start(); Thread.sleep(3000); new Thread(() -> { for (int i = 0; i < 50; i++) { Object lock =list.get(i); if(i>=17&&i<=21||i>=35&&i<=41){ log.debug("thread3-第" + (i + 1) + "次准备加锁\t"+ ClassLayout.parseInstance(lock).toPrintable()); } synchronized (lock){ if(i>=17&&i<=21||i>=35&&i<=41){ log.debug("thread3-第" + (i + 1) + "次加锁执行中\t"+ ClassLayout.parseInstance(lock).toPrintable()); } } } },"thread3").start(); Thread.sleep(3000); log.debug("查看新创建的对象"); log.debug((ClassLayout.parseInstance(new Object()).toPrintable())); LockSupport.park(); } }
验证批量重偏向
说明
当撤销偏向锁阈值超过 20 次后,jvm 会觉得,是不是偏向错了,于是会在给这些对象加锁时重新偏向至加锁线程,重偏向会重置对象的Thread ID。
结果分析
thread1: 创建50个偏向线程thread1的偏向锁 1-50 偏向锁
thread2:
1-18 偏向锁撤销,升级为轻量级锁 (thread1释放锁之后为偏向锁状态)
19-40 偏向锁撤销达到阈值(20),执行了批量重偏向 (测试结果在第19就开始批量重偏向了)
验证批量撤销
说明
当撤销偏向锁阈值超过 40 次后,jvm 会认为不该偏向,于是整个类的所有对象都会变为不可偏向的,新建的对象也是不可偏向的。
注意:时间-XX:BiasedLockingDecayTime=25000ms范围内没有达到40次,撤销次数清为0, 重新计时。
结果分析
thread3:
1-18 从无锁状态直接获取轻量级锁 (thread2释放锁之后变为无锁状态)
19-40 偏向锁撤销,升级为轻量级锁 (thread2释放锁之后为偏向锁状态)
41-50 达到偏向锁撤销的阈值40,批量撤销偏向锁,升级为轻量级锁 (thread1释放锁之后为偏向锁状态)
新创建的对象: 无锁状态
应用场景
批量重偏向(bulk rebias)机制是为了解决:一个线程创建了大量对象并执行了初始的同步操作,后来另一个线程也来将这些对象作为锁对象进行操作,这样会导致大量的偏向锁撤销操作。
批量撤销(bulk revoke)机制是为了解决:在明显多线程竞争剧烈的场景下使用偏向锁是不合适的。
总结
- 批量重偏向和批量撤销是针对类的优化,和对象无关。
- 偏向锁重偏向一次之后不可再次重偏向。
- 当某个类已经触发批量撤销机制后,JVM会默认当前类产生了严重的问题,剥夺了该类的新实例对象使用偏向锁的权利
自旋优化
概念说明
重量级锁竞争的时候,还可以使用自旋来进行优化,如果当前线程自旋成功(即这时候持锁线程已经退出了同步块,释放了锁),这时当前线程就可以避免阻塞。
- 自旋会占用 CPU 时间,单核 CPU 自旋就是浪费,多核 CPU 自旋才能发挥优势。
- 在 Java 6 之后自旋是自适应的,比如对象刚刚的一次自旋操作成功过,那么认为这次自旋成功的可能性会高,就多自旋几次;反之,就少自旋甚至不自旋,比较智能。
- Java 7 之后不能控制是否开启自旋功能
注意:自旋的目的是为了减少线程挂起的次数,尽量避免直接挂起线程(挂起操作涉及系统调用,存在用户态和内核态切换,这才是重量级锁最大的开销)。
锁粗化
概念说明
假设一系列的连续操作都会对同一个对象反复加锁及解锁,甚至加锁操作是出现在循环体中的,即使没有出现线程竞争,频繁地进行互斥同步操作也会导致不必要的性能损耗。如果JVM检测到有一连串零碎的操作都是对同一对象的加锁,将会扩大加锁同步的范围(即锁粗化)到整个操作序列的外部。
示例说明
代码展示
StringBuffer buffer = new StringBuffer(); /** * 锁粗化 */ public void append(){ buffer.append("aaa").append(" bbb").append(" ccc"); }
代码说明
上述代码每次调用 buffer.append 方法都需要加锁和解锁,如果JVM检测到有一连串的对同一个对象加锁和解锁的操作,就会将其合并成一次范围更大的加锁和解锁操作,即在第一次append方法时进行加锁,最后一次append方法结束后进行解锁。
锁消除
概念说明
锁消除即删除不必要的加锁操作。锁消除是Java虚拟机在JIT编译期间,通过对运行上下文的扫描,去除不可能存在共享资源竞争的锁,通过锁消除,可以节省毫无意义的请求锁时间。
示例说明
代码展示
/** * 锁消除 * -XX:+EliminateLocks 开启锁消除(jdk8默认开启) * -XX:-EliminateLocks 关闭锁消除 * @param str1 * @param str2 */ public void append(String str1, String str2) { StringBuffer stringBuffer = new StringBuffer(); stringBuffer.append(str1).append(str2); } public static void main(String[] args) throws InterruptedException { LockEliminationTest demo = new LockEliminationTest(); long start = System.currentTimeMillis(); for (int i = 0; i < 100000000; i++) { demo.append("aaa", "bbb"); } long end = System.currentTimeMillis(); System.out.println("执行时间:" + (end - start) + " ms"); }
代码说明
StringBuffer的append是个同步方法,但是append方法中的 StringBuffer 属于一个局部变量,不可能从该方法中逃逸出去,因此其实这过程是线程安全的,可以将锁消除。(涉及到了逃逸分析的概念,可查看 逃逸分析(Escape Analysis)详解)
这部分代码与锁粗化很相似,但却不同,StringBuffer定义在方法内是当做局部变量分配到了栈上,每个线程都会有自己的栈(JVM的内存模型的知识),故是私有的,不存在竞争,可以消除。而定义在方法外,则会分配到堆上,是共享的,所有线程都可以使用,故不能消除,但是在编译的时候检测到对同一个对象反复加锁及解锁,可以扩大加锁范围来达到减少加锁操作。
测试结果: 关闭锁消除执行时间4688 ms 开启锁消除执行时间:2601 ms(明显有则性能上的提升)。