Java中可重入锁ReentrantLock
ReentrantLock是JDK1.5中java util concurrent (JUC)包提供的一种锁实现,和synchronized不同,synchronized是由JVM操控获取锁和释放锁,ReentrantLock是从代码层面实现的同步。
ReentrantLock类的结构图:
AQS
AbstractQueuedSynchronizer简称AQS,是一个用于构建锁和同步容器的框架。JUC中许多类都是基于AQS构建,如:ReentrantLock,CountDownLatch,ReentrantReadWriteLock,FutureTask等。AQS为实现同步容器提供了支撑。
AQS使用一个FIFO线程等待队列,如果后来的线程没有抢到锁,则进入此队列进行排队,Head节点不和任何线程关联,其余的Node节点则代表一个正在等待的线程,同时维护着等待状态waitStatus。查看AQS源码你会发现,Node中waitStatus、prev节点、next节点全部用volatile关键字修饰,保证线程间的可见性。
waitStatus:表示节点的状态,其中包含的状态有:
- CANCELLED:值为1,表示当前节点被取消;
- SIGNAL:值为-1,表示当前节点的的后继节点将要或者已经被阻塞,在当前节点释放的时候需要unpark后继节点;
- CONDITION:值为-2,表示当前节点在等待condition,即在condition队列中;
- PROPAGATE:值为-3,表示releaseShared需要被传播给后续节点(仅在共享模式下使用);
- 0:无状态,表示当前节点在队列中等待获取锁。
prev:前继节点;
next:后继节点;
nextWaiter:存储condition队列中的后继节点;
thread:当前线程。
AQS中还有一个private表示状态的字段state,state初始化为0,表示未锁定状态。T线程执行lock(),调用tryAcquire()获取到锁,state+1。其他线程再tryAcquire()时就会失败,直到T线程执行unlock()到state=0(释放锁)为止,其他线程才有机会获取该锁。在释放锁之前,T线程还是可以重复获取到该锁,相应state+1,也就是所说的可重入(获取多少次锁,就要释放多少次锁,保证state置回0)。
NonfairSync(非公平锁)
1. Lock源码
1 2 3 4 5 6 |
final void lock() { if (compareAndSetState(0, 1))//CAS操作 setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } |
CAS:即CompareAndSwap,比较并交换。CAS 操作包含三个操作数 —— 内存位置(V)、预期原值(A)和新值(B)。如果内存位置的值与预期原值相匹配,那么处理器会自动将该位置值更新为新值。否则,处理器不做任何操作。CAS 有效地说明了“我认为位置 V 应该包含值 A;如果包含该值,则将 B 放到这个位置;否则,不要更改该位置,只告诉我这个位置现在的值即可。”Java并发包(java.util.concurrent)中大量使用了CAS操作,涉及到并发的地方都调用了sun.misc.Unsafe类方法进行CAS操作。
判断state是否为0,如果是0则置为1,并设置当前线程为独占线程,表示获取锁成功,多个线程同时尝试获取同一把锁时,CAS操作只能保证一个线程操作成功,其余会走acquire方法,进入队列排队。
从代码上我们可以看出其“不公平性”,一旦占用该锁的线程释放,state置为0,而排队的线程还未被唤醒时,新来的线程就直接抢占了该锁,出现了“插队”现象。
2. acquire(arg)
1 2 3 4 5 |
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } |
2.1 tryAcquire(arg)
一旦尝试获取成功,则直接返回。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); //获取锁状态 int c = getState(); //此方法在父类AQS中 if (c == 0) { //没有线程占用锁 if (compareAndSetState(0, acquires)) { //占用锁成功,设置独占线程为当前线程 setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) {//判断当前线程是否已经拥有该锁。 int nextc = c + acquires; //如果为当前线程则state+1,此时已经体现了重入。 if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); //更新状态 setState(nextc); return true; } //该锁已经被其他线程占用,获取锁失败! return false; } |
nonfairTryAcquire执行流程:首先获取锁状态是否有线程占用,如果没有则将当前线程置为独占。如果已经被其他线程占用,则判断是否为当前线程,判定成功,重入并再次获取该锁,则state+1并更新状态。判定失败,返回false,等待入队等待。
3. addWaiter(Node) 入队
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
private Node addWaiter(Node mode) { //创建新的节点,设置关联线程和模式(mode有两种模式,EXCLUSIVE(独占),SHARED(共享)) Node node = new Node(Thread.currentThread(), mode); // 获取尾节点引用 Node pred = tail; if (pred != null) {//其他线程进入 node.prev = pred; // 设置新节点为尾节点 if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 尾节点为空,说明队列还未进入 enq(node); return node; } |
3.1 enq(Node)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
private Node enq(final Node node) { for (;;) {//开始自旋,直到成功加入队尾 Node t = tail; if (t == null) { // Must initialize //队列为空,创建一个空的节点作为head节点,并将tail也指向它 if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; //把节点加入队尾 //把tail指向node if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } |
1 2 3 4 5 6 |
//CAS操作,把head指向update private final boolean compareAndSetHead(Node update) { //this:当前对象 headOffset:偏移量也就是内存位置的值(head) null:预期值 update:替换值 //判定headOffset是不是为null,如果是则指向update,如果不是(已经被修改),则直接返回。 return unsafe.compareAndSwapObject(this, headOffset, null, update); } |
上面代码运用了自旋+CAS组合实现非阻塞的原子操作。
流程:
- 此时A,B两个线程执行到enq方法,假设A先执行了compareAndSetHead中的compareAndSwapObject(),A发现head指针就是预期的null,则把指针指向update, 此时B线程也执行到compareAndSwapObject(),发现head的指针并不是预期的null,不做任何处理直接返回(只是假设A先执行,当然B也是可能先执行的)。
- A、B线程继续自旋,此时的t不再是null,进入else。再假设A先执行compareAndSetTail(), 对比发现tail和预期的t一致,将tail指向node,随后将node加入队列。然后B执行compareAndSetTail()时,发现tail和预期的t不一致,返回后继续自旋,下一次循环将成功进入队列。
下面看一下在enq方法中链表是如何形成的。
- -> 代表指向的内存地址
- t = tail ->null head->null 参数node ->内存地址(暂叫B)
- head -> 空节点内存地址(暂叫A)
- tail = head 则 t = tail = head ->A
- node.prev = t 则 node ->B node.prev -> A
- CAS操作 tail = node 则 tail -> B tail.prev -> A
- t.next = node 则 t->A t.next->B 此时的tail指向尾部节点node。
- 另一个线程node->内存地址(暂叫C),t = tail 则 t -> B t.prev ->A
- node.prev = t 则 node ->C node.prev -> B
- CAS操作 tail = node 则 tail -> C tail.prev -> B
- t.next = node 则 t.prev->A t->B t.next->C 返回t
- head -> A .next ->B .next->C <- tail 一个链表就形成了。 t的前驱是A 也就是t.prev = head
4. acquireQueued(Node int)
通过tryAcquire()和addWaiter(),线程获取资源失败,入队列进入等待状态,直到其他线程彻底释放资源后唤醒自己。acquireQueued()就是让已经入队的线程尝试获取资源,若获取失败则查看是否可以去waiting。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
//尝试获取锁 final boolean acquireQueued(final Node node, int arg) { boolean failed = true; //默认没有获取到锁 try { boolean interrupted = false; //标记线程是否被中断过 for (;;) { //自旋 final Node p = node.predecessor(); //获取前驱节点 //如果前驱是head,则该node为首次进入队列的线程节点,有资格获取锁 if (p == head && tryAcquire(arg)) { setHead(node); // 将当前节点设置为head节点 p.next = null; // 删除原head节点,等待GC回收 failed = false; //获取成功 return interrupted; //返回是否被中断过 } // 判断获取失败后是否可以挂起,若可以则挂起 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 线程若被中断,设置interrupted为true interrupted = true; } } finally { if (failed) cancelAcquire(node); } } |
head->空节点.next->A.next->B.next->C 移除head 方法:head->A head.prev=null 空节点.next=null;
如果获取失败,则会进入shouldParkAfterFailedAcquire和parkAndCheckInterrupt方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
//判断当前线程是否需要waiting private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { //前驱节点的状态 int ws = pred.waitStatus; if (ws == Node.SIGNAL) // 前驱节点状态为signal,表示当前线程就是接班人,可以挂起休息了。 return true; // 如果前驱节点放弃了 状态为CANCELLED if (ws > 0) { // 从当前节点一直往前找,直到找到一个正常等待的状态,并排在它后面 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { //如果前驱正常,将前驱节点的状态设置为SIGNAL,自己好成为接班人,有可能失败! compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } |
1 2 3 4 |
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); //挂起当前线程,进入waiting状态 return Thread.interrupted(); //查看是否被中断 } |
park()会让当前线程进入waiting状态,在此状态下,有两种途径可以唤醒该线程:1)被unpark();2)被interrupt()
总结一下Lock的过程:
- 先进行CAS操作,其中某一线程成功获取到锁,未成功线程调用acquire()。
- 调用tryAcquire()尝试直接获取锁,如果成功则直接返回。
- 失败则调用addWaiter()将线程加入等待队列的尾部,并标记为独占模式。
- 最后acquireQueued(),自旋,如果前驱节点为head,则尝试获取锁,获取失败会进入shouldParkAfterFailedAcquire方法(前驱节点不为head则直接进该方法), 然后设置当前线程的前驱节点状态为signal,线程在队列中休息,等unpark()唤醒自己后再去尝试获取资源。
acquireQueued方法中出现异常,会进入finally,执行cancelAcquire方法,将当前节点取消:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
private void cancelAcquire(Node node) { if (node == null) return; // 设置该节点不再关联任何线程 node.thread = null; // 通过前继节点跳过取消状态的node Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; // 获取过滤后的前继节点的后继节点 Node predNext = pred.next; // 设置状态为取消状态 node.waitStatus = Node.CANCELLED; /* * 1.如果当前节点是tail: * 尝试更新tail节点,设置tail为pred; * 更新失败则返回,成功则设置tail的后继节点为null */ if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { int ws; /* * 2.如果当前节点不是head的后继节点: * 判断当前节点的前继节点的状态是否是SIGNAL,如果不是则尝试设置前继节点的状态为SIGNAL; * 上面两个条件如果有一个返回true,则再判断前继节点的thread是否不为空; * 若满足以上条件,则尝试设置当前节点的前继节点的后继节点为当前节点的后继节点,也就是相当于将当前节点从队列中删除 */ if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { // 3.如果是head的后继节点或者状态判断或设置失败,则唤醒当前节点的后继节点 unparkSuccessor(node); } node.next = node; // help GC } } |
该方法中执行的过程有些复杂,首先是要获取当前节点的前继节点,如果前继节点的状态不是取消状态(即pred.waitStatus > 0
),则向前遍历队列,直到遇到第一个waitStatus <= 0
的节点,并把当前节点的前继节点设置为该节点,然后设置当前节点的状态为取消状态。
后面分3种情况进行取消节点
1. 当前节点为tail,执行删除后:
2.当前节点不是head的后继节点,也不是tail,执行删除后:
3.当前节点是head的后继节点,执行删除后
之所以没有修改prev,是防止前驱节点出现null,导致链表不完整。
5.unlock()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
public void unlock() { sync.release(1); } public final boolean release(int arg) { if (tryRelease(arg)) { //尝试释放锁 Node h = head; //h.waitStatus不等于0 则表示后续需要被唤醒 if (h != null && h.waitStatus != 0) unparkSuccessor(h);//唤醒后续线程 return true; } return false; } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
protected final boolean tryRelease(int releases) { // 更新state状态值 int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { // 锁被重入次数为0,表示释放成功 free = true; // 清空独占线程 setExclusiveOwnerThread(null); } setState(c); return free; } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) //当前节点为SIGNAL,则修改为0 compareAndSetWaitStatus(node, ws, 0); Node s = node.next; //若后续节点为null或者状态为CANCEL if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); } |
关于第10行代码从tail开始循环的原因:在循环前有两个判定条件 s==null 或者 s.waitStatus>0
第一种情况:我认为首先要考虑此时s都为null了为什么还要继续寻找,即使此时s==null,并不能认为链表就到尾部(node为tail)了,试想此时刚好有新线程入队列,那么node就不是tail了,已经有后继了,需要把这个后继取出。至于为什么从tail循环,看一下线程入队列的enq代码,在compareAndSetTail后面有一句t.next = node,如果在这两句代码之间执行了向后循环查找,你想想还能找到新入队的节点吗?
第二种情况:是为了防止有线程执行了cancelAcquire方法,为什么从tail循环,上面的3张图。
lock执行流程