Java多线程(二)——ReentrantLock源码分析-创新互联
ReentrantLock 是可重入的互斥锁,虽然具有与 Synchronized 相同的功能,但比 Synchronized 更加灵活。 ReentrantLock 底层基于 AQS(AbstractQueuedSynchronizer)实现。
创新互联服务项目包括耀州网站建设、耀州网站制作、耀州网页制作以及耀州网络营销策划等。多年来,我们专注于互联网行业,利用自身积累的技术优势、行业经验、深度合作伙伴关系等,向广大中小型企业、政府机构等提供互联网行业的解决方案,耀州网站推广取得了明显的社会效益与经济效益。目前,我们服务的客户以成都为中心已经辐射到耀州省份的部分城市,未来相信会继续扩大服务区域并继续获得客户的支持与信任!Reentrant 实现了 Lock 接口,其是 Java 对锁操作行为的统一规范,Lock接口定义如下:
public interface Lock{//获取锁
void lock();
//获取锁-可以响应中断
void lockInterruptibly() throws InterruptedException;
//尝试获取一次锁
boolean tryLock();
//返回获取锁是否成功状态 - 响应中断
boolean tryLock(long time,TimeUnit unit) throws InterrptedException;
//释放锁
void unlock();
//创建条件变量
Condition newCondition();
}
1. ReentrantLock的使用使用 ReentrantLock 的 lock() 方法进行锁的获取,即上锁。使用 unlock() 方法进行解锁。
public class ReentrantLockDemo1 {public static void main(String[] args) {ReentrantLock lock = new ReentrantLock();
lock.lock();
try{ //临界区代码
}finally {//为避免临界区代码出现异常,导致锁无法释放,必须在finally中加上释放锁的语句
lock.unlock();
}
}
}
ReentrantLock 也是可重入锁:
public class ReentrantLockDemo1 {ReentrantLock lock = new ReentrantLock();
Thread t1 = new Thread() {@Override
public void run() {lock.lock();
try {//获取锁之后,在m1中进行锁的重入
m1();
} finally {lock.unlock();
}
}
private void m1() {lock.lock();
try {//临界区
} finally {lock.unlock();
}
}
};
public static void main(String[] args) {ReentrantLockDemo1 demo = new ReentrantLockDemo1();
demo.t1.start();
}
}
默认情况下,通过构造方法new ReentrantLock()
获取的锁为非公平锁。
public class ReentrantLock{...
public ReentrantLock() {sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();
}
...
}
为观察公平与非公平的区别,我们尝试如下程序:
public class ReentrantLockDemo1 {//启用公平锁
ReentrantLock lock = new ReentrantLock(true);
Runnable run = new Runnable() {@Override
public void run() {for (int i = 100; i >0; i--) {lock.lock();
try {System.out.println(Thread.currentThread().getName() + " got the lock");
} finally {lock.unlock();
}
}
}
};
public static void main(String[] args) {ReentrantLockDemo1 demo = new ReentrantLockDemo1();
Thread t1 = new Thread(demo.run,"t1");
Thread t2 = new Thread(demo.run,"t2");
t1.start();
t2.start();
}
}
使用公平锁时,上述程序运行结果:
t1 got the lock
t2 got the lock
t1 got the lock
t2 got the lock
t1 got the lock
t2 got the lock
t1 got the lock
...
- t1获取到锁的时候,t2发现有人持有锁,进入队列中排队
- t1释放锁的时候,唤醒t2,t2程序继续运行
- t1开始acquire()方法,而t2已经在判断自己是否为队列头,并尝试获取锁
- 正常情况t2会比t1更先获取到锁资源
- t1发现t2持有锁,t1进入队列中排队
- 循环上述情况导致运行结果如上。
使用非公平锁时,上述程序运行结果:
...
t1 got the lock
t1 got the lock
t1 got the lock
t1 got the lock
t2 got the lock
t2 got the lock
t2 got the lock
...
- t1获取锁的时候,t2在队列中排队
- t1释放锁的时候,唤醒t2,t2程序继续运行
- t2还在尝试获取前驱,并tryAquire()时,t1已经在casState()了
- 正常情况t1会比t2更快获取到所资源
- t2发现t1有锁,t2进入队列中排队
- 循环上述情况导致运行结果如上。
ReentrantLock 首先调用 lock 方法尝试获取锁资源。
public void lock() {sync.lock();
}
开启公平锁时,sync 对象为 FairSync 实例,开启非公平锁时,sync 对象为 NonFairSync 对象。
public ReentrantLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();
}
观察公平与非公平的 lock() 实现方式的不同。我们发现:
公平锁在获取锁的时候,会直接进行 acquire() ,而非公平锁则是直接尝试 CAS 去更新锁资源的 state 变量,更新成功则获取到锁资源,如果获取不到,才会进入 acquire()。这里涉及到 AQS 的 state 与 双向链表数据结构,可以在AQS专题学习。
CAS + volatile 实现线程安全地更新变量CAS(CompareAndSwap):在Java中,使用Unsafe类的compareAndSet()方法可以通过底层的 lock cmpxchg 指令实现原子性操作。
volatile :保证了线程间的变量一致性,即可见性。
CAS + Volatile:多线程场景中,某个个线程通过 CAS 将 volatile 修饰的变量更新成功后,所有线程在使用该变量时,都可见该变量的最新值。从而保证,在多线程场景下,对该变量的修改,不会引起线程安全问题。
static final class FairSync extends Sync {...
final void lock() {//直接进入acquire
acquire(1);
}
}
static final class NonfairSync extends Sync { ...
final void lock() {//先尝试更新AQS的State竞争锁
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
//直接获取锁失败时,才会进入acquire
acquire(1);
}
}
compareAndSetState()
:调用 Unsafe类提供的native层的原子性 CAS 操作。修改 AQS 中的 state 变量。
protected final boolean compareAndSetState(int expect, int update) {return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
setExclusiveOwnerThread()
:在 AQS 中将当前线程设置为锁的持有者
protected final void setExclusiveOwnerThread(Thread thread) {exclusiveOwnerThread = thread;
}
step2: accquire() —— 模板方法FairSync 与 NonFairSync 都是 AQS 的子类,acquire() 是 AQS 向子类提供的模板方法。其中 tryAcquire() 方法需要子类重写实现。
public final void acquire(int arg) {//先根据公平与非公平不同的方式,进行尝试获取锁
if (!tryAcquire(arg) &&
//如果获取失败,则排队等待
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
step3: tryAquire() 的不同实现tryAquire()方法需要子类重写实现,在 AQS 中,该方法仅抛出一个异常:
protected boolean tryAcquire(int arg) {throw new UnsupportedOperationException();
}
先看到公平锁对 tryAquire() 的实现。公平锁的 tryAquire() 主要做了:
- 如果自己持有锁,则进行锁的重入
- 如果锁空闲,先看是否有人排队(非公平会直接CAS获取锁)
- 如果没有人排队,则CAS尝试获取所资源
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread();
//查看是否有人持有锁
int c = getState();
if (c == 0) {//如果没有人持有锁
//查看是否有人排队,如果没人排队则尝试CAS获取锁
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {//获取锁成功,将AQS持有锁的线程设置为本线程
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {//锁的重入
int nextc = c + acquires;
if (nextc< 0)
throw new Error("Maximum lock count exceeded");
//这里可以直接设置,同一个线程,不会有线程安全。
//state>0表示有人持有锁,state的具体数值表示锁的重入次数
setState(nextc);
return true;
}
return false;
}
而非公平锁则不同,非公平锁的 tryAquire() 主要做了:
- 如果自己持有锁,则进行锁的重入
- 如果锁空闲,直接CAS尝试获取锁(公平锁会先看是否有人排队)
final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {//如果锁资源空闲,直接CAS尝试获取锁
if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {//锁的重入
int nextc = c + acquires;
//重入次数过多,int类型会overflow变成负数
if (nextc< 0) // overflow
throw new Error("Maximum lock count exceeded");
//锁重入,直接设置新的值,不会有线程安全问题
setState(nextc);
return true;
}
return false;
}
step4. 入队并阻塞线程,由 AQS 实现在acquire()模板方法中,如果tryAquire()没有获取到锁,将会准备在 AQS 中排队。主要工作:
- 将当前线程包装在 AQS 的 Node结构 中
- 插入 AQS 的双向队列的队尾
public final void acquire(int arg) {if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
private Node addWaiter(Node mode) {//将要入队的线程封装到 AQS 的 Node结构 中
Node node = new Node(Thread.currentThread(), mode);
//获取队尾元素
Node pred = tail;
//如果队尾元素不为空,则跟在队尾元素之后
if (pred != null) {node.prev = pred;
//通过CAS保证线程安全地入队
if (compareAndSetTail(pred, node)) {pred.next = node;
return node;
}
}
//如果入队失败,通过enq来循环CAS,自旋尝试入队
enq(node);
return node;
}
private Node enq(final Node node) {for (;;) {Node t = tail;
//如果队尾为空,说明队中没有元素,连head都没有
if (t == null) {// Must initialize
//cas 使队头队尾指针指向空Node
//head->Node()<- tail
if (compareAndSetHead(new Node()))
tail = head;
} else {//线程安全地尝试插入队尾
node.prev = t;
if (compareAndSetTail(t, node)) {t.next = node;
return t;
}
}
}
}
通过addWaiter(),不论公平锁还是非公平锁,都将当前线程包装在Node结构中,并插入到 AQS 的双向链表的队列末尾。
而后在 acquireQueued() 中,视情况再次获取锁,或者直接尝试阻塞线程:
- 如果该线程所在的Node在队列中处于队头,可以tryAquire()再次尝试获取锁资源,公平锁与非公平锁都将直接 CAS 争取。(因为即使是公平锁,该线程也处在队头,
hasQueuedPredecessors()
判断为真) - 如果获取失败,将做阻塞前的准备
- 阻塞准备完成后阻塞线程。
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;
try {boolean interrupted = false;
for (;;) {//获取前驱节点
final Node p = node.predecessor();
//如果前驱是head(head是空节点),说明当前Node排在队头,尝试获取所资源
if (p == head && tryAcquire(arg)) {//如果获取资源成功,则不阻塞当前线程,而是return回去,继续程序的执行
//同时将包装当前的Node清空,并变为新的head
setHead(node);
//将原来的头清空应用,等待GC回收
p.next = null; // help GC
failed = false;
return interrupted;
}
//如果争锁失败,将会准备阻塞,如果本次准备失败,将会再循环一次到这里,准备成功即可阻塞。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {...
}
}
shouldParkAfterFailedAcquire()
做了阻塞线程前的判断,主要工作是:
- 如果前驱结点是正常的(waitStatus< 0),则当前线程可以阻塞。
- 如果前驱结点的waitStatus==0,说明刚被初始化,还没被使用,CAS尝试将其更新为waitStatus = -1;
- 如果前驱结点的waitStatus>0,则该前驱结点是要被废弃的,更新链表结构,抛弃废弃的前驱结点
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws = pred.waitStatus;
if (ws == Node.SIGNAL)//waitStatus = -1
return true;//前驱结点正常,当前线程可以阻塞
if (ws >0) {//waitStatus = CANCELLED = 1
do {//更新前驱节点,将node的前驱引用指向更前一个
//pred = pred.prev;
//node.prev = pred;
node.prev = pred = pred.prev;
} while (pred.waitStatus >0);
//最后将可用的前驱结点指向node自己,从而抛弃中间若干个废弃的节点
pred.next = node;
} else {//如果node的waitStatus<0 但不是-1,只需要都统一更新为-1即可。
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
准备工作完成,就可以进入线程阻塞,parkAndCheckInterrupt()方法通过Unsafe类实现线程的阻塞。
private final boolean parkAndCheckInterrupt() {LockSupport.park(this);
return Thread.interrupted();
}
//LockSupport:
public static void park(Object blocker) {Thread t = Thread.currentThread();
//设置写屏障 write barrier
setBlocker(t, blocker);
//通过UNSAFE类的park()native方法进行线程的阻塞。
UNSAFE.park(false, 0L);
//设置写屏障 write barrier
setBlocker(t, null);
}
2.2 公平锁与非公平锁的解锁实现不论是公平锁还是非公平锁,解锁的实现是一致的:
- 每次解锁,都对state值减1
- 如果state的值变为了0,说明即使重入的锁,也都完全退出
- 将 AQS 对持有锁线程的引用置为null
- 唤醒等待队列中的某个线程
//ReentrantLock
public void unlock() {sync.release(1);
}
//Sync extends AQS
public final boolean release(int arg) {//解锁
if (tryRelease(arg)) {Node h = head;
//如果解锁完成,如果队列中有元素,则唤醒队列中的某个线程
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
//Sync
protected final boolean tryRelease(int releases) {//计算本次解锁后 state 的值
int c = getState() - releases;
//如果要解锁的不是持有锁的线程,说明程序出了问题
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//如果解锁后的值为 0,说明彻底解锁
if (c == 0) {free = true;
//去掉 AQS 对持有锁线程的引用
setExclusiveOwnerThread(null);
}
//设置新的state值
setState(c);
return free;
}
其中,解锁后需要唤醒队列中的某个线程,主要流程是:
- 如果队列中有元素,就会进入 unparkSuccessor() 进行唤醒
- 将node的waitStatus值设为0,变为初始化状态
- 获取其后继节点
- 如果有后继节点,则唤醒该节点
- 如果没有后继节点,或者后继节点是废弃的(waitStatus=1),从队尾往前循环找到下一个可用的前驱节点,并唤醒它
- 如果全是废弃的,那么什么也不做。
private void unparkSuccessor(Node node) {//获取头结点的waitStatus
int ws = node.waitStatus;
if (ws< 0)
//如果头结点是个被复用的空节点,把它设置为初始化状态,即waitStatus = 0
compareAndSetWaitStatus(node, ws, 0);
//获取头结点的后继节点
Node s = node.next;
//如果没有后继节点,或者后继节点废弃
if (s == null || s.waitStatus >0) {s = null;
//从队尾往前寻找一个可用的节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus<= 0)
s = t;
}
//如果最后找到一个可用的节点,那么唤醒其绑定的线程
if (s != null)
LockSupport.unpark(s.thread);
}
至此,ReentrantLock 的加锁与解锁全部分析完成。最后附上非公平锁的加锁时序图:
3.为什么Sync实现nonfairTryAcquire()?因为 tryLock() 没有公平与非公平的概念,都是走非公平的逻辑,调用sync.nonfaireTryAquire(),即:
- 如果锁空闲,则CAS一次,尝试获取锁
- 如果锁非空闲,但可重入,则重入
- 1和2都失败,则return false。
你是否还在寻找稳定的海外服务器提供商?创新互联www.cdcxhl.cn海外机房具备T级流量清洗系统配攻击溯源,准确流量调度确保服务器高可用性,企业级服务器适合批量采购,新人活动首月15元起,快前往官网查看详情吧
当前题目:Java多线程(二)——ReentrantLock源码分析-创新互联
当前链接:http://scyanting.com/article/dcsesg.html