static final class Node{ static final NodeSHARED = new Node(); static final NodeEXCLUSIVE = null; static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; static final int PROPAGATE = -3; volatile int waitStatus; volatile Nodeprev; volatile Nodenext; volatile Thread thread; NodenextWaiter; ... }
publicfinalvoidacquire(int arg){ if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
上述代码主要完成了同步状态获取、节点构造、加入同步队列以及在同步队列中自旋等待的相关工作。首先调用自定义同步器实现的tryAcquire(arg),该方法保证线程安全地获取同步状态;如果获取同步状态失败,则构造同步节点并通过addWaiter(Node node)方法将其加入到同步队列的尾部,最后调用acquireQueued(Node node, int arg)方法,使该节点以“死循环”的方式获取同步状态。如果获取不到,则阻塞节点中的线程,而被阻塞线程的唤醒主要依靠前驱节点的出队或该线程被中断来实现。
1、节点构造和加入同步队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14
private NodeaddWaiter(Nodemode) { Nodenode = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Nodepred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; returnnode; } } enq(node); returnnode; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
private Nodeenq(final Nodenode) { for (;;) { Nodet = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
private static boolean shouldParkAfterFailedAcquire(Nodepred, Nodenode) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This nodehas already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
/**
 * Wakes up the successor of the given node, if one exists.
 *
 * @param node the node whose successor should be unparked
 */
private void unparkSuccessor(Node node) {
    // A negative status may mean a signal is needed; try to reset it to 0
    // before signalling. It is fine if this CAS fails or if the waiting
    // thread changes the status concurrently.
    int status = node.waitStatus;
    if (status < 0) {
        compareAndSetWaitStatus(node, status, 0);
    }

    // The thread to wake is normally held by the next node. When that node
    // is missing or cancelled (status > 0), walk backwards from the tail
    // and keep the non-cancelled node closest to this one.
    Node successor = node.next;
    if (successor == null || successor.waitStatus > 0) {
        successor = null;
        Node cursor = tail;
        while (cursor != null && cursor != node) {
            if (cursor.waitStatus <= 0) {
                successor = cursor;
            }
            cursor = cursor.prev;
        }
    }

    if (successor != null) {
        LockSupport.unpark(successor.thread);
    }
}