加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 编程开发 > Java > 正文

Semaphore源码分析-java8

发布时间:2020-12-14 06:39:52 所属栏目:Java 来源:网络整理
导读:1.特性分析 Semaphore就是一个计数的信号量 每一个线程在获取资源前,必须从semaphore获取许可,这保证了一定有可用的资源。 注意:acquire方法并没有使用同步锁机制 ,这样就保证了acquire方法被调用时,被使用完的资源依然可以放回资源池中。 二元semaphore

1.特性分析

  • Semaphore就是一个计数的信号量
  • 每一个线程在获取资源前,必须从semaphore获取许可,这保证了一定有可用的资源。
  • 注意:acquire方法并没有使用同步锁机制,这样就保证了acquire方法被调用时,被使用完的资源依然可以放回资源池中。
  • 二元semaphore
    • 它只有两个状态:1表示可用 或者 0表示可用
    • 和java.util.concurrent.locks.Lock实现不一样的属性:它的锁可以由其它的线程释放,而不是自己去释放
    • 这在一些特定的上下文中很有用,比如死锁的恢复
  • 构造器的一个可选参数:fairness
    • 当其值为false时,此类不能保证线程获取permit的顺序是按照调用acquire的顺序
    • 当其值为true时,semaphore就保证了先调用acquire方法的线程先获得permit(FIFO思想)
  • 公平和非公平的适用场景:
    • 用作控制资源访问时,应该被初始化为公平的,从而保证不会出现线程饥饿现象
    • 作为其它同步控制时,非公平的顺序要比公平顺序更加高效
  • semaphore的同步实现:所有的机制都要通过AQS这一子类实现.
  • 一个线程调用release()方法前是否调用了acquire()方法并没有强制要求 ,这一点和操作系统里面的信号量不一致。

  • private final Sync sync; 所有的机制都要通过AQS这一子类实现,所以此类中很多方法的实现都代理到了AQS中去执行
  • abstract static class Sync extends AbstractQueuedSynchronizer semaphore的同步实现,使用了AQS状态代表了permits,子类分公平和非公平两个版本。
  • 2个构造函数
    • public Semaphore(int permits)
    • public Semaphore(int permits,boolean fair)
  • static final class NonfairSync extends Sync Sync的非公平版本
  • static final class FairSync extends Sync Sync的公平版本

    package sourcecode.analysis;
/**
 * @Author: cxh
 * @CreateTime: 18/4/15 11:10
 * @ProjectName: JavaBaseTest
 */

import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/**
 * Semaphore就是一个计数的信号量.从概念上来讲,一个semaphore维持了一个permit集合.如果有必要每一个acquire方法都会加锁
 * 直到有一个可用的permit,然后获取这个permit.release方法每执行一次就添加一个permit,通常同时释放一个调用acquire方法
 * 且被阻塞的线程.
 * 然而,并不是真的有permit对象;Semaphore只是保存了一个可用的资源数量值,根据这个数值执行相应的操作.
 *
 * <p>Semaphores are often used to restrict the number of threads than can
 * access some (physical or logical) resource. For example,here is
 * a class that uses a semaphore to control access to a pool of items:
 *
 * Semaphores通常被用作对某些物理or逻辑资源访问的线程数量有限制的场景.比如:有一个类使用了一个Semaphore来控制线程对资源池的访问.
 * class Pool {
 *   private static final int MAX_AVAILABLE = 100;//最多线程数
 *   private final Semaphore available = new Semaphore(MAX_AVAILABLE,true);
 *
 *   public Object getItem() throws InterruptedException {
 *     available.acquire();//减少许可
 *     return getNextAvailableItem();//获取要访问的资源
 *   }
 *
 *   public void putItem(Object x) {
 *     if (markAsUnused(x))
 *       available.release();//增加许可
 *   }
 *
 *   // Not a particularly efficient data structure; just for demo
 *
 *   protected Object[] items = ... whatever kinds of items being managed //被访问资源
 *   protected boolean[] used = new boolean[MAX_AVAILABLE]; //许可控制
 *
 *   protected synchronized Object getNextAvailableItem() {
 *     for (int i = 0; i < MAX_AVAILABLE; ++i) {
 *       if (!used[i]) { //如果当前items[i]资源可用
 *          used[i] = true; //获取当前资源,且置当前资源items[i]不可用
 *          return items[i];
 *       }
 *     }
 *     return null; // not reached  //无可用资源
 *   }
 *
 *  //资源被用完后,进行释放
 *   protected synchronized boolean markAsUnused(Object item) {
 *     for (int i = 0; i < MAX_AVAILABLE; ++i) {
 *       if (item == items[i]) {
 *          if (used[i]) {
 *            used[i] = false;
 *            return true;
 *          } else
 *            return false;
 *       }
 *     }
 *     return false;
 *   }
 * }}</pre>
 *
 * 每一个线程在获取资源前,必须从semaphore获取许可,这保证了一定有可用的资源.
 * 当一个资源被使用完后,它会被放回资源池,且semaphore的许可也会增加一个,从而使得下一个线程可以使用它.
 * 注意:acquire方法并没有使用同步锁机制,这样就保证了acquire方法被调用时,被使用完的资源依然可以放回资源池中.
 * semaphore将互斥访问资源池的行为进行了封装,将那些需要通过同步机制维持资源池一致性的行为进行了分离.
 *
 * semaphore初始化值为1,这被用作只有一个permit的场景,这可以用作一个互斥锁.
 * 众所周知的就是:二元semaphore,因为它只有两个状态:1表示可用 或者 0表示可用.当使用这种方式时,二元semaphore就有一些
 * 和java.util.concurrent.locks.Lock实现不一样的属性,比如:它的锁可以由其它的线程释放,而不是自己去释放(因为semaphore没有
 * 所有权的概念).这在一些特定的上下文中很有用,比如死锁的恢复.
 *
 * 这个类的构造器参数有一个可选参数:fairness.
 * 1.当其值为false时,此类不能保证线程获取permit的顺序是按照调用acquire的顺序.
 * 尤其,干涉是被允许的,也就是说,后调用acquire方法的线程可以先获得permit-逻辑上就是新来的线程会把自己放在等待线程的头部.
 * 2.当其值为true时,semaphore就保证了先调用acquire方法的线程先获得permit(FIFO思想).
 * 注意:无时间限制的tryAcquire()方法不支持fairness值的设定,只要有可用的permit,它就会获取.
 *
 * 一般来说,semaphores被用作控制资源访问时应该被初始化为公平的,从而保证不会出现线程饥饿现象.
 * 当把semaphores作为其它同步控制时,非公平的顺序要比公平顺序更加高效.
 *
 * 此类在同一时刻acquire和release多个资源上也提供了方便.
 * 当fairness值为false时,一定要小心线程操作被无限期推迟带来的风险.
 *
 * 内存一致性:一个线程中在调用release()方法前的行为 优先于 另一个线程acquire()方法成功获取资源的行为;
 *
 * 此类可序列化
 *
 * @since 1.5
 * @author Doug Lea
 */
public class Semaphore implements java.io.Serializable {
    private static final long serialVersionUID = -3222578661600680210L;

    //所有的机制都要通过AQS这一子类实现.
    private final Sync sync;

    //semaphore的同步实现.使用了AQS状态代表了permits.子类分公平和非公平两个版本.
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;

        //设定同步状态的值.调用了父类AQS的方法
        Sync(int permits) {
            setState(permits);
        }

        //返回同步状态的当前值.调用了父类AQS的方法
        final int getPermits() {
            return getState();
        }

        //CAS设定同步状态值,调用了父类AQS的方法,父类中使用了sun.misc.Unsafe这一final类
        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                        compareAndSetState(available,remaining))
                    return remaining;
            }
        }

        //释放许可,CAS算法
        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current,next))
                    return true;
            }
        }

        //获取许可,CAS算法
        final void reducePermits(int reductions) {
            for (;;) {
                int current = getState();
                int next = current - reductions;
                if (next > current) // underflow
                    throw new Error("Permit count underflow");
                if (compareAndSetState(current,next))
                    return;
            }
        }

        //CAS等待permit为0,一旦出现0就立即返回0.
        final int drainPermits() {
            for (;;) {
                int current = getState();
                if (current == 0 || compareAndSetState(current,0))
                    return current;
            }
        }
    }

    //Sync的非公平版本
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;

        NonfairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }

    //Sync的公平版本
    static final class FairSync extends Sync {
        private static final long serialVersionUID = 2014338818796000944L;

        FairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                        compareAndSetState(available,remaining))
                    return remaining;
            }
        }
    }

    //创建Semaphore的非公平实例
    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }

    //创建Semaphore的公平实例
    public Semaphore(int permits,boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }

    /**
     * 从semaphore获取一个permit,一个线程在获得一个permit前or出现中断异常前,始终处于block状态.
     * 进入共享模式,如果出现中断,则异常终止.
     */
    public void acquire() throws InterruptedException {
        //调用父类AQS中方法
        sync.acquireSharedInterruptibly(1);
    }

    /**
     * 从semaphore获取一个permit,一个线程在获得一个permit前,始终处于block状态.
     * 这是一个忽略中断的方法.这一点和上面方法是不同的.
     */
    public void acquireUninterruptibly() {
        sync.acquireShared(1);
    }//调用父类方法

    /**
     * 如果在此方法调用时刚好有可用的permit,则当前线程会从semaphore获取一个permit.
     *
     * 获取permit时,* 1.如果有可用的permit,则会立即返回true,且将permist数量减1.
     * 2.如果没有可用的permit,这一方法会立即返回false.
     *
     * 即使semaphore使用的是公平机制,但是在此方法被调用时,即使有等待线程,如果突然有了一个可用的permit,则
     * 当前线程也会立即获得这个permit.这一行为在一些特定的场景中很有用,尽管它破坏了公平性.如果你想使用公平机制,则
     * 调用tryAcquire(long,TimeUnit),tryAcquire(0,TimeUnit.SECONDS)这两个方法.这两个方法基本是等价的.
     */
    public boolean tryAcquire() {
        return sync.nonfairTryAcquireShared(1) >= 0;
    }

    /**
     * 如果当前线程在等待时间内没有发生中断,且出现一个可用的permit,则从semaphore获取一个permit,* <p>Acquires a permit,if one is available and returns immediately,* with the value {@code true},* reducing the number of available permits by one.
     *
     * 如果调用此方法时,没有可用的permit,则当前线程进入等待状态,直到以下3个事件其中一个发生为止:
     * 1.其它线程调用了release()方法,且当前线程是下一个可以获取permit的线程.
     * 2.其它线程中断了当前线程.
     * 3.等待超时.
     *
     * 如果获取了permit,则返回值为true.
     *
     * 如果当前线程:
     * 1.在进入此方法时,其中断状态被设定;
     * 2.在等待permit的过程中被其它线程中断;
     * 则会抛出InterruptedException,且当前线程的中断状态被清除.
     */
    public boolean tryAcquire(long timeout,TimeUnit unit)
            throws InterruptedException {
        return sync.tryAcquireSharedNanos(1,unit.toNanos(timeout));//调用父类AQS方法
    }

    public void release() {
        sync.releaseShared(1);
    }//调用父类AQS方法

    /**
     * 如果调用此方法时,没有足够的permit,直到以下2个事件其中一个发生为止:
     * 1.其它线程调用了release()方法,且当前线程是下一个可以获取permit的线程.
     * 2.其它线程中断了当前线程.
     *
     * 如果当前线程:
     * 1.在进入此方法时,且当前线程的中断状态被清除.
     */
    public void acquire(int permits) throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireSharedInterruptibly(permits);//调用父类AQS的方法
    }

    /**
     * 与上面方法不同之处:忽略中断
     */
    public void acquireUninterruptibly(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireShared(permits);//调用父类AQS的方法
    }

    /**
     * 此方法调用时,如果permit足够,则立即分配并返回true;
     * 否则,直接返回false.
     *
     * 即使semaphore使用的是公平机制,尽管它破坏了公平性.如果想保存公平性,* 则使用tryAcquire(int,long,TimeUnit) 和tryAcquire(permits,TimeUnit.SECONDS)方法即可.
     * 这两个方法是等价的.
     */
    public boolean tryAcquire(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        return sync.nonfairTryAcquireShared(permits) >= 0;
    }

    /**
     * 如果调用此方法时,且当前线程是下一个可以获取permit的线程.
     * 2.其它线程中断了当前线程.
     * 3.超时
     *
     * 如果当前线程:
     * 1.在进入此方法时,且当前线程的中断状态被清除.
     */
    public boolean tryAcquire(int permits,long timeout,TimeUnit unit)
            throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        return sync.tryAcquireSharedNanos(permits,unit.toNanos(timeout));//调用父类AQS的方法
    }

    /**
     * 一个线程调用release()方法前是否调用了acquire()方法并没有强制要求.
     * semaphore的正确使用是建立在是否方便编程上.(所有的release相关的方法都有这个特性)
     */
    public void release(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.releaseShared(permits);//调用父类AQS的方法
    }

    //返回同步状态的当前值
    public int availablePermits() {
        return sync.getPermits();
    }

    //获取并返回当前可用的permit数量
    public int drainPermits() {
        return sync.drainPermits();
    }

    /**
     * 减少permit的数量.
     * 这一方法在使用semaphore跟踪不可用资源的子类中很有用.
     * 这一方法和acquire()的不同之处在于:即使当前permit不够,也不会阻塞线程.
     */
    protected void reducePermits(int reduction) {
        if (reduction < 0) throw new IllegalArgumentException();
        sync.reducePermits(reduction);
    }

    //判定当前semaphore是否为公平的
    public boolean isFair() {
        return sync instanceof FairSync;
    }

    /**
     * 查询是否有因为acquire方法的调用而等待的线程.
     * 注意:因为可用随时取消等待,所有即使返回true,也并不代表之前并未有任何线程因为acquire而发生阻塞.
     * 这一方法经常被用作系统状态的监控.
     */
    public final boolean hasQueuedThreads() {
        return sync.hasQueuedThreads();
    }

    /**
     * 返回值只是一个估计值,因为在此方法遍历内部数据结构时,线程数据可以动态改变.
     * 这一方法被用作:系统状态的监控,但不会用作同步控制.
     */
    public final int getQueueLength() {
        return sync.getQueueLength();
    }

    /**
     * 返回阻塞的线程集合.
     * 因为此方法在统计时,线程集合会发生改变,所以返回结果只是一个尽最大努力的估计值.
     * 这一方法被用作:帮助子类的构建,它提供了更具有扩展性的监控机制.
     */
    protected Collection<Thread> getQueuedThreads() {
        return sync.getQueuedThreads();
    }

    //返回semaphore的状态值.只不过以字符串方法进行表示而以.
    public String toString() {
        return super.toString() + "[Permits = " + sync.getPermits() + "]";
    }
}

4.使用举例

public static void main(String[] args) {
    //test
    for(int i=0;i<5;i++){
        Put p=new Put(new Get());
        p.start();
    }
}
//假设妈妈向盘子放苹果,小花从盘子取苹果.盘子的最大容量为:只能放下一个苹果.
static class Get {
    //单线程工作
    private Semaphore se = new Semaphore(1);
public void doGet() {
    try {
        se.acquire();//获取盘子的所有权,获取信号量
        System.out.println(Thread.currentThread().getName() + "获取信号量," + " and  time is:" + System.currentTimeMillis());
        Thread.sleep(200);
        System.out.println(Thread.currentThread().getName() + "释放信号量," + " and  time is:" + System.currentTimeMillis());
        se.release();//释放盘子的所有权,释放信号量
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

}

static class Put extends Thread {
@Override
public void run() {
try {
put();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

private Get get;

Put(Get get) {
    this.get = get;
}

//妈妈向盘子放苹果
public void put() throws InterruptedException {
    get.doGet();//通知小花可以拿苹果吃啦
}

}

输出结果如下:

Thread-0获取信号量,and  time is:1525393809877
Thread-1获取信号量,and  time is:1525393809878
Thread-2获取信号量,and  time is:1525393809878
Thread-3获取信号量,and  time is:1525393809878
Thread-4获取信号量,and  time is:1525393809878
Thread-1释放信号量,and  time is:1525393810079
Thread-2释放信号量,and  time is:1525393810079
Thread-0释放信号量,and  time is:1525393810079
Thread-3释放信号量,and  time is:1525393810079
Thread-4释放信号量,and  time is:1525393810082

Process finished with exit code 0

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读