Monitor的底层实现细节
Monitor即对象头中的Monitor,每次创建对象时,JVM都会给对象分配一个对象头
JDK源码中,Monitor的实现类为ObjectMonitor
首先来看一下这个类的构造函数
ObjectMonitor() {
_header = NULL;
_count = 0;
// 当前正在等待该monitor的线程数量
_waiters = 0,
// 该monitor被重入次数,初始时为0
_recursions = 0;
_object = NULL;
// 该monitor的拥有者,当一个线程持有该monitor时
// monitor会将该线程标记为owner
_owner = NULL;
// 等待集合,所有调用了当前monitor的wait()方法的线程都存放在这里
_WaitSet = NULL;
// 简单自旋锁,用于保护等待队列
_WaitSetLock = 0 ;
_Responsible = NULL ;
_succ = NULL ;
// 最近刚刚阻塞在monitor入口的线程
_cxq = NULL ;
FreeNext = NULL ;
// 阻塞在monitor入口的线程集合,即想要持有该monitor的线程集合
_EntryList = NULL ;
// 自旋次数
_SpinFreq = 0 ;
// 自旋时间
_SpinClock = 0 ;
// 当前持有该monitor的是否是线程
OwnerIsThread = 0 ;
// 上一个该monitor的持有者线程ID
_previous_owner_tid = 0;
}
其中许多成员变量都是ObjectWaiter引用类型,来看一下这个类
// 它是一个双向链表,每个节点中都存放了线程对应的信息
class ObjectWaiter : public StackObj {
public:
enum TStates { TS_UNDEF, TS_READY, TS_RUN, TS_WAIT, TS_ENTER, TS_CXQ } ;
enum Sorted { PREPEND, APPEND, SORTED } ;
ObjectWaiter * volatile _next;
ObjectWaiter * volatile _prev;
Thread* _thread;
jlong _notifier_tid;
ParkEvent * _event;
volatile int _notified ;
volatile TStates TState ;
Sorted _Sorted ; // List placement disposition
bool _active ; // Contention monitoring is enabled
public:
ObjectWaiter(Thread* thread);
void wait_reenter_begin(ObjectMonitor *mon);
void wait_reenter_end(ObjectMonitor *mon);
};
wait方法底层实现逻辑
对于wait方法,主要关注几个点,
- 线程如何进入等待集合
- 线程如何释放monitor
首先来看一下,wait方法的签名
void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS);
- jlong millis:long long 类型的毫秒数,表示线程进入等待状态的时间
- bool interruptible:线程是否可以被中断
- TRAPS:宏,Thread* THREAD,本质上是一个线程指针
线程如何进入等待集合
// create a node to be put into the queue
ObjectWaiter node(Self);
这里创建了一个ObjectWaiter类型的node,它就是即将要进入等待集合的线程所在的节点
Thread::SpinAcquire (&_WaitSetLock, "WaitSet - add") ;
// 该队列是一个双向的循环链表
AddWaiter (&node) ;
Thread::SpinRelease (&_WaitSetLock) ;
_WaitSetLock用于保护等待集合,通常情况下,等待集合只允许持有该monitor的线程访问,除了在park()方法因超时中断而返回的情况下,因为争抢monitor的情况会频繁发生,所以这里使用了轻量级的自旋锁来代替重量级的阻塞锁,减少了线程间切换的额外消耗
inline void ObjectMonitor::AddWaiter(ObjectWaiter* node) {
assert(node != NULL, "should not dequeue NULL node");
assert(node->_prev == NULL, "node already in list");
assert(node->_next == NULL, "node already in list");
// put node at end of queue (circular doubly linked list)
if (_WaitSet == NULL) {
_WaitSet = node;
node->_prev = node;
node->_next = node;
} else {
ObjectWaiter* head = _WaitSet ;
ObjectWaiter* tail = head->_prev;
assert(tail->_next == head, "invariant check");
tail->_next = node;
head->_prev = node;
node->_next = head;
node->_prev = tail;
}
}
这里是AddWaiter方法的实现,就是调用这个方法来将调用对象wait方法的线程加入到等待集合的
- 首先3个assert,判断当前要加入等待集合的节点必须不为空,因为空节点的加入是没有任何意义的,并且他的上一个节点必须为空,并且它的下一个节点必须为空,因为如果它前后节点都不为空的话,表示这个节点已经存在在一个集合中了
- 接着判断_WaitSet是否为空,如果为空,表示当前节点应该是等待集合的第一个节点,那么它自成一个等待集合
- 如果不为空,将该节点加入到集合中,使用双向链表添加节点的操作即可
线程如何释放monitor
// exit the monitor
exit (true, Self) ;
这里的exit方法就是用于释放monitor,该方法执行完成后,其他线程就能争抢这个monitor了
if (THREAD != _owner) {
if (THREAD->is_lock_owned((address) _owner)) {
// Transmute _owner from a BasicLock pointer to a Thread address.
// We don't need to hold _mutex for this transition.
// Non-null to Non-null is safe as long as all readers can
// tolerate either flavor.
assert (_recursions == 0, "invariant") ;
_owner = THREAD ;
_recursions = 0 ;
OwnerIsThread = 1 ;
} else {
// NOTE: we need to handle unbalanced monitor enter/exit
// in native code by throwing an exception.
// TODO: Throw an IllegalMonitorStateException ?
TEVENT (Exit - Throw IMSX) ;
assert(false, "Non-balanced monitor enter/exit!");
if (false) {
THROW(vmSymbols::java_lang_IllegalMonitorStateException());
}
return;
}
}
if (_recursions != 0) {
_recursions--; // this is simple recursive enter
TEVENT (Inflated exit - recursive) ;
return ;
}
- 该方法首先会判断当前线程是否是该monitor的owner
- 如果不是,继续判断当前owner转换为地址并判断是否是BasicLock
- 如果是的话将
OwnerIsThread
标记为1,然后执行接下来的退出操作 - 如果不是,则抛出
IllegalMonitorStateException
并返回
- 如果是的话将
- 如果是的话,则继续根据_recursions来判断当前monitor是否被重入了,如果被重入了,_recursions--,否则执行退出monitor逻辑
notify方法底层实现逻辑
对于notify方法,主要关注几个点,
- 如何选择节点(线程)来退出wait状态
- 节点(线程)移除后,对其进行什么操作
如何选择节点(线程)来退出wait状态
Thread::SpinAcquire (&_WaitSetLock, "WaitSet - notify") ;
ObjectWaiter * iterator = DequeueWaiter() ;
与wait方法调用AddWaiter一样,在执行方法前会进行自旋操作,接着通过DequeueWaiter
方法将线程从双向链表中移除
inline ObjectWaiter* ObjectMonitor::DequeueWaiter() {
// dequeue the very first waiter
ObjectWaiter* waiter = _WaitSet;
if (waiter) {
DequeueSpecificWaiter(waiter);
}
return waiter;
}
inline void ObjectMonitor::DequeueSpecificWaiter(ObjectWaiter* node) {
assert(node != NULL, "should not dequeue NULL node");
assert(node->_prev != NULL, "node already removed from list");
assert(node->_next != NULL, "node already removed from list");
// when the waiter has woken up because of interrupt,
// timeout or other spurious wake-up, dequeue the
// waiter from waiting list
ObjectWaiter* next = node->_next;
if (next == node) {
assert(node->_prev == node, "invariant check");
_WaitSet = NULL;
} else {
ObjectWaiter* prev = node->_prev;
assert(prev->_next == node, "invariant check");
assert(next->_prev == node, "invariant check");
next->_prev = prev;
prev->_next = next;
if (_WaitSet == node) {
_WaitSet = next;
}
}
node->_next = NULL;
node->_prev = NULL;
}
- 首先调用了
DequeueWaiter
方法,判断_WaitSet是否是一个空的集合,如果是空集合,直接返回自己,否则调用DequeueSpecificWaiter
方法找出第一个等待线程 - 在
DequeueSpecificWaiter
方法中,首先进行assert,保证节点是正常节点,即不为空节点,前后指针也不为空,保证该节点还在等待集合中 - 接着获取该节点的next指针,判断next是否是自己,如果是自己,那么再保证prev指针也是自己,表示当前_WaitSet只有一个节点,然后将其移出集合并将_WaitSet置为空
- 如果next节点不是自己,那么将前后节点互相指向,再把自己移出集合
这里寻找被唤醒线程的方法就是找到_WaitSet中的第一个节点
节点移除后,对其进行什么操作
如果移除的节点不是空的,那么设置当前线程为该节点的唤醒线程,即将当前线程的ID设置到ObjectWaiter->_notifier_tid
上
然后按照一定的策略对其进行下面的操作
int Policy = Knob_MoveNotifyee ; // Knob_MoveNotifyee = 2
策略初始值为2,对应的操作为,将线程加入到_cxq的最前面
- 将其追加到阻塞队列
_EntryList
的最前面或者最后面 - 或者追加到
_cxq
的最前面 - 或者追加到
_cxq
的最后面 - 或者让线程直接运行
notifyAll方法底层实现逻辑
与notify功能一致,不同的是notifyAll用for循环对_WaitSet中的所有线程都执行了notify操作