CountDownLatch簡介
CountDownLatch是一個同步輔助類,在完成一組正在其他線程中執行的操作之前,它允許一個或多個線程一直等待。
CountDownLatch和CyclicBarrier的區別
(01) CountDownLatch的作用是允許1或N個線程等待其他線程完成執行;而CyclicBarrier則是允許N個線程相互等待。
(02) CountDownLatch的計數器無法被重置;CyclicBarrier的計數器可以被重置後使用,因此它被稱為是循環的barrier。
關於CyclicBarrier的原理,後面一章再來學習。
CountDownLatch函數列表
CountDownLatch(int count)
構造一個用給定計數初始化的 CountDownLatch。
// 使當前線程在鎖存器倒計數至零之前一直等待,除非線程被中斷。 void await() // 使當前線程在鎖存器倒計數至零之前一直等待,除非線程被中斷或超出了指定的等待時間。 boolean await(long timeout, TimeUnit unit) // 遞減鎖存器的計數,如果計數到達零,則釋放所有等待的線程。 void countDown() // 返回當前計數。 long getCount() // 返回標識此鎖存器及其狀態的字符串。 String toString()
CountDownLatch數據結構
CountDownLatch的UML類圖如下:
CountDownLatch的數據結構很簡單,它是通過"共享鎖"實現的。它包含了sync對象,sync是Sync類型。Sync是實例類,它繼承於AQS。
1. CountDownLatch(int count)
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }
說明:該函數是創建一個Sync對象,而Sync是繼承於AQS類。Sync構造函數如下:
Sync(int count) { setState(count); }
setState()在AQS中實現,源碼如下:
protected final void setState(long newState) { state = newState; }
說明:在AQS中,state是一個private volatile long類型的對象。對於CountDownLatch而言,state表示的”鎖計數器“。CountDownLatch中的getCount()最終是調用AQS中的getState(),返回的state對象,即”鎖計數器“。
2. await()
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
說明:該函數實際上是調用的AQS的acquireSharedInterruptibly(1);
AQS中的acquireSharedInterruptibly()的源碼如下:
public final void acquireSharedInterruptibly(long arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
說明:acquireSharedInterruptibly()的作用是獲取共享鎖。
如果當前線程是中斷狀態,則拋出異常InterruptedException。否則,調用tryAcquireShared(arg)嘗試獲取共享鎖;嘗試成功則返回,否則就調用doAcquireSharedInterruptibly()。doAcquireSharedInterruptibly()會使當前線程一直等待,直到當前線程獲取到共享鎖(或被中斷)才返回。
tryAcquireShared()在CountDownLatch.java中被重寫,它的源碼如下:
protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }
說明:tryAcquireShared()的作用是嘗試獲取共享鎖。
如果"鎖計數器=0",即鎖是可獲取狀態,則返回1;否則,鎖是不可獲取狀態,則返回-1。
private void doAcquireSharedInterruptibly(long arg) throws InterruptedException { // 創建"當前線程"的Node節點,且Node中記錄的鎖是"共享鎖"類型;並將該節點添加到CLH隊列末尾。 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { // 獲取上一個節點。 // 如果上一節點是CLH隊列的表頭,則"嘗試獲取共享鎖"。 final Node p = node.predecessor(); if (p == head) { long r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } // (上一節點不是CLH隊列的表頭) 當前線程一直等待,直到獲取到共享鎖。 // 如果線程在等待過程中被中斷過,則再次中斷該線程(還原之前的中斷狀態)。 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
說明:
(01) addWaiter(Node.SHARED)的作用是,創建”當前線程“的Node節點,且Node中記錄的鎖的類型是”共享鎖“(Node.SHARED);並將該節點添加到CLH隊列末尾。
(02) node.predecessor()的作用是,獲取上一個節點。如果上一節點是CLH隊列的表頭,則”嘗試獲取共享鎖“。
(03) shouldParkAfterFailedAcquire()的作用和它的名稱一樣,如果在嘗試獲取鎖失敗之後,線程應該等待,則返回true;否則,返回false。
(04) 當shouldParkAfterFailedAcquire()返回ture時,則調用parkAndCheckInterrupt(),當前線程會進入等待狀態,直到獲取到共享鎖才繼續運行。
3. countDown()
public void countDown() { sync.releaseShared(1); }
說明:該函數實際上調用releaseShared(1)釋放共享鎖。
releaseShared()在AQS中實現,源碼如下:
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
說明:releaseShared()的目的是讓當前線程釋放它所持有的共享鎖。
它首先會通過tryReleaseShared()去嘗試釋放共享鎖。嘗試成功,則直接返回;嘗試失敗,則通過doReleaseShared()去釋放共享鎖。
tryReleaseShared()在CountDownLatch.java中被重寫,源碼如下:
protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { // 獲取“鎖計數器”的狀態 int c = getState(); if (c == 0) return false; // “鎖計數器”-1 int nextc = c-1; // 通過CAS函數進行賦值。 if (compareAndSetState(c, nextc)) return nextc == 0; } }
說明:tryReleaseShared()的作用是釋放共享鎖,將“鎖計數器”的值-1。
總結:CountDownLatch是通過“共享鎖”實現的。在創建CountDownLatch中時,會傳遞一個int類型參數count,該參數是“鎖計數器”的初始狀態,表示該“共享鎖”最多能被count給線程同時獲取。當某線程調用該CountDownLatch對象的await()方法時,該線程會等待“共享鎖”可用時,才能獲取“共享鎖”進而繼續運行。而“共享鎖”可用的條件,就是“鎖計數器”的值為0!而“鎖計數器”的初始值為count,每當一個線程調用該CountDownLatch對象的countDown()方法時,才將“鎖計數器”-1;通過這種方式,必須有count個線程調用countDown()之後,“鎖計數器”才為0,而前面提到的等待線程才能繼續運行!
以上,就是CountDownLatch的實現原理。
CountDownLatch的使用示例
下面通過CountDownLatch實現:"主線程"等待"5個子線程"全部都完成"指定的工作(休眠1000ms)"之後,再繼續運行。
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; public class CountDownLatchTest1 { private static int LATCH_SIZE = 5; private static CountDownLatch doneSignal; public static void main(String[] args) { try { doneSignal = new CountDownLatch(LATCH_SIZE); // 新建5個任務 for(int i=0; i<LATCH_SIZE; i++) new InnerThread().start(); System.out.println("main await begin."); // "主線程"等待線程池中5個任務的完成 doneSignal.await(); System.out.println("main await finished."); } catch (InterruptedException e) { e.printStackTrace(); } } static class InnerThread extends Thread{ public void run() { try { Thread.sleep(1000); System.out.println(Thread.currentThread().getName() + " sleep 1000ms."); // 將CountDownLatch的數值減1 doneSignal.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } } } }
運行結果:
main await begin. Thread-0 sleep 1000ms. Thread-2 sleep 1000ms. Thread-1 sleep 1000ms. Thread-4 sleep 1000ms. Thread-3 sleep 1000ms. main await finished.
結果說明:主線程通過doneSignal.await()等待其它線程將doneSignal遞減至0。其它的5個InnerThread線程,每一個都通過doneSignal.countDown()將doneSignal的值減1;當doneSignal為0時,main被喚醒後繼續執行。