Monitor的底层实现细节

Monitor即对象头中的Monitor,每次创建对象时,JVM都会给对象分配一个对象头

JDK源码中,Monitor的实现类为ObjectMonitor

首先来看一下这个类的构造函数

ObjectMonitor() {
    _header       = NULL;
    _count        = 0;
    // 当前正在等待该monitor的线程数量
    _waiters      = 0,
    // 该monitor被重入次数,初始时为0
    _recursions   = 0;
    _object       = NULL;
    // 该monitor的拥有者,当一个线程持有该monitor时
    // monitor会将该线程标记为owner
    _owner        = NULL;
    // 等待集合,所有调用了当前monitor的wait()方法的线程都存放在这里
    _WaitSet      = NULL;
    // 简单自旋锁,用于保护等待队列
    _WaitSetLock  = 0 ;
    _Responsible  = NULL ;
    _succ         = NULL ;
    // 最近刚刚阻塞在monitor入口的线程
    _cxq          = NULL ;
    FreeNext      = NULL ;
    // 阻塞在monitor入口的线程集合,即想要持有该monitor的线程集合
    _EntryList    = NULL ;
    // 自旋次数
    _SpinFreq     = 0 ;
    // 自旋时间
    _SpinClock    = 0 ;
    // 当前持有该monitor的是否是线程
    OwnerIsThread = 0 ;
    // 上一个该monitor的持有者线程ID
    _previous_owner_tid = 0;
}

其中许多成员变量都是ObjectWaiter引用类型,来看一下这个类

// 它是一个双向链表,每个节点中都存放了线程对应的信息
class ObjectWaiter : public StackObj {
 public:
  enum TStates { TS_UNDEF, TS_READY, TS_RUN, TS_WAIT, TS_ENTER, TS_CXQ } ;
  enum Sorted  { PREPEND, APPEND, SORTED } ;
  ObjectWaiter * volatile _next;
  ObjectWaiter * volatile _prev;
  Thread*       _thread;
  jlong         _notifier_tid;
  ParkEvent *   _event;
  volatile int  _notified ;
  volatile TStates TState ;
  Sorted        _Sorted ;           // List placement disposition
  bool          _active ;           // Contention monitoring is enabled
 public:
  ObjectWaiter(Thread* thread);

  void wait_reenter_begin(ObjectMonitor *mon);
  void wait_reenter_end(ObjectMonitor *mon);
};

wait方法底层实现逻辑

对于wait方法,主要关注几个点,

  • 线程如何进入等待集合
  • 线程如何释放monitor

首先来看一下,wait方法的签名

void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS);
  • jlong millis:long long 类型的毫秒数,表示线程进入等待状态的时间
  • bool interruptible:线程是否可以被中断
  • TRAPS:宏,Thread* THREAD,本质上是一个线程指针

线程如何进入等待集合

// create a node to be put into the queue
ObjectWaiter node(Self);

这里创建了一个ObjectWaiter类型的node,它就是即将要进入等待集合的线程所在的节点

Thread::SpinAcquire (&_WaitSetLock, "WaitSet - add") ;
// 该队列是一个双向的循环链表
AddWaiter (&node) ;
Thread::SpinRelease (&_WaitSetLock) ;

_WaitSetLock用于保护等待集合,通常情况下,等待集合只允许持有该monitor的线程访问,除了在park()方法因超时中断而返回的情况下,因为争抢monitor的情况会频繁发生,所以这里使用了轻量级的自旋锁来代替重量级的阻塞锁,减少了线程间切换的额外消耗

inline void ObjectMonitor::AddWaiter(ObjectWaiter* node) {
  assert(node != NULL, "should not dequeue NULL node");
  assert(node->_prev == NULL, "node already in list");
  assert(node->_next == NULL, "node already in list");
  // put node at end of queue (circular doubly linked list)
  if (_WaitSet == NULL) {
    _WaitSet = node;
    node->_prev = node;
    node->_next = node;
  } else {
    ObjectWaiter* head = _WaitSet ;
    ObjectWaiter* tail = head->_prev;
    assert(tail->_next == head, "invariant check");
    tail->_next = node;
    head->_prev = node;
    node->_next = head;
    node->_prev = tail;
  }
}

这里是AddWaiter方法的实现,就是调用这个方法来将调用对象wait方法的线程加入到等待集合的

  • 首先3个assert,判断当前要加入等待集合的节点必须不为空,因为空节点的加入是没有任何意义的,并且他的上一个节点必须为空,并且它的下一个节点必须为空,因为如果它前后节点都不为空的话,表示这个节点已经存在在一个集合中了
  • 接着判断_WaitSet是否为空,如果为空,表示当前节点应该是等待集合的第一个节点,那么它自成一个等待集合
  • 如果不为空,将该节点加入到集合中,使用双向链表添加节点的操作即可

线程如何释放monitor

// exit the monitor
exit (true, Self) ;

这里的exit方法就是用于释放monitor,该方法执行完成后,其他线程就能争抢这个monitor了

if (THREAD != _owner) {
 if (THREAD->is_lock_owned((address) _owner)) {
   // Transmute _owner from a BasicLock pointer to a Thread address.
   // We don't need to hold _mutex for this transition.
   // Non-null to Non-null is safe as long as all readers can
   // tolerate either flavor.
   assert (_recursions == 0, "invariant") ;
   _owner = THREAD ;
   _recursions = 0 ;
   OwnerIsThread = 1 ;
 } else {
   // NOTE: we need to handle unbalanced monitor enter/exit
   // in native code by throwing an exception.
   // TODO: Throw an IllegalMonitorStateException ?
   TEVENT (Exit - Throw IMSX) ;
   assert(false, "Non-balanced monitor enter/exit!");
   if (false) {
      THROW(vmSymbols::java_lang_IllegalMonitorStateException());
   }
   return;
 }
}

if (_recursions != 0) {
 _recursions--;        // this is simple recursive enter
 TEVENT (Inflated exit - recursive) ;
 return ;
}
  • 该方法首先会判断当前线程是否是该monitor的owner
  • 如果不是,继续判断当前owner转换为地址并判断是否是BasicLock
    • 如果是的话将OwnerIsThread标记为1,然后执行接下来的退出操作
    • 如果不是,则抛出IllegalMonitorStateException并返回
  • 如果是的话,则继续根据_recursions来判断当前monitor是否被重入了,如果被重入了,_recursions--,否则执行退出monitor逻辑

notify方法底层实现逻辑

对于notify方法,主要关注几个点,

  • 如何选择节点(线程)来退出wait状态
  • 节点(线程)移除后,对其进行什么操作

如何选择节点(线程)来退出wait状态

Thread::SpinAcquire (&_WaitSetLock, "WaitSet - notify") ;
ObjectWaiter * iterator = DequeueWaiter() ;

与wait方法调用AddWaiter一样,在执行方法前会进行自旋操作,接着通过DequeueWaiter方法将线程从双向链表中移除

inline ObjectWaiter* ObjectMonitor::DequeueWaiter() {
  // dequeue the very first waiter
  ObjectWaiter* waiter = _WaitSet;
  if (waiter) {
    DequeueSpecificWaiter(waiter);
  }
  return waiter;
}

inline void ObjectMonitor::DequeueSpecificWaiter(ObjectWaiter* node) {
  assert(node != NULL, "should not dequeue NULL node");
  assert(node->_prev != NULL, "node already removed from list");
  assert(node->_next != NULL, "node already removed from list");
  // when the waiter has woken up because of interrupt,
  // timeout or other spurious wake-up, dequeue the
  // waiter from waiting list
  ObjectWaiter* next = node->_next;
  if (next == node) {
    assert(node->_prev == node, "invariant check");
    _WaitSet = NULL;
  } else {
    ObjectWaiter* prev = node->_prev;
    assert(prev->_next == node, "invariant check");
    assert(next->_prev == node, "invariant check");
    next->_prev = prev;
    prev->_next = next;
    if (_WaitSet == node) {
      _WaitSet = next;
    }
  }
  node->_next = NULL;
  node->_prev = NULL;
}
  • 首先调用了DequeueWaiter方法,判断_WaitSet是否是一个空的集合,如果是空集合,直接返回自己,否则调用DequeueSpecificWaiter方法找出第一个等待线程
  • DequeueSpecificWaiter方法中,首先进行assert,保证节点是正常节点,即不为空节点,前后指针也不为空,保证该节点还在等待集合中
  • 接着获取该节点的next指针,判断next是否是自己,如果是自己,那么再保证prev指针也是自己,表示当前_WaitSet只有一个节点,然后将其移出集合并将_WaitSet置为空
  • 如果next节点不是自己,那么将前后节点互相指向,再把自己移出集合

这里寻找被唤醒线程的方法就是找到_WaitSet中的第一个节点

节点移除后,对其进行什么操作

如果移除的节点不是空的,那么设置当前线程为该节点的唤醒线程,即将当前线程的ID设置到ObjectWaiter->_notifier_tid

然后按照一定的策略对其进行下面的操作

int Policy = Knob_MoveNotifyee ; // Knob_MoveNotifyee = 2

策略初始值为2,对应的操作为,将线程加入到_cxq的最前面

  • 将其追加到阻塞队列_EntryList的最前面或者最后面
  • 或者追加到_cxq的最前面
  • 或者追加到_cxq的最后面
  • 或者让线程直接运行

notifyAll方法底层实现逻辑

与notify功能一致,不同的是notifyAll用for循环对_WaitSet中的所有线程都执行了notify操作