并发是伴随着多核处理器的诞生而产生的,为了充分利用硬件资源,诞生了多线程技术。但是多线程又存在资源竞争的问题,引发了同步和互斥,并带来线程安全的问题。于是,从jdk1.5开始,引入了concurrent包来解决这些问题。

  java.util.concurrent 包是专为 Java并发编程而设计的包。

在Java中,当多个线程访问某个类时,不管运行时环境采用何种调度方式或者这些线程将如何交替进行,在主调代码中不需要任何额外的同步或协同,这个类都能表现出正确的行为,那么称这个类是线程安全的。

  一般来说,concurrent包基本上由有3个package组成 : 

   java.util.concurrent:提供大部分关于并发的接口和类,如BlockingQueue,Callable,ConcurrentHashMap,ExecutorService, Semaphore等 ;
  java.util.concurrent.atomic:提供所有原子操作的类, 如AtomicInteger, AtomicLong等;
  java.util.concurrent.locks:提供锁相关的类, 如Lock, ReentrantLock, ReadWriteLock, Condition等。

  concurrent包下的所有类可以分为如下几大类:

    locks部分:显式锁(互斥锁和速写锁)相关,如ReentrantLock,ReentrantReadWriteLock等;
atomic部分:原子变量类相关,是构建非阻塞算法的基础,如AtomicInteger,AtomicBoolean,AtomicLong,AtomicReference等;
executor部分:线程池相关,如ExecutorService,Callable,Future等;
collections部分:并发容器相关,如BlockingQueue,Deque,ConcurrentMap等;
tools部分:同步工具相关,如CountDownLatch,CyclicBarrier,Semaphore,Executors,Exchanger等。

  JUC的类图结构如下所示:

JDK1.5引入的concurrent包-LMLPHP

  concurrent包的优点有:
  ①功能丰富,诸如线程池(ThreadPoolExecutor),CountDownLatch等并发编程中需要的类已经有现成的实现,不需要自己去实现一套; 相比较而言,jdk1.4对多线程编程的主要支持几乎只有Thread, Runnable,synchronized等。synchronized和JDK5之后的Lock均是悲观锁(悲观锁一般是一个人在使用的时候,另一个人不能用,所以性能极低,所能支持的并发量就不高)。
  ②concurrent包里面的一些操作是基于硬件级别的CAS(compare and swap,比较再赋值),就是在cpu级别提供了原子操作,简单的说就是可以提供无阻塞、无锁定的算法; 而现代cpu大部分都是支持这种算法的。JUC(java.util.concurrent)是基于乐观锁的,既能保证数据不混乱,又能保证性能。

  version-(版本管理)就是基于乐观锁机制-->拿着我们期望的结果,和现有结果进行比对,如果是相同的,就赋值,如果不是相同的,就重试。
CAS:有3个操作数,内存值V,旧的预期值A,要修改的新值B。当且仅当预期值A和内存值V相同时,将内存值V修改为B,否则什么都不做。
CAS算法内部是通过JNI--native方法来实现: 由java底层的C语言或者C++实现。

  一般情况下:同步容器是有使用价值的。有时候,我们的异步容器,比如ArrayList,在并发环境下,会有这些问题:①数据紊乱;②java.util.ConcurrentModificationException。这都是对于集合的读写状态不一致造成的问题。
   我们可以这样构建同步容器:

     Collections.synchronizedList(new ArrayList<>());
Collections.synchronizedMap()
Collections.synchronizedSet()

  上面的方法可以实现同步容器,但是使用了悲观锁,因而效率不高。
  使用JUC体系中提供的容器,如:ConcurrentHashMap,则有这样的优势:①不会出现同步问题,数据是正常的;②速度相对较快  ,就算没有hashmap快,也比hashtable快的多。其实,ConcurrentHashMap内部也是有同步的,在这方面面和hashtable没有区别。那么,ConcurrentHashMap快在哪里?主要是因为,其内部划分了很多segment区域,当不同的线程操作不同的segment的时候,其实还是一个异步操作;只有当不同线程操作同一个segment的时候,才会发生同步操作,所以速度很快。一个ConcurrentHashMap内部最多能有16个segment。

  我们接下来看一个非常有用的类CountDownLatch, 它是一个可以用来在一个进程中等待多个线程完成任务的类。在此给出一个应用场景:某个主线程接到一个任务,起了n个子线程去完成,但是主线程需要等待这n个子线程都完成任务以后才开始执行某个操作。详见代码:

package com.itszt.test1;
import java.util.concurrent.CountDownLatch;
/**
* 主线程会在启动的子线程完全结束后再继续执行
*/
public class Test {
public static void main(String[] args) {
Test test = new Test();
test.demoCountDown();
} public void demoCountDown() {
int count = 10;
final CountDownLatch l = new CountDownLatch(count);
for (int i = 0; i < count; ++i) {
final int index = i;
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.currentThread().sleep(2 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("thread -" + index + "- has finished...");
l.countDown();
}
}).start();
}
try {
l.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("now all threads have finished");
}
}

  执行结果如下所示:

thread -9- has finished...
thread -8- has finished...
thread -0- has finished...
thread -4- has finished...
thread -1- has finished...
thread -2- has finished...
thread -5- has finished...
thread -6- has finished...
thread -3- has finished...
thread -7- has finished...
now all threads have finished

  接下来,我们再看下Atomic相关的类, 比如AtomicLong, AtomicInteger等。简单来说,这些类都是线程安全的,支持无阻塞无锁定的。

package com.itszt.test1;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
/**
* 测试AtomicLong与long
*/
public class AtomicTest {
public static void main(String[] args) {
AtomicTest test = new AtomicTest();
test.testAtomic();
} public void testAtomic() {
final int loopcount = 10000;
int threadcount = 10;
final NonSafeSeq seq1 = new NonSafeSeq();
final SafeSeq seq2 = new SafeSeq();
final CountDownLatch l = new CountDownLatch(threadcount);
for (int i = 0; i < threadcount; ++i) {
final int index = i;
new Thread(new Runnable() {
@Override
public void run() {
for (int j = 0; j < loopcount; ++j) {
seq1.inc();
seq2.inc();
}
System.out.println("finished : " + index);
l.countDown();
}
}).start();
}
try {
l.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("both have finished....");
System.out.println("NonSafeSeq:" + seq1.get());
System.out.println("SafeSeq with atomic: " + seq2.get());
}
class NonSafeSeq {
private long count = 0;
public void inc() {
count++;
}
public long get() {
return count;
}
} class SafeSeq {
private AtomicLong count = new AtomicLong(0);
public void inc() {
count.incrementAndGet();
}
public long get() {
return count.longValue();
}
}
}

  上述代码执行如下:

finished : 0
finished : 3
finished : 2
finished : 6
finished : 9
finished : 5
finished : 8
finished : 1
finished : 4
finished : 7
both have finished....
NonSafeSeq:98454
SafeSeq with atomic: 100000

  其中,NonSafeSeq是作为对比的类,直接放一个private long count不是线程安全的,而SafeSeq里面放了一个AtomicLong,是线程安全的;可以直接调用incrementAndGet来增加。通过上述执行结果可以看到,10个线程,每个线程运行了10,000次,理论上应该有100,000次增加,使用了普通的long是非线程安全的,而使用了AtomicLong是线程安全的。需要注意的是,这个例子也说明,虽然long本身的单个设置是原子的,要么成功要么不成功,但是诸如count++这样的操作就不是线程安全的,因为这包括了读取和写入两步操作。

  在jdk 1.4时代,线程间的同步主要依赖于synchronized关键字,本质上该关键字是一个对象锁,可以加在不同的instance上或者class上。
  concurrent包提供了一个可以替代synchronized关键字的ReentrantLock,简单的说,你可以new一个ReentrantLock, 然后通过lock.lock和lock.unlock来获取锁和释放锁;需要注意的是,必须将unlock放在finally块里面。reentrantlock的好处有

  ①更好的性能;
  ②提供同一个lock对象上不同condition的信号通知;
  ③还提供lockInterruptibly这样支持响应中断的加锁过程,意思是说你试图去加锁,但是当前锁被其他线程hold住,然后你这个线程可以被中断。  

package com.itszt.test1;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;
/**
* 测试ReentrantLock
*/
public class ReentrantLockTest {
public static void main(String[] args) {
ReentrantLockTest lockTest = new ReentrantLockTest();
lockTest.demoLock();
} public void demoLock() {
final int loopcount = 10000;
int threadcount = 10;
final SafeSeqWithLock seq = new SafeSeqWithLock();
final CountDownLatch l = new CountDownLatch(threadcount);
for (int i = 0; i < threadcount; ++i) {
final int index = i;
new Thread(new Runnable() {
@Override
public void run() {
for (int j = 0; j < loopcount; ++j) {
seq.inc();
}
System.out.println("finished : " + index);
l.countDown();
}
}).start();
}
try {
l.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("both have finished....");
System.out.println("SafeSeqWithLock:" + seq.get());
} class SafeSeqWithLock {
private long count = 0;
private ReentrantLock lock = new ReentrantLock(); public void inc() {
lock.lock();
try {
count++;
} finally {
lock.unlock();
}
} public long get() {
return count;
}
}
}

  上述代码执行如下:

finished : 5
finished : 3
finished : 1
finished : 8
finished : 0
finished : 6
finished : 4
finished : 2
finished : 7
finished : 9
both have finished....
SafeSeqWithLock:100000

  上述代码操作中,通过对inc操作加锁,保证了线程安全。

  concurrent包里面还提供了一个非常有用的锁,读写锁ReadWriteLock。  

A ReadWriteLock maintains a pair of associated locks, one for read-only operations and one for writing. 
The read lock may be held simultaneously by multiple reader threads, so long as there are no writers.
The write lock is exclusive.

  上述英文意思是说:读锁可以有很多个锁同时上锁,只要当前没有写锁; 写锁是排他的,上了写锁,其他线程既不能上读锁,也不能上写锁;同样,需要上写锁的前提是既没有读锁,也没有写锁。  

package com.itszt.test1;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* 测试读写锁
*/
public class RWLockTest {
public static void main(String[] args) {
RWLockTest lockTest = new RWLockTest();
lockTest.testRWLock_getw_onr();
} public void testRWLock_getw_onr() {
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
final Lock rlock = lock.readLock();
final Lock wlock = lock.writeLock();
final CountDownLatch l = new CountDownLatch(2);
// start r thread,开启读锁
new Thread(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " now to get rlock--获取读锁");
rlock.lock();
try {
Thread.currentThread().sleep(2 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " now to unlock rlock--释放读锁");
rlock.unlock();
l.countDown();
}
}).start();
// start w thread,开启写锁
new Thread(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " now to get wlock--获取写锁");
wlock.lock();
System.out.println(Thread.currentThread().getName() + " now to unlock wlock--释放写锁");
wlock.unlock();
l.countDown();
}
}).start();
try {
l.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " finished");
}
}

  上述代码执行如下:

Thread-0 now to get rlock--获取读锁
Thread-1 now to get wlock--获取写锁
Thread-0 now to unlock rlock--释放读锁
Thread-1 now to unlock wlock--释放写锁
main finished

  ReadWriteLock的实现是ReentrantReadWriteLock,有趣的是,在一个线程中,读锁不能直接升级为写锁,但是写锁可以降级为读锁;这意思是说,如果你已经有了读锁,再去试图获得写锁,将会无法获得, 一直堵住了;但是如果你有了写锁,再去试图获得读锁,就没问题。

  下面是一段写锁降级的代码:

public void testRWLock_downgrade() {
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
Lock rlock = lock.readLock();
Lock wlock = lock.writeLock();
System.out.println("now to get wlock");
wlock.lock();
System.out.println("now to get rlock");
rlock.lock();
System.out.println("now to unlock wlock");
wlock.unlock();
System.out.println("now to unlock rlock");
rlock.unlock();
System.out.println("finished");
}

  上述代码在main函数中执行后,结果如下:

now to get wlock
now to get rlock
now to unlock wlock
now to unlock rlock
finished

  我们再看一段读锁升级的代码:

public void testRWLock_upgrade() {
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
Lock rlock = lock.readLock();
Lock wlock = lock.writeLock();
System.out.println("now to get rlock");
rlock.lock();
System.out.println("now to get wlock");
wlock.lock();
System.out.println("now to unlock wlock");
wlock.unlock();
System.out.println("now to unlock rlock");
rlock.unlock();
System.out.println("finished");
}

  上述代码执行中,已经有了读锁,再去试图获得写锁,将会无法获得, 程序一直堵塞,进入死锁状态,显示如下:

now to get rlock
now to get wlock

  另外,CountDownLatch是一个同步的辅助类,允许一个或多个线程,等待其他一组线程完成操作,再继续执行。
  CyclicBarrier也是一个同步的辅助类,允许一组线程相互之间等待,达到一个共同点,再继续执行。
  CountDownLatch和CyclicBarrier都是Synchronization  aid,即“同步辅助器”,既然都是辅助工具,在使用中有什么区别,各自的使用场景如何?  

CountDownLatch场景举例:一年级期末考试要开始了,监考老师发下去试卷,然后坐在讲台旁边玩着手机等待着学生答题,有的学生提前交了试卷,并约起打球了,等到最后一个学生交卷了,老师开始整理试卷,贴封条,下班,陪老婆孩子去了。
启发:CountDownLatch很像一个倒计时锁,倒计时结束,另一个线程才开始执行。就如监考老师要结束监考工作,必须等待所有学生都交了试卷,监考工作才能进入结束环节。 CyclicBarrier场景举例:公司组织户外拓展活动,帮助团队建设,其中最重要的一个项目就是要求全体员工(包括女同事,BOSS,一个都不能少)都能翻越一个高达四米,而且没有任何抓点的高墙,才能继续进行其他项目。
启发:CyclicBarrier可以看成是个障碍,所有的线程必须到齐后才能一起通过这个障碍。   
  另外,concurrent包中线程池部分,请参考我的另一篇博文《ScheduledThreadExecutor定时任务线程池》
04-27 05:14