Java中的FutureTask作為可異步執行任務並可獲取執行結果而被大家所熟知。通常可以使用future.get()來獲取線程的執行結果,在線程執行結束之前,get方法會一直阻塞狀態,直到call()返回,其優點是使用線程異步執行任務的情況下還可以獲取到線程的執行結果,但是FutureTask的以上功能卻是依靠通過一個叫AbstractQueuedSynchronizer的類來實現,至少在JDK 1.5、JDK1.6版本是這樣的(從1.7開始FutureTask已經被其作者Doug Lea修改為不再依賴AbstractQueuedSynchronizer實現了,這是JDK1.7的變化之一)。但是AbstractQueuedSynchronizer在JDK1.8中還有如下圖所示的眾多子類:
這些JDK中的工具類或多或少都被大家用過不止一次,比如ReentrantLock,我們知道ReentrantLock的功能是實現代碼段的並發訪問控制,也就是通常意義上所說的鎖,在沒有看到AbstractQueuedSynchronizer前,可能會以為它的實現是通過類似於synchronized,通過對對象加鎖來實現的。但事實上它僅僅是一個工具類!沒有使用更“高級”的機器指令,不是關鍵字,也不依靠JDK編譯時的特殊處理,僅僅作為一個普普通通的類就完成了代碼塊的並發訪問控制,這就更讓人疑問它怎麼實現的代碼塊的並發訪問控制的了。那就讓我們一起來仔細看下Doug Lea怎麼去實現的這個鎖。為了方便,本文中使用AQS代替AbstractQueuedSynchronizer。
在真正對解讀AQS之前,我想先從使用了它獨占控制功能的子類ReentrantLock說起,分析ReentrantLock的同時看一看AQS的實現,再推理出AQS獨特的設計思路和實現方式。最後,在看其共享控制功能的實現。
對於ReentrantLock,使用過的同學應該都知道,通常是這麼用它的:
reentrantLock.lock() //do something reentrantLock.unlock()
ReentrantLock會保證 do something在同一時間只有一個線程在執行這段代碼,或者說,同一時刻只有一個線程的lock方法會返回。其余線程會被掛起,直到獲取鎖。從這裡可以看出,其實ReentrantLock實現的就是一個獨占鎖的功能:有且只有一個線程獲取到鎖,其余線程全部掛起,直到該擁有鎖的線程釋放鎖,被掛起的線程被喚醒重新開始競爭鎖。沒錯,ReentrantLock使用的就是AQS的獨占API實現的。
那現在我們就從ReentrantLock的實現開始一起看看重入鎖是怎麼實現的。
首先看lock方法:
如FutureTask(JDK1.6)一樣,ReentrantLock內部有代理類完成具體操作,ReentrantLock只是封裝了統一的一套API而已。值得注意的是,使用過ReentrantLock的同學應該知道,ReentrantLock又分為公平鎖和非公平鎖之分,所以,ReentrantLock內部只有有兩個sync的實現:
公平鎖:每個線程搶占鎖的順序為先後調用lock方法的順序依次獲取鎖,類似於排隊吃飯。
非公平鎖:每個線程搶占鎖的順序不定,誰運氣好,誰就獲取到鎖,和調用lock方法的先後順序無關,類似於堵車時,加塞的那些XXXX。
到這裡,通過ReentrantLock的功能和鎖的所謂排不排隊的方式,我們是否可以這麼猜測ReentrantLock或者AQS的實現(現在不清楚誰去實現這些功能):有那麼一個被volatile修飾的標志位叫做key,用來表示有沒有線程拿走了鎖,或者說,鎖還存不存在,還需要一個線程安全的隊列,維護一堆被掛起的線程,以至於當鎖被歸還時,能通知到這些被掛起的線程,可以來競爭獲取鎖了。
至於公平鎖和非公平鎖,唯一的區別是在獲取鎖的時候是直接去獲取鎖,還是進入隊列排隊的問題了。為了驗證我們的猜想,我們繼續看一下ReentrantLock中公平鎖的實現、
調用到了AQS的acquire方法,
從方法名字上看語義是,嘗試獲取鎖,獲取不到則創建一個waiter(當前線程)後放到隊列中,這和我們猜測的好像很類似。[G1]
先看下tryAcquire方法:
留空了,Doug Lea是想留給子類去實現(既然要給子類實現,應該用抽象方法,但是Doug Lea沒有這麼做,原因是AQS有兩種功能,面向兩種使用場景,需要給子類定義的方法都是抽象方法了,會導致子類無論如何都需要實現另外一種場景的抽象方法,顯然,這對子類來說是不友好的。)
看下FairSync的tryAcquire方法:
getState方法是AQS的方法,因為在AQS裡面有個叫statede的標志位 :
事實上,這個state就是前面我們猜想的那個“key”!
回到tryAcquire方法:
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread();//獲取當前線程 int c = getState(); //獲取父類AQS中的標志位 if (c == 0) { if (!hasQueuedPredecessors() && //如果隊列中沒有其他線程 說明沒有線程正在占有鎖! compareAndSetState(0, acquires)) { //修改一下狀態位,注意:這裡的acquires是在lock的時候傳遞來的,從上面的圖中可以知道,這個值是寫死的1 setExclusiveOwnerThread(current); //如果通過CAS操作將狀態為更新成功則代表當前線程獲取鎖,因此,將當前線程設置到AQS的一個變量中,說明這個線程拿走了鎖。 return true; } } else if (current == getExclusiveOwnerThread()) { //如果不為0 意味著,鎖已經被拿走了,但是,因為ReentrantLock是重入鎖, //是可以重復lock,unlock的,只要成對出現行。一次。這裡還要再判斷一次 獲取鎖的線程是不是當前請求鎖的線程。 int nextc = c + acquires;//如果是的,累加在state字段上就可以了。 if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
到此,如果如果獲取鎖,tryAcquire返回true,反之,返回false,回到AQS的acquire方法。
如果沒有獲取到鎖,按照我們的描述,應該講當前線程放到隊列中區,只不過,在放之前,需要做些包裝。
先看addWaiter方法:
用當前線程去構造一個Node對象,mode是一個表示Node類型的字段,僅僅表示這個節點是獨占的,還是共享的,或者說,AQS的這個隊列中,哪些節點是獨占的,哪些是共享的。
這裡lock調用的是AQS獨占的API,當然,可以寫死是獨占狀態的節點。
創建好節點後,將節點加入到隊列尾部,此處,在隊列不為空的時候,先嘗試通過cas方式修改尾節點為最新的節點,如果修改失敗,意味著有並發,這個時候才會進入enq中死循環,“自旋”方式修改。
將線程的節點接入到隊裡中後,當讓還需要做一件事:將當前線程掛起!這個事,由acquireQueued來做。
在解釋acquireQueued之前,我們需要先看下AQS中隊列的內存結構,我們知道,隊列由Node類型的節點組成,其中至少有兩個變量,一個封裝線程,一個封裝節點類型。
而實際上,它的內存結構是這樣的(第一次節點插入時,第一個節點是一個空節點,代表有一個線程已經獲取鎖,事實上,隊列的第一個節點就是代表持有鎖的節點):
黃色節點為隊列默認的頭節點,每次有線程競爭失敗,進入隊列後其實都是插入到頭節點後面。這個從enq方法可以看出來,上文中有提到enq方法為將節點插入隊列的方法:
再回來看看
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { //如果當前的節點是head說明他是隊列中第一個“有效的”節點,因此嘗試獲取,上文中有提到這個類是交給子類去擴展的。 setHead(node);//成功後,將上圖中的黃色節點移除,Node1變成頭節點。 p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && //否則,檢查前一個節點的狀態為,看當前獲取鎖失敗的線程是否需要掛起。 parkAndCheckInterrupt()) //如果需要,借助JUC包下的LockSopport類的靜態方法Park掛起當前線程。知道被喚醒。 interrupted = true; } } finally { if (failed) //如果有異常 cancelAcquire(node);// 取消請求,對應到隊列操作,就是將當前節點從隊列中移除。 } }
這塊代碼有幾點需要說明:
1. Node節點中,除了存儲當前線程,節點類型,隊列中前後元素的變量,還有一個叫waitStatus的變量,改變量用於描述節點的狀態,為什麼需要這個狀態呢?
原因是:AQS的隊列中,在有並發時,肯定會存取一定數量的節點,每個節點[G4] 代表了一個線程的狀態,有的線程可能可能“等不及”獲取鎖了,需要放棄競爭,退出隊列,有點線程在等待一些條件滿足,滿足後才恢復執行(這裡的描述很像某個J.U.C包下的工具類,ReentrankLock的Condition,事實上,Condition同樣也是AQS的子類)等等,總之,各個線程有各個線程的狀態,但總需要一個變量買描述它,這個變量就叫waitStatus,它有四種狀態:
分別表示:
節點取消
節點等待觸發
節點等待條件
節點狀態需要向後傳播。
只有當前節點的前一個節點為SIGNAL時,才能當前節點才能被掛起。
2. 對線程的掛起及喚醒操作是通過使用UNSAFE類調用JNI方法實現的。當然,還提供了掛起指定時間後喚醒的API,在後面我們會講到。
到此為止,一個線程對於鎖的一次競爭才告一段落,結果又兩種,要麼成功獲取到鎖(不用進入到AQS隊列中,),要麼,獲取失敗,被掛起,等待下次喚醒後繼續循環嘗試獲取鎖,值得注意的是,AQS的隊列為FIFO隊列,所以,每次及時被CPU假喚醒,且當前線程不是出在頭節點的位置,也是會被掛起的。AQS通過這樣的方式,實現了競爭的排隊策略。
看完了獲取鎖,在看看釋放鎖,具體看代碼之前,我們可以先繼續猜下,釋放操作需要做哪些事情:
因為獲取鎖的線程的節點,此時在AQS的頭節點位置,所以,可能需要將頭節點移除。
而應該是直接釋放鎖,然後找到AQS的頭節點,通知它可以來競爭鎖了。
是不是這樣呢?我們繼續來看下,同樣我們用ReentrantLock的FairSync來說明:
unlock方法調用了AQS的release方法,同樣傳入了參數1,和獲取鎖的相應對應,獲取一個鎖,標示為+1,釋放一個鎖,標志位-1。
同樣,release為空方法,子類自己實現邏輯:
protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) //如果釋放的線程和獲取鎖的線程不是同一個,拋出非法監視器狀態異常。 throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) {//因為是重入的關系,不是每次釋放鎖c都等於0,直到最後一次釋放鎖時,才通知AQS不需要再記錄哪個線程正在獲取鎖。 free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
釋放鎖,成功後,找到AQS的頭節點,並喚醒它即可:
值得注意的是,尋找的順序是從隊列尾部開始往前去找的最前面的一個waitStatus小於0的節點。
到此,ReentrantLock的lock和unlock方法已經基本解析完畢了,唯獨還剩下一個非公平鎖NonfairSync沒說,其實,它和公平鎖的唯一區別就是獲取鎖的方式不同,一個是按前後順序一次獲取鎖,一個是搶占式的獲取鎖,那ReentrantLock是怎麼實現的呢?再看兩段代碼:
非公平鎖的lock方法的處理方式是: 在lock的時候先直接cas修改一次state變量(嘗試獲取鎖),成功就返回,不成功再排隊,從而達到不排隊直接搶占的目的。
而對於公平鎖:則是老老實實的開始就走AQS的流程排隊獲取鎖。如果前面有人調用過其lock方法,則排在隊列中前面,也就更有機會更早的獲取鎖,從而達到“公平”的目的。
這篇文章,我們從ReentrantLock出發,完整的分析了AQS獨占功能的API及內部實現,總的來說,思路其實並不復雜,還是使用的標志位+隊列的方式,記錄獲取鎖、競爭鎖、釋放鎖等一系列鎖的狀態,或許用更准確一點的描述的話,應該是使用的標志位+隊列的方式,記錄鎖,競爭,釋放等一系列獨占的狀態,因為站在AQS的層面state可以表示鎖,也可以表示其他狀態,它並不關心它的子類把它變成一個什麼工具類,而只是提供了一套維護一個獨占狀態。甚至,最准確的是AQS只是維護了一個狀態,因為,別忘了,它還有一套共享狀態的API,所以,AQS只是維護一個狀態,一個控制各個線程何時可以訪問的狀態,它只對狀態負責,而這個狀態表示什麼含義,由子類自己去定義。