synchronized中wait、notify的原理与源码

1.wait和notify的流程图

synchronized中wait、notify的原理与源码-LMLPHP

2.JVM源码

java层面wait的方法

 public final native void wait(long timeout) throws InterruptedException;

jvm中object.c

static JNINativeMethod methods[] = {
    {"hashCode",    "()I",                    (void *)&JVM_IHashCode},
    {"wait",        "(J)V",                   (void *)&JVM_MonitorWait},
    {"notify",      "()V",                    (void *)&JVM_MonitorNotify},
    {"notifyAll",   "()V",                    (void *)&JVM_MonitorNotifyAll},
    {"clone",       "()Ljava/lang/Object;",   (void *)&JVM_Clone},
};

jvm层面wait方法的源码

JVM_ENTRY(void, JVM_MonitorWait(JNIEnv* env, jobject handle, jlong ms))
  JVMWrapper("JVM_MonitorWait");
  Handle obj(THREAD, JNIHandles::resolve_non_null(handle));
  JavaThreadInObjectWaitState jtiows(thread, ms != 0);
  if (JvmtiExport::should_post_monitor_wait()) {//默认为false
    JvmtiExport::post_monitor_wait((JavaThread *)THREAD, (oop)obj(), ms);
  }
  ObjectSynchronizer::wait(obj, ms, CHECK);//主要方法是wait
JVM_END

ObjectSynchronizer::wait

// -----------------------------------------------------------------------------
//  Wait/Notify/NotifyAll
// NOTE: must use heavy weight monitor to handle wait()
int ObjectSynchronizer::wait(Handle obj, jlong millis, TRAPS) {
  if (UseBiasedLocking) {//使用偏向锁
    BiasedLocking::revoke_and_rebias(obj, false, THREAD);//撤销偏向锁,因为第二个参数为false;该方法在synchronized的文章中已经做过介绍了
  }
  if (millis < 0) {//超时就抛出异常
    THROW_MSG_0(vmSymbols::java_lang_IllegalArgumentException(), "timeout value is negative");
  }
  //wait方法需要先将锁膨胀到重量级锁
  ObjectMonitor* monitor = ObjectSynchronizer::inflate(THREAD,
                                                       obj(),
                                                       inflate_cause_wait);

  DTRACE_MONITOR_WAIT_PROBE(monitor, obj(), THREAD, millis);
  monitor->wait(millis, true, THREAD);//插入等待队列中等待

  // This dummy call is in place to get around dtrace bug 6254741.  Once
  // that's fixed we can uncomment the following line, remove the call
  // and change this function back into a "void" func.
  // DTRACE_MONITOR_PROBE(waited, monitor, obj(), THREAD);
  return dtrace_waited_probe(monitor, obj, THREAD);
}

ObjectMonitor::wait

void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) {
  Thread * const Self = THREAD;//获取当前线程对象
  JavaThread *jt = (JavaThread *)THREAD;//获取当前线程对应的java线程对象
  EventJavaMonitorWait event;

  Self->_Stalled = intptr_t(this);//设置当前线程阻塞在该 ObjectMonitor上
  jt->set_current_waiting_monitor(this);

  ObjectWaiter node(Self);//生成节点
  node.TState = ObjectWaiter::TS_WAIT;//修改状态
  Self->_ParkEvent->reset();
  OrderAccess::fence();          //全屏障保证后续可以读取到最新值

  Thread::SpinAcquire(&_WaitSetLock, "WaitSet - add");//获取_WaitSetLock
  AddWaiter(&node);//将等待的节点插入waitset中
  Thread::SpinRelease(&_WaitSetLock);//释放_WaitSetLock

  _Responsible = NULL;

  intptr_t save = _recursions; // record the old recursion count
  _waiters++;                  // increment the number of waiters
  _recursions = 0;             // set the recursion level to be 1
  exit(true, Self);                    // exit the monitor 退出重量级锁

  int ret = OS_OK;
  int WasNotified = 0;
  { // State transition wrappers
    OSThread* osthread = Self->osthread();
    OSThreadWaitState osts(osthread, true);//将os线程置为OBJECT_WAIT的状态
    {
      ThreadBlockInVM tbivm(jt);//设置当前对应的java线程是阻塞状态;_thread_blocked = 10, // blocked in vm
      // Thread is in thread_blocked state and oop access is unsafe.
      jt->set_suspend_equivalent();

      if (interruptible && (Thread::is_interrupted(THREAD, false) || HAS_PENDING_EXCEPTION)) {//中断或者异常唤醒不做处理
        // Intentionally empty
      } else if (node._notified == 0) {//如果唤醒的信号==0;去阻塞
        if (millis <= 0) {//如果设置的等待时间<=0
          Self->_ParkEvent->park();//阻塞等待唤醒
        } else {
          ret = Self->_ParkEvent->park(millis);//超时等待,到时间自动醒过来
        }
      }

      // were we externally suspended while we were waiting?
      if (ExitSuspendEquivalent (jt)) {
        // TODO-FIXME: add -- if succ == Self then succ = null.
        jt->java_suspend_self();
      }

    } // Exit thread safepoint: transition _thread_blocked -> _thread_in_vm

  
    if (node.TState == ObjectWaiter::TS_WAIT) {//如果当前状态时TS_WAIT
      Thread::SpinAcquire(&_WaitSetLock, "WaitSet - unlink");//获取_WaitSetLock锁
      if (node.TState == ObjectWaiter::TS_WAIT) {
        DequeueSpecificWaiter(&node);       // 将当前node等待节点从waitset中摘除
        node.TState = ObjectWaiter::TS_RUN;//修改状态为TS_RUN
      }
      Thread::SpinRelease(&_WaitSetLock);//释放_WaitSetLock锁
    }

    OrderAccess::loadload();//使用读屏障,刷新内存中的值

    OrderAccess::fence();

    
    Self->_Stalled = 0;//将阻塞对象置为空

    ObjectWaiter::TStates v = node.TState;//获取节点状态
    if (v == ObjectWaiter::TS_RUN) {//如果节点状态为TS_RUN,去获取锁
      enter(Self);
    } else {
      ReenterI(Self, &node);//尝试去获取锁
      node.wait_reenter_end(this);//将线程设置为可运行状态
    }
  } // OSThreadWaitState()

  jt->set_current_waiting_monitor(NULL);

  _recursions = save;     // restore the old recursion count 值还原
  _waiters--;             // decrement the number of waiters 等待节点减1

  if (!WasNotified) {//如果不是被唤醒的
    // no, it could be timeout or Thread.interrupt() or both
    // check for interrupt event, otherwise it is timeout
    if (interruptible && Thread::is_interrupted(Self, true) && !HAS_PENDING_EXCEPTION) {//中断产生的直接抛出异常
      THROW(vmSymbols::java_lang_InterruptedException());
    }
  }
}

JVM_MonitorNotify和JVM_MonitorNotifyAll

JVM_ENTRY(void, JVM_MonitorNotify(JNIEnv* env, jobject handle))
  JVMWrapper("JVM_MonitorNotify");
  Handle obj(THREAD, JNIHandles::resolve_non_null(handle));
  ObjectSynchronizer::notify(obj, CHECK);//关键方法是notify
JVM_END


JVM_ENTRY(void, JVM_MonitorNotifyAll(JNIEnv* env, jobject handle))
  JVMWrapper("JVM_MonitorNotifyAll");
  Handle obj(THREAD, JNIHandles::resolve_non_null(handle));
  ObjectSynchronizer::notifyall(obj, CHECK);//关键方法是notifyAll
JVM_END

ObjectSynchronizer::notify和ObjectSynchronizer::notifyall

void ObjectSynchronizer::notify(Handle obj, TRAPS) {
  if (UseBiasedLocking) {//偏向锁直接撤销
    BiasedLocking::revoke_and_rebias(obj, false, THREAD);
  }
  markOop mark = obj->mark();//轻量级锁直接进行替换头部
  if (mark->has_locker() && THREAD->is_lock_owned((address)mark->locker())) {
    return;
  }
  //锁膨胀直接调用notify方法
  ObjectSynchronizer::inflate(THREAD,
                              obj(),
                              inflate_cause_notify)->notify(THREAD);
}

// NOTE: see comment of notify()
void ObjectSynchronizer::notifyall(Handle obj, TRAPS) {
  if (UseBiasedLocking) {//偏向锁直接撤销
    BiasedLocking::revoke_and_rebias(obj, false, THREAD);
  }
  markOop mark = obj->mark();//轻量级锁直接进行替换头部
  if (mark->has_locker() && THREAD->is_lock_owned((address)mark->locker())) {
    return;
  }
  //锁膨胀直接调用notifyAll方法
  ObjectSynchronizer::inflate(THREAD,
                              obj(),
                              inflate_cause_notify)->notifyAll(THREAD);
}


ObjectMonitor::notify和 ObjectMonitor::notifyAll

void ObjectMonitor::notify(TRAPS) {
  if (_WaitSet == NULL) {//等待集合为空直接返回
    return;
  }
  DTRACE_MONITOR_PROBE(notify, this, object(), THREAD);
  INotify(THREAD);//唤醒当前线程
  OM_PERFDATA_OP(Notifications, inc(1));
}

void ObjectMonitor::notifyAll(TRAPS) {
  if (_WaitSet == NULL) {//等待集合为空直接返回
    return;
  }
  DTRACE_MONITOR_PROBE(notifyAll, this, object(), THREAD);
  int tally = 0;
  while (_WaitSet != NULL) {//while循环挨个唤醒
    tally++;
    INotify(THREAD);
  }

  OM_PERFDATA_OP(Notifications, inc(tally));
}

ObjectMonitor::INotify

void ObjectMonitor::INotify(Thread * Self) {
  Thread::SpinAcquire(&_WaitSetLock, "WaitSet - notify");//获取_WaitSetLock锁
  ObjectWaiter * iterator = DequeueWaiter();//第一个节点出队
  if (iterator != NULL) {
    iterator->TState = ObjectWaiter::TS_ENTER;//修改状态为TS_ENTER

    iterator->_notified = 1;//标识唤醒线程数为1
    iterator->_notifier_tid = JFR_THREAD_ID(Self);//记录唤醒的线程di

    ObjectWaiter * list = _EntryList;//获取_EntryList执行队列

    // prepend to cxq 执行队列为空,将竞争队列中的任务批量移动到_EntryList中
    if (list == NULL) {
      iterator->_next = iterator->_prev = NULL;
      _EntryList = iterator;
    } else {
      iterator->TState = ObjectWaiter::TS_CXQ;//否则修改节点状态为TS_CXQ
      for (;;) {//保证出队的节点插入到等待队列中
        ObjectWaiter * front = _cxq;
        iterator->_next = front;
        if (Atomic::cmpxchg(iterator, &_cxq, front) == front) {
          break;
        }
      }
    }
    iterator->wait_reenter_begin(this);
  }
  Thread::SpinRelease(&_WaitSetLock);//释放_WaitSetLock锁

}

4.留言

这些只是个人整理的知识点,中间可能有不到位的地方,烦请各位大佬指出

06-28 23:43