Semaphore源码解读
注意,阅读本文需要了解AQS,AQS采用了模板设计模式。后续本人会完善这篇文章
Semaphore的方法
- acquire()
阻塞获得一个许可,会阻塞,直到得到一个可用许可或被中断
重载版本 acquire(n) :尝试获取n个许可 - acquireUninterruptibly()
类acquire,但不可中断 - tryAcquire()
非阻塞版本,只尝试一次,可被中断
重载版本tryAcquire(n),tryAcquire(timeout,TimeUnit) 限时尝试,没抢到许可会等待直到得到许可或被中断 。tryAcquire(n,timeout,TimeUnit) - release()
释放一个许可。重载:release(n) - drainPermits()
当前线程剩余的许可 - hasQueuedThreads()
判断是否队列中有等待许可的线程 - getQueuedLength()
等待队列的长度
Semaphore总体框架
和ReentrantLock内部类似,Semaphore内部定义了一个抽象同步器Sync抽象类,它继承了AbstractQueuedSynchronizer。Sync有两个子类:NofairSync和FairSync,分别实现了非公平和公平两种版本的实现。Semaphore的各种acquire方法都是委托给Sync对象调用的。
关于permit
Sync的构造方法。permit的本质是AQS的state
Sync(int permits) {// AQS的setState方法,setState(permits);}
Semaphore的 acquire方法
// Semaphore中
public void acquire() throws InterruptedException {sync.acquireSharedInterruptibly(1);}
AQS的模板方法acquireSharedInterruptibly,调用了tr·yAcquireShared钩子方法
//AQS
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);}
FairSync
FairSync重写了AQS的tryAcquireShared方法
protected int tryAcquireShared(int acquires) {for (;;) {// 若有前驱,则申请失败。保证申请者为第一个节点if (hasQueuedPredecessors())return -1;// 获取当前剩余许可量int available = getState();int remaining = available - acquires;// 若剩余许可<0,或cas成功,返回剩余量。// 为什么剩余量<0也要照样返回呢?if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}}
再看AQS的方法
//AQS
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);}
第二个if块有两种情况:
- 钩子方法tryAcquireShared返回负数
即remaining<0,申请量大于许可量,则进入方法doAcquireSharedInterruptibly(arg)。由于钩子方法中使用短路或,remaining<0则不会cas交换 - remaining>0,cas成功,得到许可,则直接跳出方法,线程则继续下面的代码。在tryAcquireShared钩子方法中,进行过一次cas,若cas成功,返回remain,remain必然>=0,如果cas失败,则重新自旋,重复流程。
AQS的doAcquireSharedInterruptibly方法
private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {//进入等待队列final Node node = addWaiter(Node.SHARED);boolean failed = true;try {// 死循环,for (;;) {//获得前驱节点final Node p = node.predecessor();// 若前驱为head,则进入申请方法// 也就是说,非队首节点不能进入该申请方法if (p == head) {// 调用钩子方法int r = tryAcquireShared(arg);//r>0则成功得到许可,跳出方法if (r >= 0) {// 该方法中,将当前节点设为head,并改变状态setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return;}}//进行挂起判断if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}}
private void setHeadAndPropagate(Node node, int propagate) {Node h = head; // Record old head for check belowsetHead(node);if (propagate > 0 || h == null || h.waitStatus < 0 ||(h = head) == null || h.waitStatus < 0) {Node s = node.next;if (s == null || s.isShared())doReleaseShared();}}
总结申请方法
Semaphore的 acquire调用了AQS的模板方法acquireSharedInterruptibly,acquireSharedInterruptibly进一步调用了钩子方法tryAcquireShared进行申请许可,在钩子方法中的死循环中,钩子方法退出循环的方式有两种:(1)remaining<0;(2)remaining<=0且cas成功。若计算出剩余许可remaining<0,则直接返回remaining,若remaining>=0,则进行CAS,若成功,返回r,该r也必然>=0。
在模板方法acquireSharedInterruptibly中,只有所需许可>可用许可时,才会进入方法doAcquireSharedInterruptibly,该方法封装线程节点入队,并且只有队首节点(前驱为head)才能够进行tryAcquireShared钩子调用,申请成功则它成为head,并且unpark后驱。
Acquire时的入队条件
- 模板的if中,调用tryAcquireShared钩子方法,返回负数(申请量大于可用量)。在FairSync中,队列中有其他节点时,会直接返回-1以进入队列
- 只要在钩子方法中,remaining>0,则会重复循环,直到成功或者r<0进入队列。
- NofairSync与FairSync相比,只是钩子方法去掉了队列中是否有结点的判断