1. 问题的引出
线程安全归根结底可以说是内存安全,在jvm内存模型中,有一块特殊的公共内存空间,称为堆内存,进程内的所有线程都可以访问并修改其中的数据,就会造成潜在的问题。因为堆内存空间在没有保护机制的情况下,你放进去的数据,可能被别的线程篡改。如下代码:
public class ThreadSafe implements Runnable { private static int count = 0; @Override public void run() { for (int i = 0; i < 1000; i++) { count++; } } public static void main(String[] args) throws InterruptedException { ExecutorService es = Executors.newFixedThreadPool(10); for (int i = 0; i < 20; i++) { es.execute(new ThreadSafe()); } es.shutdown(); //不允许添加线程到线程池,异步关闭连接池 es.awaitTermination(10L, TimeUnit.SECONDS); //等待连接池的线程任务完成 System.out.println(count); } }
本来期望的值是20000,可是最终输出的结果却一点在变化,其值总是小于等于20000,显然这是由于线程不安全造成的,多个线程并发的去访问全局变量、静态变量、文件、设备、套接字等都可能出现这种问题。
2. 线程同步的措施
为了协调和配合线程之间对共享资源的访问,通常有四种方式:
1. 临界区:访问某一段临界资源的代码片段,与共享资源类似,但有一点不同的是,某一时刻只允许一个线程去访问(对应java中的关键字 synchronized包含的代码)。
2. 互斥量:互斥量是一个对象,只有都拥有互斥量的对象才可以访问共享资源。而且互斥量中只有一个,通常互斥量的实现是通过锁来实现的,而且加锁操作和释放操作只能由同一个线程来完成。此处与临界区的区别是一段代码,通常存在与一个文件中,而互斥量是一个对象,加锁操作和解锁操作可以在不同的文件去编写,只要是同一个线程就好。
3. 信号量: 信号量可以允许指定数量的线程同一时刻去访问共享资源,当线程数达到了阈值后,将阻止其他线程的访问,最常见的比如生产者和消费者问题。信号量和互斥量的区别则是信号量的发出和释放可以由不同线程来完成。
4. 事件:通过发送通知的形式来实现线程同步,可以实现不同进程中的线程同步操作。
3.饥饿与死锁
饥饿:某些线程或进程由于长期得不到资源,而总是处于就绪或者阻塞状态。例如:
①. 该进程或线程所拥有的CPU时间片被其他线程抢占而得不到执行(通常是优先级比它高的线程或进程),一直处于就绪状态。
②. 由于选用不恰当的调度算法,导致该进程或线程长期无法得到CPU时间片,处于就绪状态。
③. 由于唤醒的时间把握不对,唤醒线程时,所需的资源处于被锁定状态,导致线程回到阻塞状态。
死锁:两个或多个线程在执行过程中,由于相互占有对方所需的资源而又互不相让从而造成这些线程都被阻塞,若无外力的作用下,他们都将无法执行下去。例如
①. 进程推进顺序不合适。互相占有彼此需要的资源,同时请求对方占有的资源,形成循环依赖的关系。
②. 资源不足。
③. 进程运行推进顺序与速度不同,也可能产生死锁。
一些避免死锁的措施:
不要在锁区域内在加把锁,即不要在释放锁之前竞争其他锁。
减小锁粒度,即减小线程加锁的范围,真正需要的时候再去加锁。
顺序访问共享资源。
设置超时机制,超过指定时间则程序返回错误。
竞争锁期间,允许程序被中断。
4.代码层面解决线程安全
解决线程安全主要考虑三方面:
通常原子性都可以得到保证,问题的病端就出在可见性和原子性。
4.1 可见性的问题
如下实例程序,按通常的理解来说,当主线程等待一秒后,把flag的值修改为true后,另外一个线程应该可以感知到,然后跳过while循环,直接打印出后面的数据,可是最结果却一直卡在了while循环里。
public class Thread4 implements Runnable{ private static boolean flag = false; @Override public void run() { System.out.println("waiting for data...."); while (!flag); System.out.println("cpying with data"); } public static void main(String[] args) throws InterruptedException { Thread4 thread4 = new Thread4(); Thread t = new Thread(thread4); t.start(); Thread.sleep(1000); flag = true; } } /* output * waiting for data.... */
主要的原因是java程序在jvm上运行的时候,该程序所占用的内存分为两类主内存和工作内存(逻辑上的内存,实际上是cpu的寄存器和高速缓存,因为,cpu在计算的时候,并不总是从内存读取数据,它的数据读取顺序优先级是:寄存器-高速缓存-内存。CPU和内存之间通过总线进行)。也就是在主线程中启动另一个线程t会开辟出一个新的工作内存,与主线程的工作内存相互独立,且线程之间无法直接通信,只能去主内存读取全局变量,而线程t中做while判断的时候并不会去读取主内存的flag值,致使线程t无法被感知到flag在其他线程被改变,可以做一个测试,现在把run函数改成如下形式:
public class Thread4 implements Runnable{ private volatile static boolean flag = false; @Override private volatile static boolean flag = false; public void run() { System.out.println("waiting for data...."); while (!flag); System.out.println("cpying with data"); } //.... } /* output * waiting for data.... * false * ... * false * cpying with data */
为了感知其他线程中一些全局变量值的变化,而且避免频繁去测试主内存中的数据变化,保证线程之间的可见性,可以使用volatile关键字去修饰全局变量,如下:
public void run() { System.out.println("waiting for data...."); /* Notice 如果在while循环里加上System.out.println(flag);语句,则不会使用本工作内存的flag数据, 而是重新去主内存加载数据 */ while (!flag){ System.out.println(flag); //测试,可以做到线程的可见性 } System.out.println("cpying with data"); } /* output * waiting for data.... * false * ... * false * cpying with data */
volatile关键字借助MESI一致性协议,会在工作内存(CPU的寄存器等)与主内存连接的总线上建立一道总线嗅探机制,一旦发现其他线程修改了主内存中的某个全局变量(即图中橙灰色线条读取的数据以及写回的数据),就会让其他工作线程中从主内存拷贝出来的副本变量失效(即图中紫色的线条读取的数据),从而会使左边的线程重新去读取数据(即图中红色的线条读取的数据)。如下图:
虽然解决了原子性问题,可是volatile关键字不支持原子性操作,如下程序:
public class Thread5 implements Runnable { private static volatile int count = 0; @Override public void run() { for (int i = 0; i < 10000; i++) { count++; } } public static void main(String[] args) throws InterruptedException { ExecutorService es = Executors.newFixedThreadPool(10); for (int i = 0; i < 20; i++) { es.execute(new Thread5()); } es.shutdown(); //不允许添加线程,异步关闭连接池 es.awaitTermination(10L, TimeUnit.SECONDS); //等待连接池的线程任务完成 System.out.println(count); } } /* output * 175630 */
4.2 原子性问题
针对原子性问题,我们可以使用熟悉的synchronized关键字,synchronized关键字最主要有以下3种应用方式:
修饰实例方法,作用于当前实例加锁,进入同步代码前要获得当前实例的锁
修饰静态方法,作用于当前类对象加锁,进入同步代码前要获得当前类对象的锁
修饰代码块,指定加锁对象,对给定对象加锁,进入同步代码库前要获得给定对象的锁。
部分示例代码如下:
public class Thread5 implements Runnable { private static int count = 0; public synchronized static void add() { count++; } @Override public void run() { for (int i = 0; i < 1000000; i++) { // add() synchronized (Thread5.class){ count++; } } } public static void main(String[] args) throws InterruptedException { ExecutorService es = Executors.newFixedThreadPool(10); for (int i = 0; i < 20; i++) { es.execute(new Thread5()); } es.shutdown(); //不允许添加线程,异步关闭连接池 es.awaitTermination(10L, TimeUnit.SECONDS); //等待连接池的线程任务完成 System.out.println(count); } } /* output * 20000000 */
然而synchronized是一种悲观锁,具有强烈的独占和排他特性,它频繁的加锁和释放锁操作会使程序的效率低下。与悲观锁相对是一种乐观锁操作CAS(CompareAndSwap),乐观锁就是每次去取数据的时候都乐观的认为数据不会被修改,因此这个过程不会上锁,但是在更新的时候会判断一下在此期间的数据有没有更新,如果没有更新则去修改,否则失败。可是上面这种 操作会出现ABA(A-B-A,中途被改变,但最后又改回原值)的问题,
针对上面的问题,java中可以使用Atomic,它的包名为java.util.concurrent.atomic
。这个包里面提供了一组原子变量的操作类(通过值加版本号的方式去解决ABA问题),这些类可以保证在多线程环境下,当某个线程在执行atomic的方法时,不会被其他线程打断,一直等到该方法执行完成(具体的API文档可以查看参考文献第5点)。
public class ThreadSafe implements Runnable { private static AtomicInteger count = new AtomicInteger(0); @Override public void run() { for (int i = 0; i < 10000; i++) { count.getAndAdd(1); } } public static void main(String[] args) throws InterruptedException { ExecutorService es = Executors.newFixedThreadPool(10); for (int i = 0; i < 20; i++) { es.execute(new ThreadSafe()); } es.shutdown(); //不允许添加线程,异步关闭连接池 es.awaitTermination(10L, TimeUnit.SECONDS); //等待连接池的线程任务完成 System.out.println(count); } } /* output * 200000 */
5. 其他方法解决线程同步
a. 自旋锁
线程循环反复检查锁变量是否可用,在这一过程中线程一直保持执行(RUNNABLE),因此是一种忙等待,不像关键字synchronized一样,一旦发现不能访问,则处于线程处于阻塞状态(BLOCKED)。
public class Thread6 implements Runnable{ private static final Lock lock = new ReentrantLock(); private volatile static int count = 0; @Override public void run() { for (int i = 0; i < 1000000; i++){ lock.lock(); count++; lock.unlock(); } } static void test(ExecutorService es) throws InterruptedException { for (int i = 0; i < 20; i++) { es.execute(new Thread6()); } es.shutdown(); //不允许添加线程,异步关闭连接池 es.awaitTermination(10L, TimeUnit.SECONDS); //等待连接池的线程任务完成 System.out.println(count); } public static void main(String[] args) throws InterruptedException { ExecutorService es = Executors.newFixedThreadPool(20); test(es); } }
如果在使用lock的时候包含了try...catch...语句,要注意的是lock 必须在 finally 块中释放。否则,如果受保护的代码将抛出异常,锁就有可能永远得不到释放!
与Lock类同一个包java.util.concurrent.locks
下还有一种读写分离的锁ReentrantReadWriteLock类,读写锁维护了一对锁,一个读锁和一个写锁。一般情况下,读写锁的性能都会比排它锁好,因为大多数场景读是多于写的。在读多于写的情况下,读写锁能够提供比排它锁更好的并发性和吞吐量。
public class RWTest { private static final Map<String, Object> map = new HashMap<String, Object>(); private static final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); private static final Lock readLock = lock.readLock(); private static final Lock writeLock = lock.writeLock(); public static final Object get(String key) { readLock.lock(); try { return map.get(key); } finally { readLock.unlock(); } } public static final Object put(String key, Object value) { writeLock.lock(); try { return map.put(key, value); } finally { writeLock.unlock(); } } public static final void clear() { writeLock.lock(); try { map.clear(); } finally { writeLock.unlock(); } } }
利用自旋锁Lock也提供了Condition
来实现线程间的状态通知的,可以根据实际情况去唤醒某个线程(与后面的wait不同,是随机的)或者所有线程。可以通过lock.newCondition()
来获取得Condition实例,可以根据实际需求创建多个实例。
public class Thread9 { public static ReentrantLock lock=new ReentrantLock(); public static Condition condition =lock.newCondition(); public static void main(String[] args) { new Thread(){ @Override public void run() { lock.lock();//请求锁 try{ System.out.println(Thread.currentThread().getName()+"==》进入等待"); condition.await();//设置当前线程进入等待 }catch (InterruptedException e) { e.printStackTrace(); }finally{ lock.unlock();//释放锁 } System.out.println(Thread.currentThread().getName()+"==》继续执行"); } }.start(); new Thread(){ @Override public void run() { lock.lock();//请求锁 try{ System.out.println(Thread.currentThread().getName()+"=》进入"); Thread.sleep(2000);//休息2秒 condition.signal();//随机唤醒等待队列中的一个线程 System.out.println(Thread.currentThread().getName()+"休息结束"); }catch (InterruptedException e) { e.printStackTrace(); }finally{ lock.unlock();//释放锁 } } }.start(); } } /*output *Thread-0==》进入等待 *Thread-1=》进入 *Thread-1休息结束 *Thread-0==》继续执行 */
b. wait.notify.notifyAll
在关键字synchronized的线程同步机制,调用线程的sleep,yield方法时,线程并不会让出对象锁,但是调用wait却不同,线程自动释放其占有的对象锁,同时不会去申请对象锁,当线程被唤醒的时候,它才再次去申请竞争对象的锁(该关键字通常只与synchronized结合使用)。notify()唤醒在等待该对象同步锁的线程(只唤醒一个,如果有多个在等待),注意的是在调用此方法的时候,并不能确切的唤醒某一个等待状态的线程,而是由JVM确定唤醒哪个线程,而且不是按优先级。而notifyAll()则是唤醒所有等待的线程。
public class Thread8 implements Runnable { private int num; private Object lock; public Thread8(int num, Object lock) { this.num = num; this.lock = lock; } public void run() { try { while (true) { synchronized (lock) { lock.notifyAll(); lock.wait(); System.out.println(num); } } } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) { final Object lock = new Object(); Thread thread1 = new Thread(new Thread8(1, lock)); Thread thread2 = new Thread(new Thread8(2, lock)); thread1.start(); thread2.start(); } } /* output * 交替输出1,2,1,2,1,2...... */
6.并发编程—CountDownLatch、CyclicBarrier、Semaphore和fork/join框架
1. CountDownLatch
CountDownLatch实现的是一个倒序计数器,可以通过调用它的countDown实现计数器减一和await方法来阻塞当前线程:
public class Thread10 { public static void main(String[] args) throws InterruptedException { int count = 20; final CountDownLatch cdl = new CountDownLatch(count); ExecutorService es = Executors.newCachedThreadPool(); for (int i = 0; i < count; i++) { es.execute(new Runnable() { @Override public void run() { try { System.out.println(cdl.getCount()); }finally { cdl.countDown(); } } }); } cdl.await(); es.shutdown(); System.out.println("主线程现在才结束: count = "+cdl.getCount()); } }
2.CyclicBarrier
即回环栅栏,是一种可重用的线程阻塞器,它将率先到达栅栏的这些线程阻塞(调用await()方法),直到指定数量的线程都到达该处,这些线程将会被全部释放。
public class Thread11 implements Runnable{ private int num; private static CyclicBarrier cb = new CyclicBarrier(6); //指定栅栏的等待线程数 public Thread11(int num){ this.num = num; } @Override public void run() { try { Thread.sleep(1000*num); //等待指定数量时间后到达栅栏处 System.out.println(Thread.currentThread().getName() +" is coming.."); cb.await(10L, TimeUnit.SECONDS); System.out.println("continue...."); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { ExecutorService es = Executors.newCachedThreadPool(); for (int i = 0; i < 8; i++) { es.execute(new Thread11(i)); } es.shutdown(); } } /* *pool-1-thread-1 is coming.. *pool-1-thread-2 is coming.. *pool-1-thread-3 is coming.. *continue.... *continue.... *continue.... *pool-1-thread-4 is coming.. *超时异常错误(指定时间内线程数量仍然到达) */
3.Semaphore信号量
信号量用于保护对一个或多个共享资源的访问,其内部维护一个计数器,用来只是当前可以访问共享资源的数量。可以通过tryAcquire去尝试获取许可,还可以通过availablePermits()方法得到可用的许可数目,而acquire/release则是获取/释放许可。
public class Thread12 implements Runnable { private static SecureRandom random= new SecureRandom(); private static Semaphore semaphore = new Semaphore(3, true); @Override public void run() { try { semaphore.acquire(); System.out.println(Thread.currentThread().getName() + " got permission..."); Thread.sleep(random.nextInt(10000)); semaphore.release(); System.out.println(Thread.currentThread().getName() + " released permission..."); } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) { ExecutorService es = Executors.newCachedThreadPool(); for (int i = 0; i < 6; i++) { es.execute(new Thread12()); } es.shutdown(); } }
4.fork/join框架
Fork/Join框架提供了的一个用于并行执行任务的框架,充分利用了CPU资源,把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。(只供Java7使用)
ForkJoinPool与其他类型的ExecutorService的不同之处主要在于使用工作窃取,每个线程都有自己的双端任务队列,线程在一般情况下会从队列头去获取任务,当某个线程任务队列的为空的时候,它会尝试从其他线程的任务队列的尾部去“窃取”任务来执行。
public class Thread13 extends RecursiveTask<Integer> { private int start; private int end; public Thread13(int start, int end) { this.start = start; this.end = end; } @Override protected Integer compute() { int m = 1000; //每个线程计算的范围大小 int s = start, n = end; //每个线程计算的起始地址 int r = 0; //算和的变量 List<ForkJoinTask<Integer>> it = new ArrayList<ForkJoinTask<Integer>>(); while (s <= n) { if (n - s < m) { for (int i = s; i <= n; i++) { r += i; } } else { n = Math.min(s + m - 1, n); //得到一个新的start it.add(new Thread13(s, n).fork()); //得到每一个范围[如(0,999)]加入一个线程 } s = n + 1; n = end; } for (ForkJoinTask<Integer> t : it) { r += t.join(); } return r; } public static void main(String[] args) throws ExecutionException, InterruptedException { ForkJoinPool fjp = new ForkJoinPool(); int s = 1, n = 10001; Future<Integer> rs = fjp.submit(new Thread13(s, n)); System.out.println(rs.get()); } } /* output * 50015001 */