程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> JAVA編程 >> JAVA綜合教程 >> Java CountDownLatch解析(上),javacountdownlatch

Java CountDownLatch解析(上),javacountdownlatch

編輯:JAVA綜合教程

Java CountDownLatch解析(上),javacountdownlatch


  • 寫在前面的話

最近一直在邊工作邊學習分布式的東西,看到了構建Java中間件的基礎知識,裡面有提到Java多線程並發的工具類,例如ReentrantLock、CyclicBarrier、CountDownLatch...

以前在工作中也有用到過這些實用的工具類,但是了解不是特別深入,借此機會打個卡,好記性不如爛博客,哈哈哈...

  • CountDownLatch簡介

CountDownLatch顧名思義,count + down + latch = 計數 + 減 + 門闩(這麼拆分也是便於記憶=_=) 可以理解這個東西就是個計數器,只能減不能加,同時它還有個門闩的作用,當計數器不為0時,門闩是鎖著的;當計數器減到0時,門闩就打開了。

如果你感到懵比的話,可以類比考生考試交卷,考生交一份試卷,計數器就減一。直到考生都交了試卷(計數器為0),監考老師(一個或多個)才能離開考場。至於考生是否做完試卷,監考老師並不關注。只要都交了試卷,他就可以做接下來的工作了。

  • CountDownLatch實用場景

既然知道了它的定義,那什麼時候使用它呢?筆者能想到的場景是:

有任務A和任務B,任務B必須在任務A完成之後再做。而任務A還能被分為n部分,並且這n部分之間的任務互不影響。為了加快任務完成進度,把這n部分任務分給不同的線程,當A任務完成了,然後通知做B任務的線程接著完成任務,至於完成B任務的線程,可以是一個,也可以是多個。

上圖:

 

  • CountDownLatch實現原理

接下來就跟筆者來扒一扒CountDownLatch的源碼,它到底是怎麼實現這個牛逼功能的。(源碼版本JDK1.8)

public class CountDownLatch {

    // 內部類 繼承AQS類
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

    //AQS子類的實例對象
    private final Sync sync;

    // 有參構造器
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

    // 等待
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    // 超時等待
    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    // 計數減1
    public void countDown() {
        sync.releaseShared(1);
    }

    // 獲取計數器當前計數
    public long getCount() {
        return sync.getCount();
    }

    // 吐司就不多說了吧
    public String toString() {
        return super.toString() + "[Count = " + sync.getCount() + "]";
    }
}

熟悉ReentrantLock的讀者應該知道這種結構,它也是采用這種結構來完成功能的,只是ReentrantLock在Sync這個內部類下,它還分了NonfairSync(非公平鎖)和FairSync(公平鎖)這兩個類來繼承Sync這個父類。這種結構的好處在於我們不必關心AbstractQueuedSynchronizer(以下簡稱AQS)的同步狀態管理、線程排隊、等待與喚醒等底層操作,我們只需重寫我們想要的方法(例如:tryAcquireShared tryReleaseShared)然後調用繼承AQS的方法(例如:getState(),setState())來改變同步狀態,即可生成我們特定的並發業務工具類。

扯遠了哈,接下來筆者准備分兩條線來分析CountDownLatch,一是CountDownLatch.await()阻塞當前線程,二是CountDownLatch.countDown()當前線程把計數器減一

一、CountDownLatch.await()

猜想一下:

 提問:實現這個功能你能想到的方法有哪些?

 回答:第一 你可能會使用線程的join(),讓當前線程等待join線程執行結束。其原理是不停檢查join線程是否存活,如果join線程存活則當前線程永遠等待。

    第二 你可能會使用線程間wait/notify,進入synchronized同步塊或方法中,檢查計數器值不為0,然後調用Object.wait();直到值為0則調用notifyAll()喚醒等待線程。

 分析:方法一 如果只有兩三個線程還好,如果數量過多,那得寫多少join啊,而且提前結束任務還得捕獲InterruptException異常,繁瑣...

    方法二 大量synchronized同步塊,還可能存在假喚醒...

 結論:上面提到的方法或多或少都存在這樣那樣的弊端,那我們就猜想一下思路解決這些弊端

    其一 我們可能需要一個volatile變量來實時感知計數器的值,一旦計數器值為0則喚醒阻塞在該條件上的線程

    其二 因為volatile只有數據實時透明性,它並不能保證線程的順序執行,所以我們可能需要一個同步隊列來放置這些阻塞隊列,當計數器值為0時,從隊列中挨著一個個喚醒線程

下面開始我們的驗證:

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

Sync(int count) {
    setState(count);
}

構造方法傳入了一個int變量,而我們跟進去發現,這個int變量是AQS中的state,類型是volatile的,它就是用來表示計數器值的。由此證明我們的猜想。注意:count值需要大於等於0

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

當我們調用await()的方法後,會默認調用sync這個實例的acquireSharedInterruptibly這個方法,並且參數為1,需要注意的是,這個方法聲明了一個InterruptedException異常,表示調用該方法的線程支持打斷操作。

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

我們跟進源碼發現,acquireSharedInterruptibly這個方法是sync繼承AQS而來的,這個方法的調用是響應線程的打斷的,所以在前兩行會檢查線程是否被打斷。接著調用tryAcquireShared()方法來判斷返回值,根據值的大小決定是否執行doAcquireSharedInterruptibly()。

// AQS中的方法
protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); } // Sync中的方法 protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } // AQS中的方法 protected final int getState() { return state;// state是volatile }

我們看到AQS把這個方法留給子類去實現,在子類sync的tryAcquireShared中它只驗證了計數器的值是否為0,如果為0則返回1,反之返回-1,根據上面可以看出,整數就不會執行doAcquireSharedInterruptibly(),該線程就結束方法,繼續執行自己的代碼去了。

private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);// 往同步隊列中添加節點
    boolean failed = true;
    try {
        for (;;) {// 一個死循環 跳出循環只有下面兩個途徑
            final Node p = node.predecessor();// 當前線程的前一個節點
            if (p == head) {// 如果是首節點
                int r = tryAcquireShared(arg);// 這個是不是似曾相識 見上面
                if (r >= 0) {
                    setHeadAndPropagate(node, r);// 處理後續節點
                    p.next = null; // help GC 這個可以借鑒
                    failed = false;
                    return;// 計數值為0 並且為頭節點 跳出循環
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();// 響應打斷 跳出循環
        }
    } finally {
        if (failed)
            cancelAcquire(node);// 如果是打斷退出的 則移除同步隊列節點
    }
}

接著我們來看doAcquireSharedInterruptibly這個方法,因為計數器值不為0需要阻塞線程,所以在進入方法時,將該線程包裝成節點並加入到同步隊列尾部(如何添加源碼稍後展示),我們看到這個方法退出去的途徑直有兩個,一個是return,一個是throw InterruptedException。注意最後的finally的處理。

return退出方法有兩個條件,首先計數值為0,接著必須是同步節點首節點。

throw InterruptedException是響應打斷操作的,線程在阻塞期間,如果你不想在等待了,可以打斷線程讓它繼續運行後面的任務(注意異常處理)

接著我們看看添加節點的源碼:

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);// 包裝節點
    Node pred = tail;// 同步隊列尾節點
    if (pred != null) {// 同步隊列有尾節點 將我們的節點通過cas方式添加到隊列後面
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {// 以cas原子方式添加尾節點
            pred.next = node;
            return node;// 退出該方法
        }
    }
    enq(node);// 兩種情況執行這個代碼 1.隊列尾節點為null 2.隊列尾節點不為null,但是我們原子添加尾節點失敗
    return node;
}

private Node enq(final Node node) {
    for (;;) {// 又是一個死循環
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))// cas形式添加頭節點  注意 是頭節點
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {// cas形式添加尾節點
                t.next = node;
                return t;// 結束這個方法的唯一出口 添加尾節點成功
            }
        }
    }
}

至此,CountDownLatch.await()阻塞當前線程的基本功能已經梳理出來了,CountDownLatch.countDown()計數器減一功能以及CountDownLatch示例和它的優缺點將留在下部分梳理。

然後,限於篇幅更多的compareAndSetHead()和compareAndSetTail()這些末節方法未詳細列出,希望讀者能自行查看api了解。

最後,由於筆者水平有限,難免有不足之處,有不對之處,請不吝惜指教。謝謝!

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved