锁专题(9) Semaphore 信号量源码深度解析
情景导入
我想各位小伙伴一定都做过导出功能,就算没做过,那肯定也用过。
如果你既没有吃过猪肉,也没有见过猪跑。那这篇文章也可以读一读,可以补充点知识。
导出作为一个非常常见的功能,也是稍有不慎就会导致系统压力剧增的问题之一。
有类似苦恼的小伙伴可以阅读下我以前写的文章:
cpu 又报警了,快看看为什么?
记得前不久,一个本该和平的上午,报警群里忽然就炸了,报警信息不断地轰炸过来。
“快看看怎么了?”,项目经理赶紧让各位同事查查问题。
“CPU 太高了”,查了下,有人回到,“不知道是谁在一直导出大文件。”
我们知道 excel 的导出是非常消耗较器性能的,为了限制范围,以前已经加了时间范围 1 个月,还做了很多优化。
但是现在操作员可能在同时导出,导致机器压力还是太大了。
“看一下能不能解决这个问题”,项目经理说,“这么报警也不是办法。”
怎么解决?
各位小伙伴,如果是你来解决这个问题,你会怎么做呢?
我们今天来介绍一种比较常用的解决方案,需要用到今天的主角 Semaphore 信号量。
小伙伴有其他想法也欢迎评论区和大家分享讨论一下。
Semaphore 介绍
计数信号量(Counting Semaphore)用来控制同时访问的某个特定资源的操作数量,或者同时执行某个指定操作的数量。
ps: 我们可以根据这个特性,灵活地限制同时导出的执行数量。
计算信号量还可以用来实现某种资源池,或者对容器施加边界。
Semaphore 中管理着一组虚拟许可(permit),许可的初始量可通过构造函数来指定。
在执行操作时可以首先获得许可(只要还有剩余的许可),并在使用以后释放许可。
如果没有许可,那么acquire将阻塞直到有许可(或者直到被中断或者操作超时)。
release方法将返回一个许可给信号量。计算信号量的一种简化形式是二值信号量,即初始化值为1的Semaphore。二值信号量可以用做互斥体(mutex),并具备不可重入的加锁语义:谁拥有这个唯一的许可,谁就拥有了互斥锁。
使用例子
public class TestSemaphore {
public static void main(String[] args) {
// 线程池
ExecutorService exec = Executors.newCachedThreadPool();
// 只能5个线程同时访问
final Semaphore semp = new Semaphore(5);
// 模拟20个客户端访问
for (int index = 0; index current) // underflow
throw new Error("Permit count underflow");
// 通过 CAS 设置
if (compareAndSetState(current, next))
return;
}
}
// 这个就是将 state 设置为 0
// 功能是将剩余的授权都清空。
final int drainPermits() {
for (;;) {
int current = getState();
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
}
非公平锁实现
非公平锁的 tryAcquireShared 就是直接调用 Sync 中的方法。
/**
* NonFair version
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
公平锁实现
/**
* Fair version
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining = 0;
}
可以根据我们指定是否公平尝试获取锁:
public boolean tryAcquire(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
这个方法对应的就是我们指定的 NoFairSync 或者 FairSync 对象了。
指定获取信号量的个数
默认是获取 1 个信号量,我们也可以指定获取的个数。
public void acquire(int permits) throws InterruptedException {
if (permits = 0;
}
当然也有同时指定信号量个数+超时时间的方法:
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
throws InterruptedException {
if (permits This method is typically used for debugging and testing purposes.
*
* @return the number of permits available in this semaphore 剩余的可用信号量
* @author 老马啸西风
*/
public int availablePermits() {
return sync.getPermits();
}
/**
* Acquires and returns all permits that are immediately available.
*
* @return the number of permits acquired 这个会把所有可用的信号量都用掉。有点一掷千金的味道。
*/
public int drainPermits() {
return sync.drainPermits();
}
/**
* Shrinks the number of available permits by the indicated
* reduction. This method can be useful in subclasses that use
* semaphores to track resources that become unavailable. This
* method differs from {@code acquire} in that it does not block
* waiting for permits to become available.
*
* @param reduction the number of permits to remove 减少对应的可用数量。不过这个方法是一个 protected,
* @throws IllegalArgumentException if {@code reduction} is negative
*/
protected void reducePermits(int reduction) {
if (reduction < 0) throw new IllegalArgumentException();
sync.reducePermits(reduction);
}
小结
Semaphore 作为一个并发的控制工具,使用起来非常的方便,实现的原理非常类似可重入锁,都是继承自 AQS 类。
希望本文对你有帮助,如果有其他想法的话,也可以评论区和大家分享哦。
各位极客的点赞收藏转发,是老马持续写作的最大动力!
参考资料
jdk 源码