synchronized中wait、notify的原理与源码
1.wait和notify的流程图
2.JVM源码
java层面wait的方法
public final native void wait(long timeout) throws InterruptedException;
jvm中object.c
static JNINativeMethod methods[] = {
{"hashCode", "()I", (void *)&JVM_IHashCode},
{"wait", "(J)V", (void *)&JVM_MonitorWait},
{"notify", "()V", (void *)&JVM_MonitorNotify},
{"notifyAll", "()V", (void *)&JVM_MonitorNotifyAll},
{"clone", "()Ljava/lang/Object;", (void *)&JVM_Clone},
};
jvm层面wait方法的源码
JVM_ENTRY(void, JVM_MonitorWait(JNIEnv* env, jobject handle, jlong ms))
JVMWrapper("JVM_MonitorWait");
Handle obj(THREAD, JNIHandles::resolve_non_null(handle));
JavaThreadInObjectWaitState jtiows(thread, ms != 0);
if (JvmtiExport::should_post_monitor_wait()) {//默认为false
JvmtiExport::post_monitor_wait((JavaThread *)THREAD, (oop)obj(), ms);
}
ObjectSynchronizer::wait(obj, ms, CHECK);//主要方法是wait
JVM_END
ObjectSynchronizer::wait
// -----------------------------------------------------------------------------
// Wait/Notify/NotifyAll
// NOTE: must use heavy weight monitor to handle wait()
int ObjectSynchronizer::wait(Handle obj, jlong millis, TRAPS) {
if (UseBiasedLocking) {//使用偏向锁
BiasedLocking::revoke_and_rebias(obj, false, THREAD);//撤销偏向锁,因为第二个参数为false;该方法在synchronized的文章中已经做过介绍了
}
if (millis < 0) {//超时就抛出异常
THROW_MSG_0(vmSymbols::java_lang_IllegalArgumentException(), "timeout value is negative");
}
//wait方法需要先将锁膨胀到重量级锁
ObjectMonitor* monitor = ObjectSynchronizer::inflate(THREAD,
obj(),
inflate_cause_wait);
DTRACE_MONITOR_WAIT_PROBE(monitor, obj(), THREAD, millis);
monitor->wait(millis, true, THREAD);//插入等待队列中等待
// This dummy call is in place to get around dtrace bug 6254741. Once
// that's fixed we can uncomment the following line, remove the call
// and change this function back into a "void" func.
// DTRACE_MONITOR_PROBE(waited, monitor, obj(), THREAD);
return dtrace_waited_probe(monitor, obj, THREAD);
}
ObjectMonitor::wait
void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) {
Thread * const Self = THREAD;//获取当前线程对象
JavaThread *jt = (JavaThread *)THREAD;//获取当前线程对应的java线程对象
EventJavaMonitorWait event;
Self->_Stalled = intptr_t(this);//设置当前线程阻塞在该 ObjectMonitor上
jt->set_current_waiting_monitor(this);
ObjectWaiter node(Self);//生成节点
node.TState = ObjectWaiter::TS_WAIT;//修改状态
Self->_ParkEvent->reset();
OrderAccess::fence(); //全屏障保证后续可以读取到最新值
Thread::SpinAcquire(&_WaitSetLock, "WaitSet - add");//获取_WaitSetLock
AddWaiter(&node);//将等待的节点插入waitset中
Thread::SpinRelease(&_WaitSetLock);//释放_WaitSetLock
_Responsible = NULL;
intptr_t save = _recursions; // record the old recursion count
_waiters++; // increment the number of waiters
_recursions = 0; // set the recursion level to be 1
exit(true, Self); // exit the monitor 退出重量级锁
int ret = OS_OK;
int WasNotified = 0;
{ // State transition wrappers
OSThread* osthread = Self->osthread();
OSThreadWaitState osts(osthread, true);//将os线程置为OBJECT_WAIT的状态
{
ThreadBlockInVM tbivm(jt);//设置当前对应的java线程是阻塞状态;_thread_blocked = 10, // blocked in vm
// Thread is in thread_blocked state and oop access is unsafe.
jt->set_suspend_equivalent();
if (interruptible && (Thread::is_interrupted(THREAD, false) || HAS_PENDING_EXCEPTION)) {//中断或者异常唤醒不做处理
// Intentionally empty
} else if (node._notified == 0) {//如果唤醒的信号==0;去阻塞
if (millis <= 0) {//如果设置的等待时间<=0
Self->_ParkEvent->park();//阻塞等待唤醒
} else {
ret = Self->_ParkEvent->park(millis);//超时等待,到时间自动醒过来
}
}
// were we externally suspended while we were waiting?
if (ExitSuspendEquivalent (jt)) {
// TODO-FIXME: add -- if succ == Self then succ = null.
jt->java_suspend_self();
}
} // Exit thread safepoint: transition _thread_blocked -> _thread_in_vm
if (node.TState == ObjectWaiter::TS_WAIT) {//如果当前状态时TS_WAIT
Thread::SpinAcquire(&_WaitSetLock, "WaitSet - unlink");//获取_WaitSetLock锁
if (node.TState == ObjectWaiter::TS_WAIT) {
DequeueSpecificWaiter(&node); // 将当前node等待节点从waitset中摘除
node.TState = ObjectWaiter::TS_RUN;//修改状态为TS_RUN
}
Thread::SpinRelease(&_WaitSetLock);//释放_WaitSetLock锁
}
OrderAccess::loadload();//使用读屏障,刷新内存中的值
OrderAccess::fence();
Self->_Stalled = 0;//将阻塞对象置为空
ObjectWaiter::TStates v = node.TState;//获取节点状态
if (v == ObjectWaiter::TS_RUN) {//如果节点状态为TS_RUN,去获取锁
enter(Self);
} else {
ReenterI(Self, &node);//尝试去获取锁
node.wait_reenter_end(this);//将线程设置为可运行状态
}
} // OSThreadWaitState()
jt->set_current_waiting_monitor(NULL);
_recursions = save; // restore the old recursion count 值还原
_waiters--; // decrement the number of waiters 等待节点减1
if (!WasNotified) {//如果不是被唤醒的
// no, it could be timeout or Thread.interrupt() or both
// check for interrupt event, otherwise it is timeout
if (interruptible && Thread::is_interrupted(Self, true) && !HAS_PENDING_EXCEPTION) {//中断产生的直接抛出异常
THROW(vmSymbols::java_lang_InterruptedException());
}
}
}
JVM_MonitorNotify和JVM_MonitorNotifyAll
JVM_ENTRY(void, JVM_MonitorNotify(JNIEnv* env, jobject handle))
JVMWrapper("JVM_MonitorNotify");
Handle obj(THREAD, JNIHandles::resolve_non_null(handle));
ObjectSynchronizer::notify(obj, CHECK);//关键方法是notify
JVM_END
JVM_ENTRY(void, JVM_MonitorNotifyAll(JNIEnv* env, jobject handle))
JVMWrapper("JVM_MonitorNotifyAll");
Handle obj(THREAD, JNIHandles::resolve_non_null(handle));
ObjectSynchronizer::notifyall(obj, CHECK);//关键方法是notifyAll
JVM_END
ObjectSynchronizer::notify和ObjectSynchronizer::notifyall
void ObjectSynchronizer::notify(Handle obj, TRAPS) {
if (UseBiasedLocking) {//偏向锁直接撤销
BiasedLocking::revoke_and_rebias(obj, false, THREAD);
}
markOop mark = obj->mark();//轻量级锁直接进行替换头部
if (mark->has_locker() && THREAD->is_lock_owned((address)mark->locker())) {
return;
}
//锁膨胀直接调用notify方法
ObjectSynchronizer::inflate(THREAD,
obj(),
inflate_cause_notify)->notify(THREAD);
}
// NOTE: see comment of notify()
void ObjectSynchronizer::notifyall(Handle obj, TRAPS) {
if (UseBiasedLocking) {//偏向锁直接撤销
BiasedLocking::revoke_and_rebias(obj, false, THREAD);
}
markOop mark = obj->mark();//轻量级锁直接进行替换头部
if (mark->has_locker() && THREAD->is_lock_owned((address)mark->locker())) {
return;
}
//锁膨胀直接调用notifyAll方法
ObjectSynchronizer::inflate(THREAD,
obj(),
inflate_cause_notify)->notifyAll(THREAD);
}
ObjectMonitor::notify和 ObjectMonitor::notifyAll
void ObjectMonitor::notify(TRAPS) {
if (_WaitSet == NULL) {//等待集合为空直接返回
return;
}
DTRACE_MONITOR_PROBE(notify, this, object(), THREAD);
INotify(THREAD);//唤醒当前线程
OM_PERFDATA_OP(Notifications, inc(1));
}
void ObjectMonitor::notifyAll(TRAPS) {
if (_WaitSet == NULL) {//等待集合为空直接返回
return;
}
DTRACE_MONITOR_PROBE(notifyAll, this, object(), THREAD);
int tally = 0;
while (_WaitSet != NULL) {//while循环挨个唤醒
tally++;
INotify(THREAD);
}
OM_PERFDATA_OP(Notifications, inc(tally));
}
ObjectMonitor::INotify
void ObjectMonitor::INotify(Thread * Self) {
Thread::SpinAcquire(&_WaitSetLock, "WaitSet - notify");//获取_WaitSetLock锁
ObjectWaiter * iterator = DequeueWaiter();//第一个节点出队
if (iterator != NULL) {
iterator->TState = ObjectWaiter::TS_ENTER;//修改状态为TS_ENTER
iterator->_notified = 1;//标识唤醒线程数为1
iterator->_notifier_tid = JFR_THREAD_ID(Self);//记录唤醒的线程di
ObjectWaiter * list = _EntryList;//获取_EntryList执行队列
// prepend to cxq 执行队列为空,将竞争队列中的任务批量移动到_EntryList中
if (list == NULL) {
iterator->_next = iterator->_prev = NULL;
_EntryList = iterator;
} else {
iterator->TState = ObjectWaiter::TS_CXQ;//否则修改节点状态为TS_CXQ
for (;;) {//保证出队的节点插入到等待队列中
ObjectWaiter * front = _cxq;
iterator->_next = front;
if (Atomic::cmpxchg(iterator, &_cxq, front) == front) {
break;
}
}
}
iterator->wait_reenter_begin(this);
}
Thread::SpinRelease(&_WaitSetLock);//释放_WaitSetLock锁
}
4.留言
这些只是个人整理的知识点,中间可能有不到位的地方,烦请各位大佬指出