一、 J.U.C之AQS-介绍

1、定义:

2、AQS底层的数据结构:

3、AQS的特点:

4、AQS具体实现的大致思路:

5、AQS 同步组件

二 、J.U.C之AQS-CountDownLatch

1、 CountDownLatch执行原理图:

说明:

2、CountDownLatch线程安全测试代码实例:

@Slf4j
public class CountDownLatchExample1 {

    private final static  int threadCount = 200;

    public static void main(String[] args) throws InterruptedException {

        ExecutorService exec = Executors.newCachedThreadPool();

        final CountDownLatch countDownLatch = new CountDownLatch(threadCount);

        for (int i = 0 ; i < threadCount ; i++ ){
            final int threadNum = i;
            exec.execute(()->{
                try {
                    test(threadNum);
                }catch (Exception e){
                    log.error("exception",e);
                }finally {
                    countDownLatch.countDown();
                }
            });
        }
        countDownLatch.await();
        log.info("finish");
exec.shutdown();
    }

    private static void test(int threadNum) throws InterruptedException {
        Thread.sleep(100);
        log.info("{}",threadNum);
        Thread.sleep(100);
    }
}

执行打印结果:

22:47:37.052 [pool-1-thread-101] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample1 - 100
.......
22:47:37.060 [pool-1-thread-192] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample1 - 191
22:47:37.192 [main] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample1 - finish

Process finished with exit code 0

由打印结果可知,finish是在所有线程执行完后才执行的。

分析:这是因为CountDownLatch.countDown()计数器已经降到0了,所以在最后的CountDownLatch.await()校验通过,接着就打印最后面的finish字段

3、实现CountDownLatch在指定时间内完成:

//让CountDownLatch等待10毫秒
countDownLatch.await(10, TimeUnit.MILLISECONDS);

作用 :可以给定时间让线程有时间反应过来执行。

三、J.U.C之AQS-Semaphore

1、定义

2、Semaphore的使用场景:

3、Semaphore线程安全代码演示:

@Slf4j
public class SemaphoreExample1 {

    private final static  int threadCount = 20;

    public static void main(String[] args) throws InterruptedException {

        ExecutorService exec = Executors.newCachedThreadPool();

        final Semaphore semaphore = new Semaphore(3);

        for (int i = 0 ; i < threadCount ; i++ ){
            final int threadNum = i;
            exec.execute(()->{
                try {
                    semaphore.acquire();// 获取一个许可
                    test(threadNum);
                    semaphore.release();// 释放一个许可
                }catch (Exception e){
                    log.error("exception",e);
                }
            });
        }
        log.info("finish");
        exec.shutdown();
    }

    private static void test(int threadNum) throws InterruptedException {
        log.info("{}",threadNum);
        Thread.sleep(1000);
    }

}

执行打印线程结果:

由图中结果可以知道,代码中Semaphore限定了当前同时执行的线程数为3,而在统一时间只有3个线程被打印,一秒后又有3个线程被打印,由此可以说明Semaphore是可以实现线程安全操作的。

4、测试多个许可条件下的Semaphore业务场景:

semaphore.acquire(3);// 获取多个许可
test(threadNum);
semaphore.release(3);// 释放多个许可

执行打印线程结果:

23:12:57.078 [main] INFO com.mmall.concurrency.example.aqs.SemaphoreExample2 - finish
23:12:57.078 [pool-1-thread-1] INFO com.mmall.concurrency.example.aqs.SemaphoreExample2 - 0
23:12:58.084 [pool-1-thread-2] INFO com.mmall.concurrency.example.aqs.SemaphoreExample2 - 1
23:12:59.084 [pool-1-thread-3] INFO com.mmall.concurrency.example.aqs.SemaphoreExample2 - 2
23:13:00.085 [pool-1-thread-4] INFO com.mmall.concurrency.example.aqs.SemaphoreExample2 - 3
23:13:01.085 [pool-1-thread-5] INFO com.mmall.concurrency.example.aqs.SemaphoreExample2 - 4

由执行结果可知,每一秒Semaphore只执行一次线程操作,这是因为同一时间内获取3个许可,又同一时间内释放3个许可,就相当于是在同一时间内只执行一次test(threadNum)线程操作。而同一秒钟内拿到了3个许可,而与原本Semaphore定义的3个可执行线程数一校验,发现在同一秒钟内没有多余的许可可以释放了。这就很接近单线程的执行了。

5、案例:

1) 场景需求:当前的并发数是3个,超过部分则丢弃:

if(semaphore.tryAcquire()){ //尝试获取一个许可
    test(threadNum);
    semaphore.release();//释放一个许可
}

当尝试获取一个许可,如果获取不到,就丢弃;获取到,就执行。又由于线程业务处理消耗了一秒,如:

private static void test(int threadNum) throws InterruptedException {
    log.info("{}",threadNum);
    Thread.sleep(1000);
}

所以,最终可能执行的线程数只有Semaphore最初定义时所并发执行的3个线程。

2) 场景需求:超时时间内,执行许可:

if(semaphore.tryAcquire(5000, TimeUnit.MILLISECONDS)){ //尝试获取一个许可
    test(threadNum);
    semaphore.release();//释放一个许可
}

当尝试在5秒内获取一个许可,如果超时后依旧获取不到,就丢弃;获取到,就执行。

最终可能执行的超过约定的初始3个线程,但不一定全部线程数都能够被执行。


四、J.U.C之AQS-CyclicBarrier

1、图解 CyclicBarrier

CyclicBarrier是一个同步辅助类,它允许一组线程同步等待,直到某一个公共的屏障点,通过它可以实现线程之间的相互等待,每个线程都各自就绪后,才能够继续执行后续的操作。

它和CountDownLatch一样都是通过计数器来实现的。CyclicBarrier可以用于多线程计算数据,最后合并计算结果的应用场景。

比如:

我们用excel保存用户的银行流水,excel的每一页保存用户每一年的银行流水,现在我们要统计用户的日均流水,我们就可以用多线程处理每个页里面的银行流水,都执行完以后得到每一个页里面的日均银行流水,之后CyclicBarrier.action利用多线程计算结果再计算用户的日均银行流水。

2、CyclicBarrier和CountDownLatch的区别:

第一点:CountDownLatch的计数器只能使用一次,CyclicBarrier的计数器可以使用resume方法重置,重复使用;

第二点:CountDownLatch主要实现一个或N个线程需要等待其他线程完成某项操作之后才能去往下执行,它描述的是一个或N个线程等待其他线程的关系;

而CyclicBarrier是实现多个线程之间相互等待,在所有线程都满足条件之后才能去执行后续的操作,它描述的是各个线程内部相互等待的关系。

3、CyclicBarrier线程安全代码演示:

@Slf4j
@ThreadSafe
public class CyclicBarrierExample1 {

    //告知当前有多少个线程同时在等待
    private static CyclicBarrier barrier = new CyclicBarrier(5);

    public static void main(String[] args) throws InterruptedException {

        ExecutorService executor = Executors.newCachedThreadPool();

        for (int i = 0 ; i < 10 ; i++){
            final  int threadNum = i;
            Thread.sleep(1000);
            executor.execute(()->{
                try {
                    race(threadNum);
                } catch (Exception e) {
                    log.error("exception",e);
                }
            });
        }
    }
    private static void race(int threadNum) throws Exception {
        Thread.sleep(1000);
        log.info("{} is ready",threadNum);
        //当每一个线程准备就绪后,调用await方法告知CyclicBarrier当前线程OK了,当准备就绪的线程数达到了CyclicBarrier声明的5个线程的数量后,后面的线程就可以开始执行了
        barrier.await();
        log.info("{} continue",threadNum);
    }
}

4、CyclicBarrier支持传入等待时间,代码演示如下:

private static void race(int threadNum) throws Exception {
    Thread.sleep(1000);
    log.info("{} is ready",threadNum);
    try {
        barrier.await(2000, TimeUnit.MILLISECONDS);
    }catch (Exception e){
        log.warn("BrokenBarrierException",e);
    }
    log.info("{} continue",threadNum);
}

5、CyclicBarrier支持线程到达屏障时,优先支持running:

//告知当前有多少个线程同时在等待
private static CyclicBarrier barrier = new CyclicBarrier(5,()->{
    log.info("callback is running ");
});

执行结果:

00:04:24.350 [pool-1-thread-1] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample3 - 0 is ready
00:04:25.348 [pool-1-thread-2] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample3 - 1 is ready
00:04:26.349 [pool-1-thread-3] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample3 - 2 is ready
00:04:27.350 [pool-1-thread-4] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample3 - 3 is ready
00:04:28.350 [pool-1-thread-5] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample3 - 4 is ready
00:04:28.350 [pool-1-thread-5] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample3 - callback is running
00:04:28.350 [pool-1-thread-5] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample3 - 4 continue
00:04:28.351 [pool-1-thread-1] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample3 - 

线程中的屏障是 4 is ready ,此时所有入队线程已准备就绪,即到达了屏障,在进入执行序列时,会优先执行 running ,即匿名指定的线程对象。

五、J.U.C之AQS-ReentrantLock与锁-1

1、 ReentrantLock (可重入锁) 和 synchronized 区别

 可重入性:ReentrantLock的字面意思就是可重入的,而synchronized也是属于可重入的,两者的却别不大;都是进入一次线程,锁的计数器就自增1,所以等到锁的计数器下降为0时才会释放锁。

锁的实现:synchronized是依赖于JVM实现的,而ReentrantLock基于JDK实现的;两者的区别就类似于用户控制操作系统来实现和自己敲代码实现的区别;

性能的区别:在synchronized的关键字优化以前,synchronized的性能比ReentrantLock的性能差很多;从synchronized引入了偏向锁、轻量级锁也就是自旋锁后,它们两者的性能就差不多了,在两者都可用的情况下,官方更建议使用synchronized的,因为它的写法更容易。

功能区别

2、ReentrantLock独有的功能

所谓公平锁,就是先等待的线程先获得锁,这一点ReentrantLock是独有的,可以自己选择公平还是不公平

3、ReentranLock的定义:

4、什么情况下适合使用ReentrantLock呢?

5、ReentrantLock 和 Synchronized

Synchronized 能做的事情,ReentrantLock都能做,而ReentrantLock能做的Synchronized却有许多都做不了,在性能方面ReentrantLock不比Synchronized差!

那么,是否可以抛弃Synchronized呢?

6、ReentrantLock相关类的代码演示;

@Slf4j
public class LockExample2 {

    //请求总数
    public static int clientTotal = 5000;

    // 同时并发执行的线程数
    public static int threadTotal = 200;

    private static int count = 0;

    private final static Lock lock = new ReentrantLock();

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0 ; i < clientTotal ; i++){
            int count = i;
            executorService.execute(()->{
                try {
                    semaphore.acquire();
                    add();
                    semaphore.release();
                }catch (Exception e){
                    log.error("exception",e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("count:{}",count );
    }

    private static void  add(){
        lock.lock();
        try {
            count ++;
        }finally {
            lock.unlock();
        }
    }
}

执行并打印结果:

13:47:27.962 [main] INFO com.mmall.concurrency.example.lock.LockExample2 - count:5000

Process finished with exit code 0

7、重要底层实现原理分析:

ReentrantLock的底层代码实现中:

默认构造方法中生成给定的是一个不公平的锁;

在带参构造中,根据传入的true或者false来决定是使用公平锁或不公平锁

仅在调用时锁定未被保持的情况下才获取锁定

如果锁定在给定的等待时间内没有被线程保持,且当前线程没有被中断,则获取这个锁定

如果当前线程没有被中断,就获取锁定;如果被中断,就抛出异常。

查询此锁定,是否有任意线程保持

查询当前线程是否处于锁定状态

isFair()作用是判断是不是公平锁

8、扩展:ReentrantReadWriteLock

作用:在没有任何读写锁的时候才可以取得写入锁

六、 J.U.C之AQS-ReentrantLock与锁-2

1、ReentrantReadWriteLock实例代码演示:

@Slf4j
public class LockExample3 {

    private final Map<String, Data> map = new TreeMap<>();

    private final static ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    private final Lock readLock = lock.readLock();

    private final Lock writeLock = lock.writeLock();

    public Data get(String key){
        readLock.lock();
        try {
            return map.get(key);
        }finally {
            readLock.unlock();
        }
    }

    public Set<String> getAllKeys(){
        readLock.lock();
        try {
            return map.keySet();
        }finally {
            readLock.unlock();
        }
    }

    public Data put(String key,Data value){
        writeLock.lock();
        try {
            return map.put(key,value);
        }finally {
            writeLock.unlock();
        }
    }

    class Data{

    }

}

说明:一个类里面封装了一个内部的map,这个map不需要把所有的方法都暴露给别人,它们要做的事情完全都通过我提供的方法来使用。而我提供一些单独的方法来给外部使用,使用的时候为了避免发生并发的问题,可以加上ReentrantReadWriteLock来实现在读和写的时候分别加锁。只有在没有读写锁的时候才能进行相应的插入操作或读操作。

这里实现的是悲观读取。

在进行新的插入操作时,必须要当前执行的插入或读操作完成后才能继续执行。

如果发生写入很多,而读取很少的时候,使用ReentrantReadWriteLock类就可能会遭遇饥饿。所谓饥饿,就是代码中的

写锁一直想执行,但是大量的读操作就会导致写操作永远都无法执行,一直在等待,而不知道什么时候能真正的去执行这个写操作。

2、另一种锁:StampedLock

StampedLock控制锁有3中模式,分别是:写、读、乐观读(重点)

一个StampedLock的状态是有版本和模式两个部分组成,锁获取的方法是返回一个数字作为票据(stamped)。它用相应的锁状态来表示并控制相关锁的访问。数字0表示没有写锁被访问;读锁被分为悲观锁和乐观锁;所谓乐观读是写的操作很多,读的操作很少的情况下,我们可以认为写入和读取同时发生的几率很少,因此不悲观的使用完全读取锁定,程序可以采取查看

读取资料之后,是否遭到写入执行的变更,再采取后续的措施。这一个相应的改进可以大幅度提高程序的吞吐量。

3、StampedLock实例代码演示:

@Slf4j
public class LockExample4 {

    class Point {
        private double x, y;
        private final StampedLock sl = new StampedLock();

        void move(double deltaX, double deltaY) { // an exclusively locked method
            long stamp = sl.writeLock();
            try {
                x += deltaX;
                y += deltaY;
            } finally {
                sl.unlockWrite(stamp);
            }
        }

        //下面看看乐观读锁案例
        double distanceFromOrigin() { // A read-only method
            long stamp = sl.tryOptimisticRead(); //获得一个乐观读锁
            double currentX = x, currentY = y;  //将两个字段读入本地局部变量
            if (!sl.validate(stamp)) { //检查发出乐观读锁后同时是否有其他写锁发生?
                stamp = sl.readLock();  //如果没有,我们再次获得一个读悲观锁
                try {
                    currentX = x; // 将两个字段读入本地局部变量
                    currentY = y; // 将两个字段读入本地局部变量
                } finally {
                    sl.unlockRead(stamp);
                }
            }
            return Math.sqrt(currentX * currentX + currentY * currentY);
        }

        //下面是悲观读锁案例
        void moveIfAtOrigin(double newX, double newY) { // upgrade
            // Could instead start with optimistic, not read mode
            long stamp = sl.readLock();
            try {
                while (x == 0.0 && y == 0.0) { //循环,检查当前状态是否符合
                    long ws = sl.tryConvertToWriteLock(stamp); //将读锁转为写锁
                    if (ws != 0L) { //这是确认转为写锁是否成功
                        stamp = ws; //如果成功 替换票据
                        x = newX; //进行状态改变
                        y = newY;  //进行状态改变
                        break;
                    } else { //如果不能成功转换为写锁
                        sl.unlockRead(stamp);  //我们显式释放读锁
                        stamp = sl.writeLock();  //显式直接进行写锁 然后再通过循环再试
                    }
                }
            } finally {
                sl.unlock(stamp); //释放读锁或写锁
            }
        }
    }

}

4、StampedLock实例代码演示:


@Slf4j
public class LockExample5 {

    //请求总数
    public static int clientTotal = 5000;

    // 同时并发执行的线程数
    public static int threadTotal = 200;

    private static int count = 0;

    private final static StampedLock lock = new StampedLock();

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0 ; i < clientTotal ; i++){
            int count = i;
            executorService.execute(()->{
                try {
                    semaphore.acquire();
                    add();
                    semaphore.release();
                }catch (Exception e){
                    log.error("exception",e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("count:{}",count );
    }

    private static void  add(){
       long stamped = lock.writeLock();
        try {
            count ++;
        }finally {
            lock.unlock(stamped);
        }
    }
}

5、代码演示Condition的作用:

public static void main(String[] args) throws InterruptedException {

    ReentrantLock reentrantLock = new ReentrantLock();
    Condition condition = reentrantLock.newCondition();

    new Thread(()->{
        try {
            reentrantLock.lock();
            log.info("wait signal");// 1
            condition.await();
        }catch (InterruptedException e){
            e.printStackTrace();
        }
        log.info("get signal");//  4
        reentrantLock.unlock();

    }).start();

    new Thread(()->{
        reentrantLock.lock();
        log.info("get lock");// 2
        try {
            Thread.sleep(3000);
        }catch (InterruptedException e){
            e.printStackTrace();
        }
        condition.signalAll();
        log.info("send signal"); //   3
        reentrantLock.unlock();
    }).start();

}

执行打印结果:

14:42:33.234 [Thread-0] INFO com.mmall.concurrency.example.lock.LockExample6 - wait signal
14:42:33.238 [Thread-1] INFO com.mmall.concurrency.example.lock.LockExample6 - get lock
14:42:36.238 [Thread-1] INFO com.mmall.concurrency.example.lock.LockExample6 - send signal
14:42:36.238 [Thread-0] INFO com.mmall.concurrency.example.lock.LockExample6 - get signal

Process finished with exit code 0

结果分析:

首先,我们定义了一个ReentrantLock的实例,从实例中取出了一个Condition,也就是reentrantLock.newCondition();操作;

在main方法中,我们声明了两个线程Thread方法,按顺序命名为T1和T2.

在T1中,使用reentrantLock.lock()方法将线程T1加入到了AQS的等待队列里面,接着线程T1输出了“wait signal”日志信息,紧接着T1中调用了condition.await()方法,这时就将T1从AQS的等待队列中移除了,该操作对应了锁的释放。

接着,它就加入到了condition的等待队列中去了,等待这该线程需要的序号。这里对应了AQS原理图中的Condidtion queue,如上图。

线程T2因为线程T1释放锁的关系被唤醒,并判断它是否可以取到锁,于是获取锁,也加入到了,AQS的Sync queue等待队列中,当执行取锁任务后,就继续执行后续的日志打印“get lock”操作,线程T2在执行打印完成后,接着执行condition.signalAll()方法,紧接着就输出了后续的日志打印操作“send signal ”发送信号操作,

而此时condition线程队列中有线程T1的一个节点,于是它就被取出来加入到了AQS的等待队列里面去。注意:此时线程T1并没有被唤醒,只是加入到了AQS服务里面的Sync queue队列里面。

当线程T2的signaAll()发送信号的方法执行完成以后,调用了reentrantLock.unlock()方法,就释放了锁。此时在AQS的Sync queue队列中只有线程T1的线程,而AQS按照线程从头到尾的顺序唤醒了T1线程。于是线程T1继续开始执行,继续执行的时候就得到了最后的线程输出,即“get signal”日志的打印。

通过以上代码执行的结果以及对执行过程的分析,可以看得出来,Condition也是多线程间协调通讯的工具类。使得某个或某个线程一直等待条件(unlock()),只有当该条件具备,这些等待的线程才会被唤醒。以上案例中,唤醒所具备的条件就是等待的信号,包含signal和signalAll这两个方法,分别用于唤醒一个或多个等待的线程。这些被唤醒的线程会重新按顺序获得锁,并执行相应的业务操作。

12-17 00:44
查看更多