博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
走进 AQS 瞧一瞧看一看
阅读量:7098 次
发布时间:2019-06-28

本文共 14352 字,大约阅读时间需要 47 分钟。

并发中有一块很重要的东西就是AQS。接下来一周的目标就是它。

看复杂源码时,一眼望过去,这是什么?不要慌,像剥洋葱一样,一层层剥开(哥,喜欢"扒开"这个词)。

参考资源:

https://www.cnblogs.com/waterystone/p/4920797.html

https://javadoop.com/post/AbstractQueuedSynchronizer#toc4

 

一概述

大师Doug Lea依赖FIFO(First-in-first-out)等待队列,创建了AQS框架来实现锁和同步器的操作。AQS是LimitLatch,countDownLacth,ReentrantLock,etc 这些类的基础。它们都是继承了AQS类,继承的时候,protected的方法根据业务需要必须重写,也就是tryAcquire,tryRelease,tryAcquireShared,tryReleaseShared,isHeldExclusively中的一部分或是全部。

 

二AQS结构

1.我们看看AQS的属性:

静态内部类 Node (为什么要写静态内部类,个人觉得,Node是链表结构,在这里作为阻塞队列来使用,只有单独一个地方(AQS中)使用,所以写成静态内部类,也提高了封装性)

// 头结点

private transient volatile Node head;

// 尾结点

private transient volatile Node tail;

// 资源状态,等于0:没有被任何thread占有;大于0:资源已经被其他线程占有

 private volatile int state;

 

2.Node的内部构造

Node类至关重要,我懒了,贴一下代码,发现里面的注释真的很详细,就不再翻译一遍了。

1         /** Marker to indicate a node is waiting in shared mode */ // 共享模式  2         static final Node SHARED = new Node(); 3         /** Marker to indicate a node is waiting in exclusive mode */ // 独占模式  4         static final Node EXCLUSIVE = null; 5  6         /** waitStatus value to indicate thread has cancelled */ 7         static final int CANCELLED =  1; // 在这里,大家注意一下。waitStatus 值有1,-1,-2,中间越过了0.             // 其实,waitStatus 的值,有0 这种情况,初始值为0 8         /** waitStatus value to indicate successor's thread needs unparking */ 9         static final int SIGNAL    = -1;10         /** waitStatus value to indicate thread is waiting on condition */11         static final int CONDITION = -2;12         /**13          * waitStatus value to indicate the next acquireShared should14          * unconditionally propagate15          */16         static final int PROPAGATE = -3;17 18         /**19          * Status field, taking on only the values:20          *   SIGNAL:     The successor of this node is (or will soon be)21          *               blocked (via park), so the current node must22          *               unpark its successor when it releases or23          *               cancels. To avoid races, acquire methods must24          *               first indicate they need a signal,25          *               then retry the atomic acquire, and then,26          *               on failure, block.27          *   CANCELLED:  This node is cancelled due to timeout or interrupt.28          *               Nodes never leave this state. In particular,29          *               a thread with cancelled node never again blocks.30          *   CONDITION:  This node is currently on a condition queue.31          *               It will not be used as a sync queue node32          *               until transferred, at which time the status33          *               will be set to 0. (Use of this value here has34          *               nothing to do with the other uses of the35          *               field, but simplifies mechanics.)36          *   PROPAGATE:  A releaseShared should be propagated to other37          *               nodes. This is set (for head node only) in38          *               doReleaseShared to ensure propagation39          *               continues, even if other operations have40          *               since intervened.41          *   0:          None of the above42          *43          * The values are arranged numerically to simplify use.44          * Non-negative values mean that a node doesn't need to45          * signal. So, most code doesn't need to check for particular46          * values, just for sign.47          *48          * The field is initialized to 0 for normal sync nodes, and49          * CONDITION for condition nodes.  It is modified using CAS50          * (or when possible, unconditional volatile writes).51          */52         volatile int waitStatus;53 65         volatile Node prev;79          */80         volatile Node next;81 82         /**83          * The thread that enqueued this node.  Initialized on84          * construction and nulled out after use.85          */86         volatile Thread thread;87 98         Node nextWaiter;

 

接下来,进入到AQS源码分析环节。

概述中提到过下面这几个方法tryAcquire,tryRelease,tryAcquireShared,tryReleaseShared

它们分别是独占锁的获取和释放,共享锁的获取和释放。

这里,我们从独占锁的获取开始讲起。

 

3.独占锁方法解析

3.1 acquire 方法

1     /** 2      * Acquires in exclusive mode, ignoring interrupts.  Implemented 3      * by invoking at least once {
@link #tryAcquire}, 4 * returning on success. Otherwise the thread is queued, possibly 5 * repeatedly blocking and unblocking, invoking {
@link 6 * #tryAcquire} until success. This method can be used 7 * to implement method {
@link Lock#lock}. 8 * 9 * @param arg the acquire argument. This value is conveyed to10 * {
@link #tryAcquire} but is otherwise uninterpreted and11 * can represent anything you like.12 */ // 尝试获取锁,tryAcquire放回true,表示成功获取锁,就不会再往下走了。13 public final void acquire(int arg) {
// 尝试获取锁失败,并且,acquireQueued成功,那么就会进入方法中,执行自我中断 // 接下来,开始剥洋葱,方法逐个分析解惑14 if (!tryAcquire(arg) &&15 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))16 selfInterrupt();17 }

 

3.2 tryAcquire 方法

1     /** 2      * Attempts to acquire in exclusive mode. This method should query 3      * if the state of the object permits it to be acquired in the 4      * exclusive mode, and if so to acquire it. 5      * 6      * 

This method is always invoked by the thread performing 7 * acquire. If this method reports failure, the acquire method 8 * may queue the thread, if it is not already queued, until it is 9 * signalled by a release from some other thread. This can be used10 * to implement method {

@link Lock#tryLock()}.11 *12 *

The default13 * implementation throws {

@link UnsupportedOperationException}.14 *15 * @param arg the acquire argument. This value is always the one16 * passed to an acquire method, or is the value saved on entry17 * to a condition wait. The value is otherwise uninterpreted18 * and can represent anything you like.19 * @return {
@code true} if successful. Upon success, this object has20 * been acquired.21 * @throws IllegalMonitorStateException if acquiring would place this22 * synchronizer in an illegal state. This exception must be23 * thrown in a consistent fashion for synchronization to work24 * correctly.25 * @throws UnsupportedOperationException if exclusive mode is not supported26 */ // 哇咔咔,这么长注释,不过别着急。慢慢看。 // 这个方法用来查询对象的state是否允许以独占方式来获取锁,如果可以,尝试获取。 // 接下里大家会疑问,为什么这个方法里面只有throw这一行代码。因为,这个方法需要在子类继承的时候需要被重写,这个就是设计模式中的模板方法。 // 同时,这个方法没有被做成abstract方法,因为,子类继承的时候为了自己的需求,只需要实现独占模式的方法或是共享模式的方法即可,不用都去实现。27 protected boolean tryAcquire(int arg) {28 throw new UnsupportedOperationException();29 }

 

3.3 acquireQueued方法

1 /** 2      * Acquires in exclusive uninterruptible mode for thread already in 3      * queue. Used by condition wait methods as well as acquire. 4      * 5      * @param node the node 6      * @param arg the acquire argument 7      * @return {
@code true} if interrupted while waiting 8 */ // 线程挂起后,被解锁,就是在这个方法里实现的 // 方法返回结果:等待时,是否被中断 9 final boolean acquireQueued(final Node node, int arg) {
// failed true:表示没有拿到资源 false:表示成功拿到资源10 boolean failed = true;11 try {
// interrupted 是否被中断12 boolean interrupted = false; // 又一个自旋的使用哦13 for (;;) {
// 获取前一个节点Node14 final Node p = node.predecessor();15 if (p == head && tryAcquire(arg)) {
// 设置头结点,将原先的头结点与node节点的连接,断开16 setHead(node);17 p.next = null; // help GC18 failed = false;19 return interrupted;20 } // 获取锁失败,是否应该挂起当前线程 // p 是前驱节点,node是当前线程节点21 if (shouldParkAfterFailedAcquire(p, node) && // 获取锁失败,挂起线程的操作在下面方法里实施22 parkAndCheckInterrupt())23 interrupted = true;24 }25 } finally {26 if (failed)27 cancelAcquire(node);28 }29 }

 

3.3.1 shouldParkAfterFailedAcquire方法

1 /** 2      * Checks and updates status for a node that failed to acquire. 3      * Returns true if thread should block. This is the main signal 4      * control in all acquire loops.  Requires that pred == node.prev. 5      * 6      * @param pred node's predecessor holding status 7      * @param node the node 8      * @return {
@code true} if thread should block 9 */ // 这个方法用途:获取锁失败,判断是否需要挂起线程10 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {11 int ws = pred.waitStatus; // 等待状态 SIGNAL表示前驱节点状态正常,当前线程需要挂起,直接返回true12 if (ws == Node.SIGNAL)13 /*14 * This node has already set status asking a release15 * to signal it, so it can safely park.16 */ // 这个节点已经让请求release的状态来标识,所以它可以安全的park了17 return true;18 if (ws > 0) {19 /*20 * Predecessor was cancelled. Skip over predecessors and21 * indicate retry.22 */ // ws大于0的状态1,表示取消了排队。 // 如果取消了排队,接着再去找前一个,前一个也被取消了,就找前一个的前一个,总会有一个没被取消的。23 do {24 node.prev = pred = pred.prev;25 } while (pred.waitStatus > 0);26 pred.next = node;27 } else {28 /*29 * waitStatus must be 0 or PROPAGATE. Indicate that we30 * need a signal, but don't park yet. Caller will need to31 * retry to make sure it cannot acquire before parking.32 */ // else的情况,就是waitStatus 为0,-2,-3 // 用CAS将前驱节点状态设为-1(Node.SIGNAL)33 compareAndSetWaitStatus(pred, ws, Node.SIGNAL);34 }35 return false;36 }

 

3.3.2 parkAndCheckInterrupt方法

1 /**2      * Convenience method to park and then check if interrupted3      *4      * @return {
@code true} if interrupted5 */ // 在shouldParkAfterFailedAcquire返回true的时候,才会执行这个方法,这个方法的作用就是挂起线程6 private final boolean parkAndCheckInterrupt() {
// 调用park方法,使线程挂起 (使用unpark方法来唤醒线程)7 LockSupport.park(this); // 查看线程是否被中断8 return Thread.interrupted();9 }

 

3.4 addWaiter 方法

1     /** 2      * Creates and enqueues node for current thread and given mode. 3      * 4      * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared 5      * @return the new node 6      */ // 将Node放进阻塞队列的末尾  7     private Node addWaiter(Node mode) {
// 生成一个node节点,保存当前的线程和占用模式(独占模式或是共享模式) 8 Node node = new Node(Thread.currentThread(), mode); 9 // Try the fast path of enq; backup to full enq on failure10 Node pred = tail; // 查询tail节点是否有值,有值代表的是此链表不为空11 if (pred != null) {
// node相连接,很简单,就是改变指针指向,可以参考数据结构 链表12 node.prev = pred; // compareAndSetTail这个方法的操作就是比较赋值,经典的CAS方法,不明白的可以参照下面资源 // http://www.cnblogs.com/lihao007/p/8654787.html 13 if (compareAndSetTail(pred, node)) {14 pred.next = node;15 return node;16 }17 }18 enq(node);19 return node;20 }
 

3.4.1 eng()方法

1 /** 2      * Inserts node into queue, initializing if necessary. See picture above. 3      * @param node the node to insert 4      * @return node's predecessor 5      */ // 来到这个方法有两种可能。一是等待线程队列为空,二是其他线程更新了队列 6     private Node enq(final Node node) {
// 又是自旋方法(乐观锁),AQS源码中出现多次,因为需要线程的不安全 7 for (;;) { 8 Node t = tail; 9 if (t == null) { // Must initialize10 if (compareAndSetHead(new Node()))11 tail = head;12 } else {
// 将node放在队列最后,有线程竞争的话,排不上重排13 node.prev = t;14 if (compareAndSetTail(t, node)) {15 t.next = node;16 return t;17 }18 }19 }20 }

 独占模式获取锁,就分析到这里了。

 

4.release独占模式 释放锁

1 /** 2      * Releases in exclusive mode.  Implemented by unblocking one or 3      * more threads if {
@link #tryRelease} returns true. 4 * This method can be used to implement method {
@link Lock#unlock}. 5 * 6 * @param arg the release argument. This value is conveyed to 7 * {
@link #tryRelease} but is otherwise uninterpreted and 8 * can represent anything you like. 9 * @return the value returned from {
@link #tryRelease}10 */ // 独占模式下,释放锁 11 public final boolean release(int arg) {
// tryRelease方法是模板方法,留给子类定义。12 if (tryRelease(arg)) {13 Node h = head; // 首先判断阻塞队列是否为空。接下来,判断waitStatus的状态,0的状态是初始化是被赋予的值。只有是非0的状态,才说明head节点后面是有其它节点的。14 if (h != null && h.waitStatus != 0)15 unparkSuccessor(h);16 return true;17 }18 return false;19 }

 

4.1 unparkSuccessor

1 /** 2      * Wakes up node's successor, if one exists. 3      * 4      * @param node the node 5      */ 6     private void unparkSuccessor(Node node) { 7         /* 8          * If status is negative (i.e., possibly needing signal) try 9          * to clear in anticipation of signalling.  It is OK if this10          * fails or if status is changed by waiting thread.11          */12         int ws = node.waitStatus;13         if (ws < 0)14             compareAndSetWaitStatus(node, ws, 0);15 16         /*17          * Thread to unpark is held in successor, which is normally18          * just the next node.  But if cancelled or apparently null,19          * traverse backwards from tail to find the actual20          * non-cancelled successor.21          */22         Node s = node.next;23         if (s == null || s.waitStatus > 0) {24             s = null; // 这里,是从队列末尾向前查找没有被取消的节点                // 这里,我当时对为什么从后向前查找有疑问,后来看了文章明白了。地址:https://javadoop.com/post/AbstractQueuedSynchronizer#toc425             for (Node t = tail; t != null && t != node; t = t.prev)26                 if (t.waitStatus <= 0)27                     s = t;28         }29         if (s != null)30             LockSupport.unpark(s.thread);31     }

 

 就写到这里了,仍需努力,以后有想法了,慢慢补充,写的不对的地方,欢迎指正,共同探讨。

 

 

 

转载于:https://www.cnblogs.com/lihao007/p/8647851.html

你可能感兴趣的文章
localhost兼容js不能用
查看>>
Makefile 10——打造更专业的编译环境-huge项目
查看>>
hive正則表達式
查看>>
Create and Call HttpHandler in SharePoint
查看>>
pymysql.err.InternalError: (1054, "Unknown column 'None' in 'field list'")
查看>>
树莓派与window 10组成的物联网核心:让人失望
查看>>
《生活在Linux中》之:在Bash的Emacs模式中使用Vim
查看>>
HDOJ 5411 CRB and Puzzle 矩阵高速幂
查看>>
[LeetCode] Maximum Vacation Days 最大化休假日
查看>>
Microsoft Word、Excel、PowerPoint转Pdf
查看>>
Servlet概述
查看>>
Servlet的异常处理
查看>>
支付宝 app支付 沙盘使用
查看>>
Redis持久化配置-AOF
查看>>
计算机网络的应用层简单介绍:
查看>>
需求管理之客户需求何时休?
查看>>
Java进化? Kotlin初探与集成Android项目
查看>>
URL中的#
查看>>
CentOS自带mysql配置(密码更改、端口开放访问、添加进系统启动项)
查看>>
MYSQL中动态行数据转列数据
查看>>