Java并发包之ReentrantLock、AbstractQueuedSynchronizer

ReentrantLock介绍

ReentrantLock重入锁,是实现Lock接口的一个类,也是在实际编程中使用频率很高的一个锁,支持重入性,表示能够对共享资源能够重复加锁,即当前线程获取该锁再次获取不会被阻塞。
synchronized作用相同(隐式支持重入性)
**synchronized实现:**JVM中对对象头的操作。获取自增、释放自减方式实现重入。monitorenter monitorexit

重入性实现原理

1
2
3
public class ReentrantLock implements Lock, java.io.Serializable {
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {

可知内部有个Sync对象,继承自AQS。lock方法和unlock方法都是代理的Sync对象。默认构造函数是Sync非公平锁。要理解可重入锁,则就是理解这几个Sync的实现。

可重入如何解决:

  1. 在线程获取锁的时候,如果已经获取锁的线程是当前线程的话则直接再次获取成功
  2. 由于锁会被获取n次,那么只有锁在被释放同样的n次之后,该锁才算是完全释放成功

Sync抽象类

继承AQS,重写了nonfairTryAcquire方法,分析可知其流程很简单,利用state字段记录获取的锁数量。当无锁时直接尝试获取,注意只尝试一次没有自旋CAS。当有锁时检查是否当前线程获取的锁,是直接则锁数量+1,没有用到CAS。
获取锁的过程【nonfairTryAcquire

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
final boolean nonfairTryAcquire(int acquires) {
//1. 如果该锁未被任何线程占有,该锁能被当前线程获取
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
} //2.若被占有,检查占有线程是否是当前线程
else if (current == getExclusiveOwnerThread()) {
// 3. 再次获取,计数加一
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}

释放也是比较简单,先判断是否是当前获取的锁,再直接锁的数量减少,若减少到0则标记当前获取线程为null。注意只有获取锁的时候才用CAS,其他的情况包括:其他线程获取锁,同一线程获取的情况是单线程直接set即可!
释放锁的过程:【tryRelease

1
2
3
4
5
6
7
8
9
10
11
12
13
14
protected final boolean tryRelease(int releases) {
//1. 同步状态减1
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
//2. 只有当同步状态为0时,锁成功被释放,返回true
free = true;
setExclusiveOwnerThread(null);
}
// 3. 锁未被完全释放,返回false
setState(c);
return free;

重入锁的释放必须得等到同步状态为0时锁才算成功释放,否则锁仍未释放。

NonfairSync实现类

继承之Sync抽象类,只重写了lock方法和tryAcquire方法。
同理,无锁时尝试获取,有锁时请求获取一把,这里会进入到AQS中,AQS会把当前线程加入同步队列并调用LockSupport.park挂起当前线程

1
2
3
4
5
6
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}

释放时则是调用了AQS的release,其又执行了Sync抽象类的tryRelease方法,释放完所有锁时会unpark等待队列第一个线程。

FairSync实现类

唯一的区别就是在尝试获取锁的时候,会判断同步队列当前是否有节点在等待、当前节点是否有前驱节点,若有则表示有线程比当前线程更早地请求获取锁,则拒绝本次获取锁的请求,加入到同步队列,并挂起线程。符合FIFO。

这段代码的逻辑与nonfairTryAcquire基本上一致,唯一的不同在于增加了hasQueuedPredecessors的逻辑判断,方法名就可知道该方法用来判断当前节点在同步队列中是否有前驱节点的判断,如果有前驱节点说明有线程比当前线程更早的请求资源,根据公平性,当前线程请求资源失败。公平锁每次都是从同步队列中的第一个节点获取到锁,而非公平性锁则不一定,有可能刚释放锁的线程能再次获取到锁。

公平锁和非公平锁利弊

  1. 公平锁每次获取到锁为同步队列中的第一个节点,保证请求资源时间上的绝对顺序,而非公平锁有可能刚释放锁的线程下次继续获取该锁,则有可能导致其他线程永远无法获取到锁,造成“饥饿”现象
  2. 公平锁为了保证时间上的绝对顺序,需要频繁的上下文切换,而非公平锁会降低一定的上下文切换,降低性能开销。因此,ReentrantLock默认选择的是非公平锁,则是为了减少一部分上下文切换,保证了系统更大的吞吐量

AbstractQueuedSynchronizer

AbstractQueuedSynchronizer是被很多类用于实现同步(CountDownLatch、CycliBarrier、ReentrantLock、ReentrantReadWriteLock),ReentrantReadWriteLock 可以看成是组合式,因为ReentrantReadWriteLock也就是读写锁允许多个线程同时对某一资源进行读。
不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源 state 的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在上层已经帮我们实现好了。

Lock接口

concurrent包的关键接口,提供了和synchronized相同的功能,不过要显式加解锁,但需要在finally块内保证解锁。

AQS(AbstractQueuedSynchronizer)同步器

用于构建锁和同步器的框架,很多juc中都用其来构建同步器。(ReentrantLock,Semaphore,其他的诸如ReentrantReadWriteLock,SynchronousQueue,FutureTask)

核心思想

底层:CAS,在C++内使用汇编CPU指令

设计模式:模板方法

一些方法开放给子类进行重写,而同步器给同步组件所提供模板方法又会重新调用被子类所重写的方法

1
2
3
4
protected final boolean tryAcquire(int acquires) {
// throw new UnsupportedOperationException(); // 父类实现
return nonfairTryAcquire(acquires); // NonfairSync子类实现
}

AQS中的模板方法acquire又调用到了子类重写的方法

1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

总结

  1. 同步组件(这里不仅仅是可重入锁,还包括CountDownLatch等)的实现依赖于同步器AQS,在同步组件实现中,使用AQS的方式被推荐定义继承AQS的静态内存类;
  2. AQS采用模板方法进行设计,AQS的protected修饰的方法需要由继承AQS的子类进行重写实现,当调用AQS的子类的方法时就会调用被重写的方法;
  3. AQS负责同步状态的管理,线程的排队,等待和唤醒这些底层操作,而Lock等同步组件主要专注于实现同步语义;
  4. 在重写AQS的方式时,使用AQS提供的getState(),setState(),compareAndSetState()方法进行修改同步状态
Author: whllhw
Link: https://whllhw.ml/posts/2019/05/09/Java并发包之ReentrantLock、AbstractQueuedSynchronizer/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.