PS:好累啊,好晚才到家,今天把学的并发编程的最后一点工具和概念总结下,明天正式进入aqs的源码学习~

一、原子操作CAS

1、什么是原子操作atomic operation?

所谓原子操作是指不会被线程调度机制打断的操作;这种操作一旦开始,就一直运行到结束,中间不会有任何 context switch (线程切换)。

2、java是如何实现原子操作?

1、使用synchronized对操作加锁

存在问题:

1、被阻塞的线程优先级很高

2、拿到锁的线程一直不释放锁怎么办?

3、大量的竞争,消耗cpu,同时带来死锁或者其他安全。

2、循环CAS(compare and swap)实现原子操作

Java中的CAS操作正是利用了处理器提供的CMPXCHG指令实现的。自旋CAS实现的基本思路就是循环进行CAS操作直到操作成功为止。

2.1、CAS的原理

CAS(Compare And Swap),指令级别保证这是一个原子操作

三个运算符:  一个内存地址V,一个期望的值A,一个新值B

基本思路:如果地址V上的值和期望的值A相等,就给地址V赋给新值B,如果不是,不做任何操作。循环(死循环,自旋)里不断的进行CAS操作

2.2、CAS的问题

1、ABA问题

就是一个县城可能将A改成了B,然后又有个线程将B又改成了A。但此时的A已经不是我们原本的A了。

就比如喝水,我到了一杯水,然后去上了个厕所,然后同事把我水喝了然后又给我接满了,等我回来时虽然桌子上还是一杯水,但此时已经不是我的那杯了。所以为了解决这个问题,我们可以对我们使用的地址通过加个版本号的概念,来标识我们的变量是否发生变化。

可使用AtomicStampedReference和AtomicMarkableReference记录版本号

2、开销问题

自旋还是很消耗性能的

3、只能保证一个共享变量的原子操作

2.3、Jdk中相关原子操作类的使用

更新基本类型类:AtomicBoolean,AtomicInteger,AtomicLong,AtomicReference
更新数组类:AtomicIntegerArray,AtomicLongArray,AtomicReferenceArray
更新引用类型:AtomicReference,AtomicMarkableReference,AtomicStampedReference
原子更新字段类: AtomicReferenceFieldUpdater,AtomicIntegerFieldUpdater,AtomicLongFieldUpdater (较少使用)
代码示例:

import java.util.concurrent.atomic.AtomicStampedReference;

/**
 * @Auther: BlackKingW
 * @Date: 2019/4/15 22:09
 * @Description:带版本号的原子操作
 */
public class UseAtomicStampedReference {

	static AtomicStampedReference<String> asr =
			new AtomicStampedReference<>("BlackKingW",0);


    public static void main(String[] args) throws InterruptedException {
    	final int oldStamp = asr.getStamp();//那初始的版本号
    	final String oldReferenc = asr.getReference();

    	System.out.println(oldReferenc+"==========="+oldStamp);

    	Thread rightStampThread = new Thread(new Runnable() {

			@Override
			public void run() {
				System.out.println(Thread.currentThread().getName()
						+"当前变量值:"+oldReferenc+"当前版本戳:"+oldStamp+"-"
						+asr.compareAndSet(oldReferenc, oldReferenc+"Java",
								oldStamp, oldStamp+1));

			}

    	});

    	Thread errorStampThread = new Thread(new Runnable() {

			@Override
			public void run() {
				String reference = asr.getReference();
				System.out.println(Thread.currentThread().getName()
						+"当前变量值:"+reference+"当前版本戳:"+asr.getStamp()+"-"
						+asr.compareAndSet(reference, reference+"C",
								oldStamp, oldStamp+1));

			}

    	});

    	rightStampThread.start();
    	rightStampThread.join();
    	errorStampThread.start();
    	errorStampThread.join();
    	System.out.println(asr.getReference()+"==========="+asr.getStamp());

    }
}

二、显式锁

1、Lock接口和核心方法

lock()   用来获取锁。如果锁已被其他线程获取,则进行等待。

unlock() 释放锁

tryLock() 它表示用来尝试获取锁,如果获取成功,则返回true,如果获取失败(即锁已被其他线程获取

Lock接口和synchronized的比较

synchronized:是Java语言内置关键字,不需要手动释放锁。代码简洁,

Lock:是实现的一个类,需要手动释放锁。并且获取锁可以被中断,拥有超时获取锁,尝试获取锁等机制。

代码示例:



import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @Auther: BlackKingW
 * @Date: 2019/4/14 12:09
 * @Description:
 */
public class LockDemo {

	private Lock lock  = new ReentrantLock();
	private int count;

	public void increament() {
		lock.lock();
		try {
			count++;
		}finally {
			lock.unlock();
		}
	}

	public synchronized void incr2() {
		count++;
		incr2();
	}


}

如increament采用lock,代码相对复杂,并且使用lock一定要在finally 中释放锁,否则可能会永远都释放不了,导致死锁。

2、可重入锁ReentrantLock

可重入意思为:已经获得该锁的线程,可以再次进入被锁定的代码块。内部通过计数器实现。例如上面的代码



import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @Auther: BlackKingW
 * @Date: 2019/4/15 22:09
 * @Description:
 */
public class ReentrantLockDemo {

	private Lock lock  = new ReentrantLock();
	private int count;

	public void increament() {
		lock.lock();
		try {
			count++;
		}finally {
			lock.unlock();
		}
	}

	public synchronized void incr2() {
		count++;
		incr2();
	}

	public synchronized void test3() {
		incr2();
	}

}

在增加一个方法test3,去调用incr2,如果该锁不可以被重入,则无法调用incr2。导致程序一直运行不下去。可重入锁就是支持已经获取锁的线程,可以重复进入加锁的代码块。

3、公平锁和非公平锁。

公平锁非公平锁何谓公平性,是针对获取锁而言的,如果一个锁是公平的,那么锁的获取顺序就应该符合请求上的绝对时间顺序,满足FIFO

当多个线程去请求加锁代码块时,同时只能有一个线程拥有锁,那么其他线程如果是按照到来的先后顺序,那么这个锁就是公平锁。比如ReentrantLock可指定是否为公平和非公平锁。否则就是非公平锁。比如synchronized。

公平锁 VS 非公平锁

公平锁每次获取到锁为同步队列中的第一个节点,保证请求资源时间上的绝对顺序,而非公平锁有可能刚释放锁的线程下次继续获取该锁,则有可能导致其他线程永远无法获取到锁,造成“饥饿”现象。

公平锁为了保证时间上的绝对顺序,需要频繁的上下文切换,而非公平锁会降低一定的上下文切换,降低性能开销。因此,ReentrantLock默认选择的是非公平锁,则是为了减少一部分上下文切换,保证了系统更大的吞吐量。

4、ReadWriteLock接口和读写锁ReentrantReadWriteLock

那是不是所有的锁都只能被一个线程所拥有呢?当然不是。例如ReentrantReadWriteLock读写锁。

ReentrantReadWriteLock允许多个读线程同时访问,但不允许写线程和读线程、写线程和写线程同时访问。

ReadWriteLock接口有两个方法

 Lock readLock();  获取读锁

 Lock writeLock();  获取写锁

ReentrantReadWriteLock实现了ReadWriteLock接口。用于获取读写锁。

ReentrantLock和synchronized关键字,同时只能有一个线程持有,所以都是排他锁,而ReentrantReadWriteLock可以同时有多个线程去访问,这种所也叫共享锁

使用场景: 读多写少的情况

代码示例

/**
 * @Auther: BlackKingW
 * @Date: 2019/4/15 22:09
 * @Description:
 */
public class UseSyn implements GoodsService {

	private GoodsInfo goodsInfo;

	public UseSyn(GoodsInfo goodsInfo) {
		this.goodsInfo = goodsInfo;
	}

	@Override
	public synchronized GoodsInfo getNum() {
		SleepTools.ms(5);
		return this.goodsInfo;
	}

	@Override
	public synchronized void setNum(int number) {
		SleepTools.ms(5);
		goodsInfo.changeNumber(number);

	}

}


import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * @Auther: BlackKingW
 * @Date: 2019/4/15 22:09
 * @Description:
 */
public class UseRwLock implements GoodsService {

    private GoodsInfo goodsInfo;
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final Lock getLock = lock.readLock();//读锁
    private final Lock setLock = lock.writeLock();//写锁

    public UseRwLock(GoodsInfo goodsInfo) {
        this.goodsInfo = goodsInfo;
    }

	@Override
	public GoodsInfo getNum() {
		getLock.lock();
		try {
			SleepTools.ms(5);
			return this.goodsInfo;
		}finally {
			getLock.unlock();
		}

	}

	@Override
	public void setNum(int number) {
		setLock.lock();
		try {
			SleepTools.ms(5);
			goodsInfo.changeNumber(number);
		}finally {
			setLock.unlock();
		}
	}
}

/**
 * @Auther: BlackKingW
 * @Date: 2019/4/14 12:09
 * @Description:
 */
public interface GoodsService {

	public GoodsInfo getNum();//获得商品的信息
	public void setNum(int number);//设置商品的数量
}



/**
 * @Auther: BlackKingW
 * @Date: 2019/4/15 22:09
 * @Description:
 */
public class GoodsInfo {
    private final String name;
    private double totalMoney;//总销售额
    private int storeNumber;//库存数

    public GoodsInfo(String name, int totalMoney, int storeNumber) {
        this.name = name;
        this.totalMoney = totalMoney;
        this.storeNumber = storeNumber;
    }

    public double getTotalMoney() {
        return totalMoney;
    }

    public int getStoreNumber() {
        return storeNumber;
    }

    public void changeNumber(int sellNumber){
        this.totalMoney += sellNumber*25;
        this.storeNumber -= sellNumber;
    }
}


import java.util.Random;
import java.util.concurrent.CountDownLatch;


/**
 * @Auther: BlackKingW
 * @Date:2019/4/15 22:09
 * @Description:
 */
public class BusiApp {
    static final int readWriteRatio = 10;//读写线程的比例
    static final int minthreadCount = 3;//最少线程数
    //static CountDownLatch latch= new CountDownLatch(1);

    //读操作
    private static class GetThread implements Runnable{

        private GoodsService goodsService;
        public GetThread(GoodsService goodsService) {
            this.goodsService = goodsService;
        }

        @Override
        public void run() {
//            try {
//                latch.await();//让读写线程同时运行
//            } catch (InterruptedException e) {
//            }
            long start = System.currentTimeMillis();
            for(int i=0;i<100;i++){//操作100次
                goodsService.getNum();
            }
            System.out.println(Thread.currentThread().getName()+"读取商品数据耗时:"
             +(System.currentTimeMillis()-start)+"ms");

        }
    }

    //写操做
    private static class SetThread implements Runnable{

        private GoodsService goodsService;
        public SetThread(GoodsService goodsService) {
            this.goodsService = goodsService;
        }

        @Override
        public void run() {
//            try {
//                latch.await();//让读写线程同时运行
//            } catch (InterruptedException e) {
//            }
            long start = System.currentTimeMillis();
            Random r = new Random();
            for(int i=0;i<10;i++){//操作10次
            	SleepTools.ms(50);
                goodsService.setNum(r.nextInt(10));
            }
            System.out.println(Thread.currentThread().getName()
            		+"写商品数据耗时:"+(System.currentTimeMillis()-start)+"ms---------");

        }
    }

    public static void main(String[] args) throws InterruptedException {
        GoodsInfo goodsInfo = new GoodsInfo("Cup",100000,10000);
        GoodsService goodsService = new UseRwLock(goodsInfo);/*new UseSyn(goodsInfo);*/
        for(int i = 0;i<minthreadCount;i++){
            Thread setT = new Thread(new SetThread(goodsService));
            for(int j=0;j<readWriteRatio;j++) {
                Thread getT = new Thread(new GetThread(goodsService));
                getT.start();
            }
            SleepTools.ms(100);
            setT.start();
        }
        //latch.countDown();

    }
}

通过修改busiApp,使用读写锁,

GoodsService goodsService = new UseRwLock(goodsInfo);

执行完毕时间为

并发编程专题四-原子操作和显示锁-LMLPHP

将busiApp修改为,使用synchronized关键字

GoodsService goodsService = new UseSyn(goodsInfo);

执行完毕时间为

并发编程专题四-原子操作和显示锁-LMLPHP

ReentrantReadWriteLock和ReentrantLock支持以下功能:

    1)支持公平和非公平的获取锁的方式;

    2)支持可重入。读线程在获取了读锁后还可以获取读锁;写线程在获取了写锁之后既可以再次获取写锁又可以获取读锁;

    3)还允许从写入锁降级为读取锁,其实现方式是:先获取写入锁,然后获取读取锁,最后释放写入锁。但是,从读取锁升级到写入锁是不允许的;

    4)读取锁和写入锁都支持锁获取期间的中断;

    5)Condition支持。仅写入锁提供了一个 Conditon 实现;读取锁不支持 Conditon ,readLock().newCondition() 会抛出 UnsupportedOperationException。 

5、Condition接口

在我的并发编程专题三-线程的并发工具类这篇文章里,讲了wait和notify/notifyAll。而Condition接口的功能和wait和notify功能和类似。

Condition主要方法为

await() 当前线程进入等待状态

signal() 唤醒一个等待在Condition上的线程

signalAll() 唤醒所有等待在Condition上的线程

await、signal、signalAll和wait、notify、notifyAll的等待通知机制的区别

await、signal、signalAll:建立在lock之上的,使用之前需要绑定lock锁。准确的通知需要唤醒的对象。唤醒时建议使用signal()方法

wait、notify、notifyAll:建立在Object之上的,使用之前需要获取对象锁,不能准确地通知需要唤醒的对象,唤醒时建议使用notifyAll()。

代码举例

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @Auther: BlackKingW
 * @Date: 2019/4/14 12:09
 * @Description:
 */
public class ExpressCond {
    public final static String CITY = "ShangHai";
    private int km;/*快递运输里程数*/
    private String site;/*快递到达地点*/
    private Lock lock = new ReentrantLock();
    private Condition keCond = lock.newCondition();
    private Condition siteCond = lock.newCondition();

    public ExpressCond() {
    }

    public ExpressCond(int km, String site) {
        this.km = km;
        this.site = site;
    }

    /* 变化公里数,然后通知处于wait状态并需要处理公里数的线程进行业务处理*/
    public void changeKm(){
        lock.lock();
        try {
        	this.km = 101;
        	keCond.signal();
        }finally {
        	lock.unlock();
        }
    }

    /* 变化地点,然后通知处于wait状态并需要处理地点的线程进行业务处理*/
    public  void changeSite(){
    	lock.lock();
        try {
        	this.site = "BeiJing";
        	siteCond.signal();
        }finally {
        	lock.unlock();
        }
    }

    /*当快递的里程数大于100时更新数据库*/
    public void waitKm(){
    	lock.lock();
    	try {
        	while(this.km<=100) {
        		try {
        			keCond.await();
    				System.out.println("check km thread["+Thread.currentThread().getId()
    						+"] is be notifed.");
    			} catch (InterruptedException e) {
    				// TODO Auto-generated catch block
    				e.printStackTrace();
    			}
        	}
    	}finally {
    		lock.unlock();
    	}

        System.out.println("the Km is "+this.km+",I will change db");
    }

    /*当快递到达目的地时通知用户*/
    public void waitSite(){
    	lock.lock();
        try {
        	while(CITY.equals(this.site)) {
        		try {
        			siteCond.await();
    				System.out.println("check site thread["+Thread.currentThread().getId()
    						+"] is be notifed.");
    			} catch (InterruptedException e) {
    				// TODO Auto-generated catch block
    				e.printStackTrace();
    			}
        	}
        }finally {
        	lock.unlock();
        }
        System.out.println("the site is "+this.site+",I will call user");
    }
}


/**
 * @Auther: BlackKingW
 * @Date: 2019/4/14 12:09
 * @Description:
 */
public class TestCond {
    private static ExpressCond express = new ExpressCond(0,ExpressCond.CITY);

    /*检查里程数变化的线程,不满足条件,线程一直等待*/
    private static class CheckKm extends Thread{
        @Override
        public void run() {
        	express.waitKm();
        }
    }

    /*检查地点变化的线程,不满足条件,线程一直等待*/
    private static class CheckSite extends Thread{
        @Override
        public void run() {
        	express.waitSite();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        for(int i=0;i<3;i++){
            new CheckSite().start();
        }
        for(int i=0;i<3;i++){
            new CheckKm().start();
        }

        Thread.sleep(1000);
        express.changeKm();//快递里程变化
    }
}

将上篇文章的例子,进行修改,使用Condition进行通知。可以发现,当里程数发生变化时,会准确的通知到里程数变化,进行相应的业务处理。而不像执行notify的时候,可能会唤醒等待地点变化的业务。从而导致业务员异常。

 

本章主要了解几种显示锁。以及重入锁,排它锁,共享锁等锁的概念。本文的代码里leepTools.ms(5);都可使用Thread.Sleep代替。欢迎大家多多指点。

并发编程专题一-线程相关基础概念

并发编程专题二-线程间的共享和协作

并发编程专题三-线程的并发工具类

04-16 10:08