最近一直在邊工作邊學習分布式的東西,看到了構建Java中間件的基礎知識,裡面有提到Java多線程並發的工具類,例如ReentrantLock、CyclicBarrier、CountDownLatch...
以前在工作中也有用到過這些實用的工具類,但是了解不是特別深入,借此機會打個卡,好記性不如爛博客,哈哈哈...
CountDownLatch顧名思義,count + down + latch = 計數 + 減 + 門闩(這麼拆分也是便於記憶=_=) 可以理解這個東西就是個計數器,只能減不能加,同時它還有個門闩的作用,當計數器不為0時,門闩是鎖著的;當計數器減到0時,門闩就打開了。
如果你感到懵比的話,可以類比考生考試交卷,考生交一份試卷,計數器就減一。直到考生都交了試卷(計數器為0),監考老師(一個或多個)才能離開考場。至於考生是否做完試卷,監考老師並不關注。只要都交了試卷,他就可以做接下來的工作了。
既然知道了它的定義,那什麼時候使用它呢?筆者能想到的場景是:
有任務A和任務B,任務B必須在任務A完成之後再做。而任務A還能被分為n部分,並且這n部分之間的任務互不影響。為了加快任務完成進度,把這n部分任務分給不同的線程,當A任務完成了,然後通知做B任務的線程接著完成任務,至於完成B任務的線程,可以是一個,也可以是多個。
上圖:
接下來就跟筆者來扒一扒CountDownLatch的源碼,它到底是怎麼實現這個牛逼功能的。(源碼版本JDK1.8)
public class CountDownLatch { // 內部類 繼承AQS類 private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { setState(count); } int getCount() { return getState(); } protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } protected boolean tryReleaseShared(int releases) { for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } } //AQS子類的實例對象 private final Sync sync; // 有參構造器 public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } // 等待 public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } // 超時等待 public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } // 計數減1 public void countDown() { sync.releaseShared(1); } // 獲取計數器當前計數 public long getCount() { return sync.getCount(); } // 吐司就不多說了吧 public String toString() { return super.toString() + "[Count = " + sync.getCount() + "]"; } }
熟悉ReentrantLock的讀者應該知道這種結構,它也是采用這種結構來完成功能的,只是ReentrantLock在Sync這個內部類下,它還分了NonfairSync(非公平鎖)和FairSync(公平鎖)這兩個類來繼承Sync這個父類。這種結構的好處在於我們不必關心AbstractQueuedSynchronizer(以下簡稱AQS)的同步狀態管理、線程排隊、等待與喚醒等底層操作,我們只需重寫我們想要的方法(例如:tryAcquireShared tryReleaseShared)然後調用繼承AQS的方法(例如:getState(),setState())來改變同步狀態,即可生成我們特定的並發業務工具類。
扯遠了哈,接下來筆者准備分兩條線來分析CountDownLatch,一是CountDownLatch.await()阻塞當前線程,二是CountDownLatch.countDown()當前線程把計數器減一
一、CountDownLatch.await()
猜想一下:
提問:實現這個功能你能想到的方法有哪些?
回答:第一 你可能會使用線程的join(),讓當前線程等待join線程執行結束。其原理是不停檢查join線程是否存活,如果join線程存活則當前線程永遠等待。
第二 你可能會使用線程間wait/notify,進入synchronized同步塊或方法中,檢查計數器值不為0,然後調用Object.wait();直到值為0則調用notifyAll()喚醒等待線程。
分析:方法一 如果只有兩三個線程還好,如果數量過多,那得寫多少join啊,而且提前結束任務還得捕獲InterruptException異常,繁瑣...
方法二 大量synchronized同步塊,還可能存在假喚醒...
結論:上面提到的方法或多或少都存在這樣那樣的弊端,那我們就猜想一下思路解決這些弊端
其一 我們可能需要一個volatile變量來實時感知計數器的值,一旦計數器值為0則喚醒阻塞在該條件上的線程
其二 因為volatile只有數據實時透明性,它並不能保證線程的順序執行,所以我們可能需要一個同步隊列來放置這些阻塞隊列,當計數器值為0時,從隊列中挨著一個個喚醒線程
下面開始我們的驗證:
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } Sync(int count) { setState(count); }
構造方法傳入了一個int變量,而我們跟進去發現,這個int變量是AQS中的state,類型是volatile的,它就是用來表示計數器值的。由此證明我們的猜想。注意:count值需要大於等於0
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
當我們調用await()的方法後,會默認調用sync這個實例的acquireSharedInterruptibly這個方法,並且參數為1,需要注意的是,這個方法聲明了一個InterruptedException異常,表示調用該方法的線程支持打斷操作。
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
我們跟進源碼發現,acquireSharedInterruptibly這個方法是sync繼承AQS而來的,這個方法的調用是響應線程的打斷的,所以在前兩行會檢查線程是否被打斷。接著調用tryAcquireShared()方法來判斷返回值,根據值的大小決定是否執行doAcquireSharedInterruptibly()。
// AQS中的方法
protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); } // Sync中的方法 protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } // AQS中的方法 protected final int getState() { return state;// state是volatile }
我們看到AQS把這個方法留給子類去實現,在子類sync的tryAcquireShared中它只驗證了計數器的值是否為0,如果為0則返回1,反之返回-1,根據上面可以看出,整數就不會執行doAcquireSharedInterruptibly(),該線程就結束方法,繼續執行自己的代碼去了。
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED);// 往同步隊列中添加節點 boolean failed = true; try { for (;;) {// 一個死循環 跳出循環只有下面兩個途徑 final Node p = node.predecessor();// 當前線程的前一個節點 if (p == head) {// 如果是首節點 int r = tryAcquireShared(arg);// 這個是不是似曾相識 見上面 if (r >= 0) { setHeadAndPropagate(node, r);// 處理後續節點 p.next = null; // help GC 這個可以借鑒 failed = false; return;// 計數值為0 並且為頭節點 跳出循環 } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException();// 響應打斷 跳出循環 } } finally { if (failed) cancelAcquire(node);// 如果是打斷退出的 則移除同步隊列節點 } }
接著我們來看doAcquireSharedInterruptibly這個方法,因為計數器值不為0需要阻塞線程,所以在進入方法時,將該線程包裝成節點並加入到同步隊列尾部(如何添加源碼稍後展示),我們看到這個方法退出去的途徑直有兩個,一個是return,一個是throw InterruptedException。注意最後的finally的處理。
return退出方法有兩個條件,首先計數值為0,接著必須是同步節點首節點。
throw InterruptedException是響應打斷操作的,線程在阻塞期間,如果你不想在等待了,可以打斷線程讓它繼續運行後面的任務(注意異常處理)
接著我們看看添加節點的源碼:
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode);// 包裝節點 Node pred = tail;// 同步隊列尾節點 if (pred != null) {// 同步隊列有尾節點 將我們的節點通過cas方式添加到隊列後面 node.prev = pred; if (compareAndSetTail(pred, node)) {// 以cas原子方式添加尾節點 pred.next = node; return node;// 退出該方法 } } enq(node);// 兩種情況執行這個代碼 1.隊列尾節點為null 2.隊列尾節點不為null,但是我們原子添加尾節點失敗 return node; } private Node enq(final Node node) { for (;;) {// 又是一個死循環 Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node()))// cas形式添加頭節點 注意 是頭節點 tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) {// cas形式添加尾節點 t.next = node; return t;// 結束這個方法的唯一出口 添加尾節點成功 } } } }
至此,CountDownLatch.await()阻塞當前線程的基本功能已經梳理出來了,CountDownLatch.countDown()計數器減一功能以及CountDownLatch示例和它的優缺點將留在下部分梳理。
然後,限於篇幅更多的compareAndSetHead()和compareAndSetTail()這些末節方法未詳細列出,希望讀者能自行查看api了解。
最後,由於筆者水平有限,難免有不足之處,有不對之處,請不吝惜指教。謝謝!