博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
高并发学习笔记(八)
阅读量:6817 次
发布时间:2019-06-26

本文共 7096 字,大约阅读时间需要 23 分钟。

hot3.png

四、同步器的实现(四)

4.Semaphore

    Semaphore是一个信号计数量,用于维护一个资源的使用许可量,如果维护的资源使用许可超出会阻塞每一个请求该资源的许可(acquire),直到又有可用的。Semaphore通常用于限制访问资源的线程数目,下面是一个使用示例:

/*** Semaphore使用示例,Semaphore代表厕所坑位* Created by bzhang on 2019/3/21.*/public class TestSemaphore {      private Semaphore lock = new Semaphore(3);      //厕所坑位      public void goToToilet(){            try {                  lock.acquire();   //尝试获取一个位置                  System.out.println(Thread.currentThread().getName()+"正在使用厕所");                  TimeUnit.SECONDS.sleep(2);                  System.out.println(Thread.currentThread().getName()+"上完厕所,很开心");            } catch (InterruptedException e) {                  e.printStackTrace();            }finally {                  lock.release();   //释放一个位置            }      }      public static void main(String[] args) {            TestSemaphore test = new TestSemaphore();            ExecutorService pool = Executors.newCachedThreadPool();     //缓存线程池            for (int i = 0;i < 11;i++){                  pool.submit(new Runnable() {                        @Override                        public void run() {                              test.goToToilet();                        }                  });            }            pool.shutdown();    //关闭线程池      }}

    Semaphore的使用与ReentrantLock和synchronized的使用很相似,只是锁的使用一般都是独占的,即一次只允许一个线程运行。而Semaphore则是许可多个线程进行访问,当Semaphore只许可一个线程访问时,也就退化成锁。

    Semaphore的底层也是由AQS同步器实现的:

public class Semaphore implements java.io.Serializable {        //AQS同步的队列引用    private final Sync sync;         //由构造方法可以看出Semaphore也分为公平模式和非公平模式    public Semaphore(int permits) {        sync = new NonfairSync(permits);    }    //根据fair创建公平同步队列或是非公平同步队列    public Semaphore(int permits, boolean fair) {        sync = fair ? new FairSync(permits) : new NonfairSync(permits);    }}

 

    Semaphore的继承关系如下图,是由三个内部类Sync,NonfairSync与FairSync共同来实现Semaphore的信号计数功能。

3566422e4859127142520d23fc02daf6ab5.jpg

    下面先来看公平与非公平模式的实现源码:

//非公平模式的实现static final class NonfairSync extends Sync {    NonfairSync(int permits) {        super(permits);    }    //重写的实AQS中的tryAcquireShared方法,说明使用的是共享模式    //尝试获取锁    protected int tryAcquireShared(int acquires) {        return nonfairTryAcquireShared(acquires);    //调用父类的非公平模式下尝试获取锁方法    }}//公平模式的实现static final class FairSync extends Sync {    FairSync(int permits) {        super(permits);    }    //尝试获取锁    protected int tryAcquireShared(int acquires) {        for (;;) {    //自旋            //同步队列中是否存在排在当前线程前面的线程结点            //也就是说当前线程是不是等待最久的线程,不是的话,直接不让获取锁            if (hasQueuedPredecessors())                    return -1;            int available = getState();    //获取同步状态            int remaining = available - acquires;    //将要更新的同步状态值            //remaining小于0,说明没有可用的资源了,返回获取失败            //remaining大于等于0且CAS方式更新状态失败,则表示有可用资源,继续循环尝试更新直到成功            if (remaining < 0 ||                compareAndSetState(available, remaining))                return remaining;        }    }}//公平锁与非公平锁父类abstract static class Sync extends AbstractQueuedSynchronizer {    Sync(int permits) {        setState(permits);    }    //获取剩余许可资源数    final int getPermits() {        return getState();    }    //非公平尝试获取锁,与公平模式基本相似,只是不需要判断当前线程是不是等待最久的线程    final int nonfairTryAcquireShared(int acquires) {        for (;;) {            int available = getState();            int remaining = available - acquires;            if (remaining < 0 ||                compareAndSetState(available, remaining))                return remaining;        }    }    //释放锁资源    protected final boolean tryReleaseShared(int releases) {        for (;;) {            int current = getState();                int next = current + releases;            if (next < current)     //超出最大许可值,抛异常                    throw new Error("Maximum permit count exceeded");            if (compareAndSetState(current, next))                return true;        }    }    //减少许可数,即假设当前许可数是5,调用该方法会时许可数变为5-reductions    final void reducePermits(int reductions) {        for (;;) {            int current = getState();            int next = current - reductions;            if (next > current) // underflow                throw new Error("Permit count underflow");            if (compareAndSetState(current, next))                return;        }    }    //获取可立即使用的许可数    final int drainPermits() {        for (;;) {            int current = getState();            if (current == 0 || compareAndSetState(current, 0))                return current;        }    }}

    了解了底层实现,再看看Semaphore获取许可和释放许可的过程:

//获取一个许可,在提供一个许可前一直将线程阻塞,或线程被中断public void acquire() throws InterruptedException {    sync.acquireSharedInterruptibly(1);}//AQS中的方法,可被中断的获取锁的方法public final void acquireSharedInterruptibly(int arg)        throws InterruptedException {    if (Thread.interrupted())        throw new InterruptedException();    //tryAcquireShared调用的是公平锁FairSync或NonfairSync中的重写方法    //tryAcquireShared可能失败,进入doAcquireSharedInterruptibly方法进行自旋或加入同步队列    if (tryAcquireShared(arg) < 0)        doAcquireSharedInterruptibly(arg);}//尝试获取一个许可,在有可用的许可前将其阻塞,且不可被中断public void acquireUninterruptibly() {    sync.acquireShared(1);}//AQS中的方法public final void acquireShared(int arg) {    //尝试获取许可失败的话,会进行自旋或加入到同步队列中等待获取锁    if (tryAcquireShared(arg) < 0)        doAcquireShared(arg);    //AQS中已分析过,不再深入}//尝试获取一个许可public boolean tryAcquire() {    return sync.nonfairTryAcquireShared(1) >= 0;    //只进行一次方式,失败后直接返回+}//在一定时间内尝试获取许可public boolean tryAcquire(long timeout, TimeUnit unit)    throws InterruptedException {    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));}//AQS中的在一定时间内尝试获取锁的方法public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)        throws InterruptedException {    if (Thread.interrupted())        throw new InterruptedException();    //本质上还是先调用tryAcquireShared方法,不成功再在一定时间去自旋或加入同步队列等待获取许可    return tryAcquireShared(arg) >= 0 ||        doAcquireSharedNanos(arg, nanosTimeout);}//获取多个许可,在提供permits许可前一直将线程阻塞,或线程被中断public void acquire(int permits) throws InterruptedException {    if (permits < 0) throw new IllegalArgumentException();    sync.acquireSharedInterruptibly(permits);}//获取给定数目的许可,在提供这些许可前一直将线程阻塞,不可被中断public void acquireUninterruptibly(int permits) {    if (permits < 0) throw new IllegalArgumentException();    sync.acquireShared(permits);}//尝试获取给定数目的许可,值尝试一次,失败直接返回public boolean tryAcquire(int permits) {    if (permits < 0) throw new IllegalArgumentException();    return sync.nonfairTryAcquireShared(permits) >= 0;}//在给定的时间内尝试获取给定数目的许可数,超时返回失败public boolean tryAcquire(int permits, long timeout, TimeUnit unit)    throws InterruptedException {    if (permits < 0) throw new IllegalArgumentException();    return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));}//释放许可的过程//释放一个许可public void release() {    sync.releaseShared(1);}//AQS中的方法public final boolean releaseShared(int arg) {    //尝试释放一个许可,不成功则进入doReleaseShared继续尝试释放许可    if (tryReleaseShared(arg)) {        doReleaseShared();        return true;    }    return false;}//释放给定数目的许可public void release(int permits) {    if (permits < 0) throw new IllegalArgumentException();    sync.releaseShared(permits);}

 

转载于:https://my.oschina.net/bzhangpoorman/blog/3034371

你可能感兴趣的文章
P2440 木材加工(二分+贪心)
查看>>
hadoop学习笔记(二)
查看>>
amoeba安装与实现amoeba for mysql读写分离
查看>>
结对编程--四则运算改进版(107,120)
查看>>
点击效果,
查看>>
CSS水平居中的三种方法
查看>>
刚发现的取色工具
查看>>
python全局解释器锁-----GIL
查看>>
linux 安装 qt
查看>>
Webpack 入门
查看>>
学习笔记-------ultraedit
查看>>
在T-SQL语句中访问远程数据库(openrowset/opendatasource/openquery)
查看>>
第一个Ionic应用
查看>>
codeforces730I Olympiad in Programming and Sports(姿势题 优先队列?dp?)
查看>>
POJ 3260 The Fewest Coin
查看>>
201421410018 于佳裔 实验四
查看>>
【VUE】@click加上v-bind绑定切换类名及动画事件
查看>>
Microsoft发布新一代主机:Xbox One
查看>>
运维经验分享:关于系统运维监控的几点建议
查看>>
jQuery渐隐渐现字体发虚的问题
查看>>