PS:好累啊,好晚才到家,今天把学的并发编程的最后一点工具和概念总结下,明天正式进入aqs的源码学习~
一、原子操作CAS
1、什么是原子操作atomic operation?
所谓原子操作是指不会被线程调度机制打断的操作;这种操作一旦开始,就一直运行到结束,中间不会有任何 context switch (线程切换)。
2、java是如何实现原子操作?
1、使用synchronized对操作加锁
存在问题:
1、被阻塞的线程优先级很高
2、拿到锁的线程一直不释放锁怎么办?
3、大量的竞争,消耗cpu,同时带来死锁或者其他安全。
2、循环CAS(compare and swap)实现原子操作
Java中的CAS操作正是利用了处理器提供的CMPXCHG指令实现的。自旋CAS实现的基本思路就是循环进行CAS操作直到操作成功为止。
2.1、CAS的原理
CAS(Compare And Swap),指令级别保证这是一个原子操作
三个运算符: 一个内存地址V,一个期望的值A,一个新值B
基本思路:如果地址V上的值和期望的值A相等,就给地址V赋给新值B,如果不是,不做任何操作。循环(死循环,自旋)里不断的进行CAS操作
2.2、CAS的问题
1、ABA问题
就是一个县城可能将A改成了B,然后又有个线程将B又改成了A。但此时的A已经不是我们原本的A了。
就比如喝水,我到了一杯水,然后去上了个厕所,然后同事把我水喝了然后又给我接满了,等我回来时虽然桌子上还是一杯水,但此时已经不是我的那杯了。所以为了解决这个问题,我们可以对我们使用的地址通过加个版本号的概念,来标识我们的变量是否发生变化。
可使用AtomicStampedReference和AtomicMarkableReference记录版本号
2、开销问题
自旋还是很消耗性能的
3、只能保证一个共享变量的原子操作
2.3、Jdk中相关原子操作类的使用
import java.util.concurrent.atomic.AtomicStampedReference;
/**
* @Auther: BlackKingW
* @Date: 2019/4/15 22:09
* @Description:带版本号的原子操作
*/
public class UseAtomicStampedReference {
static AtomicStampedReference<String> asr =
new AtomicStampedReference<>("BlackKingW",0);
public static void main(String[] args) throws InterruptedException {
final int oldStamp = asr.getStamp();//那初始的版本号
final String oldReferenc = asr.getReference();
System.out.println(oldReferenc+"==========="+oldStamp);
Thread rightStampThread = new Thread(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()
+"当前变量值:"+oldReferenc+"当前版本戳:"+oldStamp+"-"
+asr.compareAndSet(oldReferenc, oldReferenc+"Java",
oldStamp, oldStamp+1));
}
});
Thread errorStampThread = new Thread(new Runnable() {
@Override
public void run() {
String reference = asr.getReference();
System.out.println(Thread.currentThread().getName()
+"当前变量值:"+reference+"当前版本戳:"+asr.getStamp()+"-"
+asr.compareAndSet(reference, reference+"C",
oldStamp, oldStamp+1));
}
});
rightStampThread.start();
rightStampThread.join();
errorStampThread.start();
errorStampThread.join();
System.out.println(asr.getReference()+"==========="+asr.getStamp());
}
}
二、显式锁
1、Lock接口和核心方法
lock() 用来获取锁。如果锁已被其他线程获取,则进行等待。
unlock() 释放锁
Lock接口和synchronized的比较
synchronized:是Java语言内置关键字,不需要手动释放锁。代码简洁,
Lock:是实现的一个类,需要手动释放锁。并且获取锁可以被中断,拥有超时获取锁,尝试获取锁等机制。
代码示例:
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* @Auther: BlackKingW
* @Date: 2019/4/14 12:09
* @Description:
*/
public class LockDemo {
private Lock lock = new ReentrantLock();
private int count;
public void increament() {
lock.lock();
try {
count++;
}finally {
lock.unlock();
}
}
public synchronized void incr2() {
count++;
incr2();
}
}
如increament采用lock,代码相对复杂,并且使用lock一定要在finally 中释放锁,否则可能会永远都释放不了,导致死锁。
2、可重入锁ReentrantLock
可重入意思为:已经获得该锁的线程,可以再次进入被锁定的代码块。内部通过计数器实现。例如上面的代码
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* @Auther: BlackKingW
* @Date: 2019/4/15 22:09
* @Description:
*/
public class ReentrantLockDemo {
private Lock lock = new ReentrantLock();
private int count;
public void increament() {
lock.lock();
try {
count++;
}finally {
lock.unlock();
}
}
public synchronized void incr2() {
count++;
incr2();
}
public synchronized void test3() {
incr2();
}
}
在增加一个方法test3,去调用incr2,如果该锁不可以被重入,则无法调用incr2。导致程序一直运行不下去。可重入锁就是支持已经获取锁的线程,可以重复进入加锁的代码块。
3、公平锁和非公平锁。
公平锁和非公平锁。何谓公平性,是针对获取锁而言的,如果一个锁是公平的,那么锁的获取顺序就应该符合请求上的绝对时间顺序,满足FIFO
当多个线程去请求加锁代码块时,同时只能有一个线程拥有锁,那么其他线程如果是按照到来的先后顺序,那么这个锁就是公平锁。比如ReentrantLock可指定是否为公平和非公平锁。否则就是非公平锁。比如synchronized。
公平锁 VS 非公平锁
公平锁每次获取到锁为同步队列中的第一个节点,保证请求资源时间上的绝对顺序,而非公平锁有可能刚释放锁的线程下次继续获取该锁,则有可能导致其他线程永远无法获取到锁,造成“饥饿”现象。
公平锁为了保证时间上的绝对顺序,需要频繁的上下文切换,而非公平锁会降低一定的上下文切换,降低性能开销。因此,ReentrantLock默认选择的是非公平锁,则是为了减少一部分上下文切换,保证了系统更大的吞吐量。
4、ReadWriteLock接口和读写锁ReentrantReadWriteLock
那是不是所有的锁都只能被一个线程所拥有呢?当然不是。例如ReentrantReadWriteLock读写锁。
ReentrantReadWriteLock允许多个读线程同时访问,但不允许写线程和读线程、写线程和写线程同时访问。
ReadWriteLock接口有两个方法
Lock readLock(); 获取读锁
Lock writeLock(); 获取写锁
ReentrantReadWriteLock实现了ReadWriteLock接口。用于获取读写锁。
ReentrantLock和synchronized关键字,同时只能有一个线程持有,所以都是排他锁,而ReentrantReadWriteLock可以同时有多个线程去访问,这种所也叫共享锁。
使用场景: 读多写少的情况
代码示例
/**
* @Auther: BlackKingW
* @Date: 2019/4/15 22:09
* @Description:
*/
public class UseSyn implements GoodsService {
private GoodsInfo goodsInfo;
public UseSyn(GoodsInfo goodsInfo) {
this.goodsInfo = goodsInfo;
}
@Override
public synchronized GoodsInfo getNum() {
SleepTools.ms(5);
return this.goodsInfo;
}
@Override
public synchronized void setNum(int number) {
SleepTools.ms(5);
goodsInfo.changeNumber(number);
}
}
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* @Auther: BlackKingW
* @Date: 2019/4/15 22:09
* @Description:
*/
public class UseRwLock implements GoodsService {
private GoodsInfo goodsInfo;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final Lock getLock = lock.readLock();//读锁
private final Lock setLock = lock.writeLock();//写锁
public UseRwLock(GoodsInfo goodsInfo) {
this.goodsInfo = goodsInfo;
}
@Override
public GoodsInfo getNum() {
getLock.lock();
try {
SleepTools.ms(5);
return this.goodsInfo;
}finally {
getLock.unlock();
}
}
@Override
public void setNum(int number) {
setLock.lock();
try {
SleepTools.ms(5);
goodsInfo.changeNumber(number);
}finally {
setLock.unlock();
}
}
}
/**
* @Auther: BlackKingW
* @Date: 2019/4/14 12:09
* @Description:
*/
public interface GoodsService {
public GoodsInfo getNum();//获得商品的信息
public void setNum(int number);//设置商品的数量
}
/**
* @Auther: BlackKingW
* @Date: 2019/4/15 22:09
* @Description:
*/
public class GoodsInfo {
private final String name;
private double totalMoney;//总销售额
private int storeNumber;//库存数
public GoodsInfo(String name, int totalMoney, int storeNumber) {
this.name = name;
this.totalMoney = totalMoney;
this.storeNumber = storeNumber;
}
public double getTotalMoney() {
return totalMoney;
}
public int getStoreNumber() {
return storeNumber;
}
public void changeNumber(int sellNumber){
this.totalMoney += sellNumber*25;
this.storeNumber -= sellNumber;
}
}
import java.util.Random;
import java.util.concurrent.CountDownLatch;
/**
* @Auther: BlackKingW
* @Date:2019/4/15 22:09
* @Description:
*/
public class BusiApp {
static final int readWriteRatio = 10;//读写线程的比例
static final int minthreadCount = 3;//最少线程数
//static CountDownLatch latch= new CountDownLatch(1);
//读操作
private static class GetThread implements Runnable{
private GoodsService goodsService;
public GetThread(GoodsService goodsService) {
this.goodsService = goodsService;
}
@Override
public void run() {
// try {
// latch.await();//让读写线程同时运行
// } catch (InterruptedException e) {
// }
long start = System.currentTimeMillis();
for(int i=0;i<100;i++){//操作100次
goodsService.getNum();
}
System.out.println(Thread.currentThread().getName()+"读取商品数据耗时:"
+(System.currentTimeMillis()-start)+"ms");
}
}
//写操做
private static class SetThread implements Runnable{
private GoodsService goodsService;
public SetThread(GoodsService goodsService) {
this.goodsService = goodsService;
}
@Override
public void run() {
// try {
// latch.await();//让读写线程同时运行
// } catch (InterruptedException e) {
// }
long start = System.currentTimeMillis();
Random r = new Random();
for(int i=0;i<10;i++){//操作10次
SleepTools.ms(50);
goodsService.setNum(r.nextInt(10));
}
System.out.println(Thread.currentThread().getName()
+"写商品数据耗时:"+(System.currentTimeMillis()-start)+"ms---------");
}
}
public static void main(String[] args) throws InterruptedException {
GoodsInfo goodsInfo = new GoodsInfo("Cup",100000,10000);
GoodsService goodsService = new UseRwLock(goodsInfo);/*new UseSyn(goodsInfo);*/
for(int i = 0;i<minthreadCount;i++){
Thread setT = new Thread(new SetThread(goodsService));
for(int j=0;j<readWriteRatio;j++) {
Thread getT = new Thread(new GetThread(goodsService));
getT.start();
}
SleepTools.ms(100);
setT.start();
}
//latch.countDown();
}
}
通过修改busiApp,使用读写锁,
GoodsService goodsService = new UseRwLock(goodsInfo);
执行完毕时间为
将busiApp修改为,使用synchronized关键字
GoodsService goodsService = new UseSyn(goodsInfo);
执行完毕时间为
ReentrantReadWriteLock和ReentrantLock支持以下功能:
1)支持公平和非公平的获取锁的方式;
2)支持可重入。读线程在获取了读锁后还可以获取读锁;写线程在获取了写锁之后既可以再次获取写锁又可以获取读锁;
3)还允许从写入锁降级为读取锁,其实现方式是:先获取写入锁,然后获取读取锁,最后释放写入锁。但是,从读取锁升级到写入锁是不允许的;
4)读取锁和写入锁都支持锁获取期间的中断;
5)Condition支持。仅写入锁提供了一个 Conditon 实现;读取锁不支持 Conditon ,readLock().newCondition() 会抛出 UnsupportedOperationException。
5、Condition接口
在我的并发编程专题三-线程的并发工具类这篇文章里,讲了wait和notify/notifyAll。而Condition接口的功能和wait和notify功能和类似。
Condition主要方法为
await() 当前线程进入等待状态
signal() 唤醒一个等待在Condition上的线程
signalAll() 唤醒所有等待在Condition上的线程
await、signal、signalAll和wait、notify、notifyAll的等待通知机制的区别
await、signal、signalAll:建立在lock之上的,使用之前需要绑定lock锁。准确的通知需要唤醒的对象。唤醒时建议使用signal()方法
wait、notify、notifyAll:建立在Object之上的,使用之前需要获取对象锁,不能准确地通知需要唤醒的对象,唤醒时建议使用notifyAll()。
代码举例
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* @Auther: BlackKingW
* @Date: 2019/4/14 12:09
* @Description:
*/
public class ExpressCond {
public final static String CITY = "ShangHai";
private int km;/*快递运输里程数*/
private String site;/*快递到达地点*/
private Lock lock = new ReentrantLock();
private Condition keCond = lock.newCondition();
private Condition siteCond = lock.newCondition();
public ExpressCond() {
}
public ExpressCond(int km, String site) {
this.km = km;
this.site = site;
}
/* 变化公里数,然后通知处于wait状态并需要处理公里数的线程进行业务处理*/
public void changeKm(){
lock.lock();
try {
this.km = 101;
keCond.signal();
}finally {
lock.unlock();
}
}
/* 变化地点,然后通知处于wait状态并需要处理地点的线程进行业务处理*/
public void changeSite(){
lock.lock();
try {
this.site = "BeiJing";
siteCond.signal();
}finally {
lock.unlock();
}
}
/*当快递的里程数大于100时更新数据库*/
public void waitKm(){
lock.lock();
try {
while(this.km<=100) {
try {
keCond.await();
System.out.println("check km thread["+Thread.currentThread().getId()
+"] is be notifed.");
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}finally {
lock.unlock();
}
System.out.println("the Km is "+this.km+",I will change db");
}
/*当快递到达目的地时通知用户*/
public void waitSite(){
lock.lock();
try {
while(CITY.equals(this.site)) {
try {
siteCond.await();
System.out.println("check site thread["+Thread.currentThread().getId()
+"] is be notifed.");
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}finally {
lock.unlock();
}
System.out.println("the site is "+this.site+",I will call user");
}
}
/**
* @Auther: BlackKingW
* @Date: 2019/4/14 12:09
* @Description:
*/
public class TestCond {
private static ExpressCond express = new ExpressCond(0,ExpressCond.CITY);
/*检查里程数变化的线程,不满足条件,线程一直等待*/
private static class CheckKm extends Thread{
@Override
public void run() {
express.waitKm();
}
}
/*检查地点变化的线程,不满足条件,线程一直等待*/
private static class CheckSite extends Thread{
@Override
public void run() {
express.waitSite();
}
}
public static void main(String[] args) throws InterruptedException {
for(int i=0;i<3;i++){
new CheckSite().start();
}
for(int i=0;i<3;i++){
new CheckKm().start();
}
Thread.sleep(1000);
express.changeKm();//快递里程变化
}
}
将上篇文章的例子,进行修改,使用Condition进行通知。可以发现,当里程数发生变化时,会准确的通知到里程数变化,进行相应的业务处理。而不像执行notify的时候,可能会唤醒等待地点变化的业务。从而导致业务员异常。
本章主要了解几种显示锁。以及重入锁,排它锁,共享锁等锁的概念。本文的代码里leepTools.ms(5);都可使用Thread.Sleep代替。欢迎大家多多指点。