四、同步器的实现(四)
4.Semaphore
Semaphore是一个信号计数量,用于维护一个资源的使用许可量,如果维护的资源使用许可超出会阻塞每一个请求该资源的许可(acquire),直到又有可用的。Semaphore通常用于限制访问资源的线程数目,下面是一个使用示例:
/*** Semaphore使用示例,Semaphore代表厕所坑位* Created by bzhang on 2019/3/21.*/public class TestSemaphore { private Semaphore lock = new Semaphore(3); //厕所坑位 public void goToToilet(){ try { lock.acquire(); //尝试获取一个位置 System.out.println(Thread.currentThread().getName()+"正在使用厕所"); TimeUnit.SECONDS.sleep(2); System.out.println(Thread.currentThread().getName()+"上完厕所,很开心"); } catch (InterruptedException e) { e.printStackTrace(); }finally { lock.release(); //释放一个位置 } } public static void main(String[] args) { TestSemaphore test = new TestSemaphore(); ExecutorService pool = Executors.newCachedThreadPool(); //缓存线程池 for (int i = 0;i < 11;i++){ pool.submit(new Runnable() { @Override public void run() { test.goToToilet(); } }); } pool.shutdown(); //关闭线程池 }}
Semaphore的使用与ReentrantLock和synchronized的使用很相似,只是锁的使用一般都是独占的,即一次只允许一个线程运行。而Semaphore则是许可多个线程进行访问,当Semaphore只许可一个线程访问时,也就退化成锁。
Semaphore的底层也是由AQS同步器实现的:
public class Semaphore implements java.io.Serializable { //AQS同步的队列引用 private final Sync sync; //由构造方法可以看出Semaphore也分为公平模式和非公平模式 public Semaphore(int permits) { sync = new NonfairSync(permits); } //根据fair创建公平同步队列或是非公平同步队列 public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }}
Semaphore的继承关系如下图,是由三个内部类Sync,NonfairSync与FairSync共同来实现Semaphore的信号计数功能。
下面先来看公平与非公平模式的实现源码:
//非公平模式的实现static final class NonfairSync extends Sync { NonfairSync(int permits) { super(permits); } //重写的实AQS中的tryAcquireShared方法,说明使用的是共享模式 //尝试获取锁 protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); //调用父类的非公平模式下尝试获取锁方法 }}//公平模式的实现static final class FairSync extends Sync { FairSync(int permits) { super(permits); } //尝试获取锁 protected int tryAcquireShared(int acquires) { for (;;) { //自旋 //同步队列中是否存在排在当前线程前面的线程结点 //也就是说当前线程是不是等待最久的线程,不是的话,直接不让获取锁 if (hasQueuedPredecessors()) return -1; int available = getState(); //获取同步状态 int remaining = available - acquires; //将要更新的同步状态值 //remaining小于0,说明没有可用的资源了,返回获取失败 //remaining大于等于0且CAS方式更新状态失败,则表示有可用资源,继续循环尝试更新直到成功 if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }}//公平锁与非公平锁父类abstract static class Sync extends AbstractQueuedSynchronizer { Sync(int permits) { setState(permits); } //获取剩余许可资源数 final int getPermits() { return getState(); } //非公平尝试获取锁,与公平模式基本相似,只是不需要判断当前线程是不是等待最久的线程 final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } //释放锁资源 protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) //超出最大许可值,抛异常 throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } } //减少许可数,即假设当前许可数是5,调用该方法会时许可数变为5-reductions final void reducePermits(int reductions) { for (;;) { int current = getState(); int next = current - reductions; if (next > current) // underflow throw new Error("Permit count underflow"); if (compareAndSetState(current, next)) return; } } //获取可立即使用的许可数 final int drainPermits() { for (;;) { int current = getState(); if (current == 0 || compareAndSetState(current, 0)) return current; } }}
了解了底层实现,再看看Semaphore获取许可和释放许可的过程:
//获取一个许可,在提供一个许可前一直将线程阻塞,或线程被中断public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1);}//AQS中的方法,可被中断的获取锁的方法public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); //tryAcquireShared调用的是公平锁FairSync或NonfairSync中的重写方法 //tryAcquireShared可能失败,进入doAcquireSharedInterruptibly方法进行自旋或加入同步队列 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg);}//尝试获取一个许可,在有可用的许可前将其阻塞,且不可被中断public void acquireUninterruptibly() { sync.acquireShared(1);}//AQS中的方法public final void acquireShared(int arg) { //尝试获取许可失败的话,会进行自旋或加入到同步队列中等待获取锁 if (tryAcquireShared(arg) < 0) doAcquireShared(arg); //AQS中已分析过,不再深入}//尝试获取一个许可public boolean tryAcquire() { return sync.nonfairTryAcquireShared(1) >= 0; //只进行一次方式,失败后直接返回+}//在一定时间内尝试获取许可public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));}//AQS中的在一定时间内尝试获取锁的方法public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); //本质上还是先调用tryAcquireShared方法,不成功再在一定时间去自旋或加入同步队列等待获取许可 return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout);}//获取多个许可,在提供permits许可前一直将线程阻塞,或线程被中断public void acquire(int permits) throws InterruptedException { if (permits < 0) throw new IllegalArgumentException(); sync.acquireSharedInterruptibly(permits);}//获取给定数目的许可,在提供这些许可前一直将线程阻塞,不可被中断public void acquireUninterruptibly(int permits) { if (permits < 0) throw new IllegalArgumentException(); sync.acquireShared(permits);}//尝试获取给定数目的许可,值尝试一次,失败直接返回public boolean tryAcquire(int permits) { if (permits < 0) throw new IllegalArgumentException(); return sync.nonfairTryAcquireShared(permits) >= 0;}//在给定的时间内尝试获取给定数目的许可数,超时返回失败public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException { if (permits < 0) throw new IllegalArgumentException(); return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));}//释放许可的过程//释放一个许可public void release() { sync.releaseShared(1);}//AQS中的方法public final boolean releaseShared(int arg) { //尝试释放一个许可,不成功则进入doReleaseShared继续尝试释放许可 if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false;}//释放给定数目的许可public void release(int permits) { if (permits < 0) throw new IllegalArgumentException(); sync.releaseShared(permits);}