1.特性分析
- Semaphore就是一个计数的信号量
- 每一个线程在获取资源前,必须从semaphore获取许可,这保证了一定有可用的资源。
-
注意:acquire方法并没有使用同步锁机制 ,这样就保证了acquire方法被调用时,被使用完的资源依然可以放回资源池中。
- 二元semaphore
- 它只有两个状态:1表示可用 或者 0表示可用
- 和java.util.concurrent.locks.Lock实现不一样的属性:
它的锁可以由其它的线程释放,而不是自己去释放
- 这在一些特定的上下文中很有用,比如
死锁的恢复 。
- 构造器的一个可选参数:fairness
- 当其值为false时,此类不能保证线程获取permit的顺序是按照调用acquire的顺序
- 当其值为true时,semaphore就保证了先调用acquire方法的线程先获得permit(FIFO思想)
- 公平和非公平的适用场景:
- 用作控制资源访问时,应该被初始化为公平的,从而保证不会出现线程饥饿现象
- 作为其它同步控制时,非公平的顺序要比公平顺序更加高效
- semaphore的同步实现:
所有的机制都要通过AQS这一子类实现 .
-
一个线程调用release()方法前是否调用了acquire()方法并没有强制要求 ,这一点和操作系统里面的信号量不一致。
- private final Sync sync;
所有的机制都要通过AQS这一子类实现,
所以此类中很多方法的实现都代理到了AQS中去执行 。
- abstract static class Sync extends AbstractQueuedSynchronizer
semaphore的同步实现,使用了AQS状态代表了permits,子类分公平和非公平两个版本。
- 2个构造函数
- public Semaphore(int permits)
- public Semaphore(int permits,boolean fair)
- static final class NonfairSync extends Sync
Sync的非公平版本
- static final class FairSync extends Sync
Sync的公平版本
package sourcecode.analysis;
/**
* @Author: cxh
* @CreateTime: 18/4/15 11:10
* @ProjectName: JavaBaseTest
*/
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
/**
* Semaphore就是一个计数的信号量.从概念上来讲,一个semaphore维持了一个permit集合.如果有必要每一个acquire方法都会加锁
* 直到有一个可用的permit,然后获取这个permit.release方法每执行一次就添加一个permit,通常同时释放一个调用acquire方法
* 且被阻塞的线程.
* 然而,并不是真的有permit对象;Semaphore只是保存了一个可用的资源数量值,根据这个数值执行相应的操作.
*
* <p>Semaphores are often used to restrict the number of threads than can
* access some (physical or logical) resource. For example,here is
* a class that uses a semaphore to control access to a pool of items:
*
* Semaphores通常被用作对某些物理or逻辑资源访问的线程数量有限制的场景.比如:有一个类使用了一个Semaphore来控制线程对资源池的访问.
* class Pool {
* private static final int MAX_AVAILABLE = 100;//最多线程数
* private final Semaphore available = new Semaphore(MAX_AVAILABLE,true);
*
* public Object getItem() throws InterruptedException {
* available.acquire();//减少许可
* return getNextAvailableItem();//获取要访问的资源
* }
*
* public void putItem(Object x) {
* if (markAsUnused(x))
* available.release();//增加许可
* }
*
* // Not a particularly efficient data structure; just for demo
*
* protected Object[] items = ... whatever kinds of items being managed //被访问资源
* protected boolean[] used = new boolean[MAX_AVAILABLE]; //许可控制
*
* protected synchronized Object getNextAvailableItem() {
* for (int i = 0; i < MAX_AVAILABLE; ++i) {
* if (!used[i]) { //如果当前items[i]资源可用
* used[i] = true; //获取当前资源,且置当前资源items[i]不可用
* return items[i];
* }
* }
* return null; // not reached //无可用资源
* }
*
* //资源被用完后,进行释放
* protected synchronized boolean markAsUnused(Object item) {
* for (int i = 0; i < MAX_AVAILABLE; ++i) {
* if (item == items[i]) {
* if (used[i]) {
* used[i] = false;
* return true;
* } else
* return false;
* }
* }
* return false;
* }
* }}</pre>
*
* 每一个线程在获取资源前,必须从semaphore获取许可,这保证了一定有可用的资源.
* 当一个资源被使用完后,它会被放回资源池,且semaphore的许可也会增加一个,从而使得下一个线程可以使用它.
* 注意:acquire方法并没有使用同步锁机制,这样就保证了acquire方法被调用时,被使用完的资源依然可以放回资源池中.
* semaphore将互斥访问资源池的行为进行了封装,将那些需要通过同步机制维持资源池一致性的行为进行了分离.
*
* semaphore初始化值为1,这被用作只有一个permit的场景,这可以用作一个互斥锁.
* 众所周知的就是:二元semaphore,因为它只有两个状态:1表示可用 或者 0表示可用.当使用这种方式时,二元semaphore就有一些
* 和java.util.concurrent.locks.Lock实现不一样的属性,比如:它的锁可以由其它的线程释放,而不是自己去释放(因为semaphore没有
* 所有权的概念).这在一些特定的上下文中很有用,比如死锁的恢复.
*
* 这个类的构造器参数有一个可选参数:fairness.
* 1.当其值为false时,此类不能保证线程获取permit的顺序是按照调用acquire的顺序.
* 尤其,干涉是被允许的,也就是说,后调用acquire方法的线程可以先获得permit-逻辑上就是新来的线程会把自己放在等待线程的头部.
* 2.当其值为true时,semaphore就保证了先调用acquire方法的线程先获得permit(FIFO思想).
* 注意:无时间限制的tryAcquire()方法不支持fairness值的设定,只要有可用的permit,它就会获取.
*
* 一般来说,semaphores被用作控制资源访问时应该被初始化为公平的,从而保证不会出现线程饥饿现象.
* 当把semaphores作为其它同步控制时,非公平的顺序要比公平顺序更加高效.
*
* 此类在同一时刻acquire和release多个资源上也提供了方便.
* 当fairness值为false时,一定要小心线程操作被无限期推迟带来的风险.
*
* 内存一致性:一个线程中在调用release()方法前的行为 优先于 另一个线程acquire()方法成功获取资源的行为;
*
* 此类可序列化
*
* @since 1.5
* @author Doug Lea
*/
public class Semaphore implements java.io.Serializable {
private static final long serialVersionUID = -3222578661600680210L;
//所有的机制都要通过AQS这一子类实现.
private final Sync sync;
//semaphore的同步实现.使用了AQS状态代表了permits.子类分公平和非公平两个版本.
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
//设定同步状态的值.调用了父类AQS的方法
Sync(int permits) {
setState(permits);
}
//返回同步状态的当前值.调用了父类AQS的方法
final int getPermits() {
return getState();
}
//CAS设定同步状态值,调用了父类AQS的方法,父类中使用了sun.misc.Unsafe这一final类
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available,remaining))
return remaining;
}
}
//释放许可,CAS算法
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current,next))
return true;
}
}
//获取许可,CAS算法
final void reducePermits(int reductions) {
for (;;) {
int current = getState();
int next = current - reductions;
if (next > current) // underflow
throw new Error("Permit count underflow");
if (compareAndSetState(current,next))
return;
}
}
//CAS等待permit为0,一旦出现0就立即返回0.
final int drainPermits() {
for (;;) {
int current = getState();
if (current == 0 || compareAndSetState(current,0))
return current;
}
}
}
//Sync的非公平版本
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
//Sync的公平版本
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available,remaining))
return remaining;
}
}
}
//创建Semaphore的非公平实例
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
//创建Semaphore的公平实例
public Semaphore(int permits,boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
/**
* 从semaphore获取一个permit,一个线程在获得一个permit前or出现中断异常前,始终处于block状态.
* 进入共享模式,如果出现中断,则异常终止.
*/
public void acquire() throws InterruptedException {
//调用父类AQS中方法
sync.acquireSharedInterruptibly(1);
}
/**
* 从semaphore获取一个permit,一个线程在获得一个permit前,始终处于block状态.
* 这是一个忽略中断的方法.这一点和上面方法是不同的.
*/
public void acquireUninterruptibly() {
sync.acquireShared(1);
}//调用父类方法
/**
* 如果在此方法调用时刚好有可用的permit,则当前线程会从semaphore获取一个permit.
*
* 获取permit时,* 1.如果有可用的permit,则会立即返回true,且将permist数量减1.
* 2.如果没有可用的permit,这一方法会立即返回false.
*
* 即使semaphore使用的是公平机制,但是在此方法被调用时,即使有等待线程,如果突然有了一个可用的permit,则
* 当前线程也会立即获得这个permit.这一行为在一些特定的场景中很有用,尽管它破坏了公平性.如果你想使用公平机制,则
* 调用tryAcquire(long,TimeUnit),tryAcquire(0,TimeUnit.SECONDS)这两个方法.这两个方法基本是等价的.
*/
public boolean tryAcquire() {
return sync.nonfairTryAcquireShared(1) >= 0;
}
/**
* 如果当前线程在等待时间内没有发生中断,且出现一个可用的permit,则从semaphore获取一个permit,* <p>Acquires a permit,if one is available and returns immediately,* with the value {@code true},* reducing the number of available permits by one.
*
* 如果调用此方法时,没有可用的permit,则当前线程进入等待状态,直到以下3个事件其中一个发生为止:
* 1.其它线程调用了release()方法,且当前线程是下一个可以获取permit的线程.
* 2.其它线程中断了当前线程.
* 3.等待超时.
*
* 如果获取了permit,则返回值为true.
*
* 如果当前线程:
* 1.在进入此方法时,其中断状态被设定;
* 2.在等待permit的过程中被其它线程中断;
* 则会抛出InterruptedException,且当前线程的中断状态被清除.
*/
public boolean tryAcquire(long timeout,TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1,unit.toNanos(timeout));//调用父类AQS方法
}
public void release() {
sync.releaseShared(1);
}//调用父类AQS方法
/**
* 如果调用此方法时,没有足够的permit,直到以下2个事件其中一个发生为止:
* 1.其它线程调用了release()方法,且当前线程是下一个可以获取permit的线程.
* 2.其它线程中断了当前线程.
*
* 如果当前线程:
* 1.在进入此方法时,且当前线程的中断状态被清除.
*/
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);//调用父类AQS的方法
}
/**
* 与上面方法不同之处:忽略中断
*/
public void acquireUninterruptibly(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireShared(permits);//调用父类AQS的方法
}
/**
* 此方法调用时,如果permit足够,则立即分配并返回true;
* 否则,直接返回false.
*
* 即使semaphore使用的是公平机制,尽管它破坏了公平性.如果想保存公平性,* 则使用tryAcquire(int,long,TimeUnit) 和tryAcquire(permits,TimeUnit.SECONDS)方法即可.
* 这两个方法是等价的.
*/
public boolean tryAcquire(int permits) {
if (permits < 0) throw new IllegalArgumentException();
return sync.nonfairTryAcquireShared(permits) >= 0;
}
/**
* 如果调用此方法时,且当前线程是下一个可以获取permit的线程.
* 2.其它线程中断了当前线程.
* 3.超时
*
* 如果当前线程:
* 1.在进入此方法时,且当前线程的中断状态被清除.
*/
public boolean tryAcquire(int permits,long timeout,TimeUnit unit)
throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
return sync.tryAcquireSharedNanos(permits,unit.toNanos(timeout));//调用父类AQS的方法
}
/**
* 一个线程调用release()方法前是否调用了acquire()方法并没有强制要求.
* semaphore的正确使用是建立在是否方便编程上.(所有的release相关的方法都有这个特性)
*/
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);//调用父类AQS的方法
}
//返回同步状态的当前值
public int availablePermits() {
return sync.getPermits();
}
//获取并返回当前可用的permit数量
public int drainPermits() {
return sync.drainPermits();
}
/**
* 减少permit的数量.
* 这一方法在使用semaphore跟踪不可用资源的子类中很有用.
* 这一方法和acquire()的不同之处在于:即使当前permit不够,也不会阻塞线程.
*/
protected void reducePermits(int reduction) {
if (reduction < 0) throw new IllegalArgumentException();
sync.reducePermits(reduction);
}
//判定当前semaphore是否为公平的
public boolean isFair() {
return sync instanceof FairSync;
}
/**
* 查询是否有因为acquire方法的调用而等待的线程.
* 注意:因为可用随时取消等待,所有即使返回true,也并不代表之前并未有任何线程因为acquire而发生阻塞.
* 这一方法经常被用作系统状态的监控.
*/
public final boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
/**
* 返回值只是一个估计值,因为在此方法遍历内部数据结构时,线程数据可以动态改变.
* 这一方法被用作:系统状态的监控,但不会用作同步控制.
*/
public final int getQueueLength() {
return sync.getQueueLength();
}
/**
* 返回阻塞的线程集合.
* 因为此方法在统计时,线程集合会发生改变,所以返回结果只是一个尽最大努力的估计值.
* 这一方法被用作:帮助子类的构建,它提供了更具有扩展性的监控机制.
*/
protected Collection<Thread> getQueuedThreads() {
return sync.getQueuedThreads();
}
//返回semaphore的状态值.只不过以字符串方法进行表示而以.
public String toString() {
return super.toString() + "[Permits = " + sync.getPermits() + "]";
}
}
4.使用举例
public static void main(String[] args) {
//test
for(int i=0;i<5;i++){
Put p=new Put(new Get());
p.start();
}
}
//假设妈妈向盘子放苹果,小花从盘子取苹果.盘子的最大容量为:只能放下一个苹果.
static class Get {
//单线程工作
private Semaphore se = new Semaphore(1);
public void doGet() {
try {
se.acquire();//获取盘子的所有权,获取信号量
System.out.println(Thread.currentThread().getName() + "获取信号量," + " and time is:" + System.currentTimeMillis());
Thread.sleep(200);
System.out.println(Thread.currentThread().getName() + "释放信号量," + " and time is:" + System.currentTimeMillis());
se.release();//释放盘子的所有权,释放信号量
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
static class Put extends Thread {
@Override
public void run() {
try {
put();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private Get get;
Put(Get get) {
this.get = get;
}
//妈妈向盘子放苹果
public void put() throws InterruptedException {
get.doGet();//通知小花可以拿苹果吃啦
}
}
输出结果如下:
Thread-0获取信号量,and time is:1525393809877
Thread-1获取信号量,and time is:1525393809878
Thread-2获取信号量,and time is:1525393809878
Thread-3获取信号量,and time is:1525393809878
Thread-4获取信号量,and time is:1525393809878
Thread-1释放信号量,and time is:1525393810079
Thread-2释放信号量,and time is:1525393810079
Thread-0释放信号量,and time is:1525393810079
Thread-3释放信号量,and time is:1525393810079
Thread-4释放信号量,and time is:1525393810082
Process finished with exit code 0
(编辑:李大同)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!
|