互斥

互斥访问是并发编程要解决的核心问题之一。

有许多种方法可以满足临界区的互斥访问。大体上可以分为三种,

一种是软件方法,即由用户程序承担互斥访问的责任,而不需要依赖编程语言或操作系统,譬如Dekker算法、Peterson算法等,通常这种方式会有一定的性能开销和编程难度。

第二种是操作系统或编程语言对互斥的原生支持,譬如Linux中的mutex、Java语言的synchronized。

最后是硬件上的特殊指令,譬如著名的CAS。这种方式开销最少,但是很难成为一种通用的解决方案,通常操作系统或编程语言的互斥是基于此建立起来的。

管程-Monitor

管程属于编程语言级别的互斥解决方案,最早是Brinch Hanson和Hoare于1970s提出的概念,已在Pascal、Java、Python等语言中得到了实现。

“管程”一词翻译自英文Monitor Procedures,字面理解就是管理一个或多个执行过程。(但是个人感觉“管程”这个翻译有点莫名其妙,看完更迷糊了,所以本文坚持用回原名Monitor。)

Monitor本质上是对通用同步工具的一种抽象,它就像一个线程安全的盒子,用户程序把一个方法或过程(代码块)放进去,它就可以为他们提供一种保障:同一时刻只能有一个进程/线程执行该方法或过程,从而简化了并发应用的开发难度

如果Monitor内没有线程正在执行,则线程可以进入Monitor执行方法,否则该线程被放入入口队列(entry queue)并使其挂起。当有线程从Monitor中退出时,会唤醒entry queue中的一个线程。

为了处理并发线程,Monitor还需要一个更基础的同步工具,或者说需要一个机制,使得线程不仅被挂起,而且还能释放Monitor,以便其他线程可以进入。

Monitor使用条件变量(Condition Variable)支持这种机制,这些条件变量(一个或多个)包含在Monitor中,并且只有在Monitor内才能被访问 (类似Java对象的private变量)。

对外开放两个方法以便用户程序操作条件变量

cwait(c):调用该方法的线程在条件c上阻塞,monitor现在可以被其他线程使用。

csignal(c):恢复在条件c上被阻塞的线程。若有多个这样的线程,选择其中一个。

(通常,为了保证cwait/csignal对条件变量的变更是原子性的,还需要借助CAS)

当线程等待资源时

当Monitor中正在执行的线程无法获取所需资源时,情况会变得更加复杂。

如果发生这种情况,等待资源的线程可以先把自己挂起,并且释放Monitor的使用权,使得其他线程得以进入Monitor。

那么问题来了,当第二个线程在执行期间,第一个线程所需的资源可用了,会发生什么?

立即唤醒第一个线程,还是第二个线程先执行完?

对此产生了多个对Monitor的定义。

Hoare版本

在Hoare的语义中,当资源可用时,ThreadA立即恢复执行,而ThreadB进入signal queue。

1.ThreadA 进入 monitor
2.ThreadA 等待资源 (进入wait queue)
3.ThreadB 进入monitor
4.ThreadB 资源可用 ,通知ThreadA恢复执行,并把自己转移到signal queue。
5.ThreadA 重新进入 monitor
6.ThreadA 离开monitor
7.ThreadB 重新进入 monitor
8.ThreadB 离开monitor
9.其他在entry queue中的线程通过竞争进入monitor

Mesa版本

在Mesa Monitor的实现中,第二个线程会先执行完。

ThreadA的资源可用时,把它从wait queue转移到entry queue。ThreadB继续执行至结束。

ThreadA最终也会从entry queue中得以执行。

1.ThreadA 进入 monitor
2.ThreadA 等待资源 (进入wait queue,并释放monitor)
3.ThreadB 进入monitor
4.ThreadB 资源可用,通知ThreadA。(ThreadA被转移到entey queue)
5.ThreadB 继续执行
6.ThreadB 离开monitor
7.ThreadA 获得执行机会,从entry queue出队列,恢复执行
8.ThreadA 离开monitor
9.其他在entry queue中的线程通过竞争进入monitor

由于ThreadA被转移到了entry queue,当ThreadB退出monitor后,ThreadA与其他线程平等竞争monitor的进入条件,所以并不能保证立即执行。

更不幸的是,等到ThreadA重入monitor后,资源可能再次不可用,重复以上过程。

Brinch Hanson版本

Brinch Hanson Monitor(以下简称BH Monitor)只允许线程从monitor退出时发出信号,此时被通知的线程进入monitor恢复执行。

1.ThreadA 进入 monitor
2.ThreadA 等待资源a
3.ThreadB 进入monitor
4.ThreadB 离开Monitor,并给通知等待资源a的线程,资源可用
5.ThreadA 重新进入 monitor
6.ThreadA 离开monitor
7.其他线程从entry queue中竞争进入monitor

三种语义对比

Hoare Monitor中,资源可用时,ThreadB调用csignal()后被阻塞,以便ThreadA立即恢复执行。

这时ThreadB应该被放到哪里?一种可能是转移到entry queue,这样它就必须与其他还未进入Montior的线程平等竞争获取重入机会。

但是由于在调用csignal()之前,ThreadB已经执行了一部分,因此使它优先于其他线程是有意义的,

为此,Hoare Monitor增加了signal queue用于存放阻塞在csignal()上的线程。

Hoare Monitor的一个明显缺点是,ThreadB在执行中途被中断,需要额外的两次线程切换才能恢复执行。

不同的是,Mesa Monitor和BH Monitor会保证ThreadB先执行完,因此不需要额外的signal queue。

Java版本的Monitor

Java在实现时对最初的Monitor定义做了一些合理的限制。首先,与以上三种都不一样的是,Java Montior只允许一个条件变量,而不是多个。

不像BH monitor,signal可以出现在代码的任何地方。

也不像Hoare monitor,资源可以时,被通知的线程不会立即执行,而是从BLOCK状态变成RUNNABLE状态,被CPU再次调度到时才恢复执行。

与cwait(c)和csignal(c)对应的是wait()和notify()方法。

Java monitor机制通过synchronized关键字暴露给用户,syncronized可以用户修饰方法或代码块,两者本质上都是一个执行过程。

Java monitor实现生产者/消费者



//简化版本,只允许一个生产者和一个消费者

class BoundedBuffer {

   private int numSlots = 0;

   private double[] buffer = null;

   private int putIn = 0, takeOut = 0;

   private int count = 0;

   public BoundedBuffer(int numSlots) {

      if (numSlots <= 0) throw new IllegalArgumentException("numSlots<=0");

      this.numSlots = numSlots;

      buffer = new double[numSlots];

      System.out.println("BoundedBuffer alive, numSlots=" + numSlots);

   }

   public synchronized void deposit(double value) {

      while (count == numSlots)

         try {

            wait();

         } catch (InterruptedException e) {

            System.err.println("interrupted out of wait");

         }

      buffer[putIn] = value;

      putIn = (putIn + 1) % numSlots;

      count++;

      if (count == 1) notify();  //唤醒等待的consumer

   }

   public synchronized double fetch() {

      double value;

      while (count == 0)

         try {

            wait();

         } catch (InterruptedException e) {

            System.err.println("interrupted out of wait");

         }

      value = buffer[takeOut];

      takeOut = (takeOut + 1) % numSlots;

      count--;                           // wake up the producer

      if (count == numSlots-1) notify(); // 唤醒等待的producer

      return value;

   }

}

05-10 19:35