線程之間的同步,除了互斥(前面介紹的互斥鎖)之外,還存在協作關系,下面我們就介紹一下java線程間常見的一些協作方式。
正如每個Java對象都可以作為一個內置鎖,每個對象也可以作為一個條件隊列,稱為內置條件隊列,Object.wait()、notify()/notifyAll()構成了內置條件隊列的API。
需要注意的是,調用任何對象X的內置條件隊列的API都必須要先獲得該對象X的內置鎖。
Wait()
u 調用時自動釋放當前鎖,請求OS將自己掛起
u 內置條件隊列上的條件發生後被喚醒
u 被喚醒後與其他線程競爭重新獲得鎖
notify()
u 通知喚醒當前獲得鎖的對象的內置條件隊列上的任意一個等待的線程
u 發出通知後盡快釋放當前獲得的鎖確保等待的線程能夠獲取
notifyAll()
u 通知喚醒當前獲得鎖的對象的內置條件隊列上的所有等待的線程
u 發出通知後盡快釋放當前獲得的鎖確保等待的線程能夠獲取
u 只有一個被喚醒的線程能夠獲得鎖,先競爭到鎖的線程執行完退出Synchronized塊之後其他被喚醒的線程重新競爭直到所有被喚醒的線程執行完畢
必須運行在同步控制塊中: wait,notify,notifyAll作為常用的任務間協作原語,是Object類的一部分,而不是Thread的一部分,所以可以把他們放進任何同步控制方法中。
實際上,只能在同步控制方法/同步控制塊中調用wait,notify,notifyAll這幾個方法,如果在非同步控制方法裡面調用了這幾個方法,可以編譯通過,但是運行的時候會獲得IllegalMonitorStateException異常。
一般來說,wait和notify放在synchronozed(object)同步塊中,並由這個object來調用。如果是synchronized(this),那麼就直接調用。具體如下:
(1)在某個指定對象lockObj上調用:
synchronized(lockObj) //獲取lockObj對象鎖 { try { //釋放lockObj對象鎖,阻塞等待在內置條件隊列上 lockObj.wait(); }catch(InterruptedException e) { e.printStackTrace(); return; } } synchronized(lockObj) { //喚醒一個等待在lockObj對象的內置條件隊列上的線程 lockObj.notify(); }
(2)也可以在this上調用:
synchronized(this)//獲取當前對象內置鎖 { try { //釋放當前對象鎖,阻塞在內置條件隊列上 wait(); }catch(InterruptedException e) { e.printStackTrace(); return; } } synchronized(this) { //喚醒當前對象內置條件隊列上的一個線程 notify(); }
wait常與while(條件判斷) 配合使用:一般來說,必須用一個檢查感興趣的條件的while循環來包圍wait,因為如果有多個任務等待同一個鎖,第一個喚醒的任務可能先執行改變while條件判斷中的狀態,使得當前任務不得不再次被掛起,直到感興趣的條件發生變化為止。
synchronized(this){ while(waxon == true)wait(); }
這樣可以避免“notify通知遺漏問題”。
//線程A synchronized (proceedLock) { proceedLock.wait(); } //線程B synchronized (proceedLock) { proceedLock.notifyAll(); }
本來設計線程B的職責就是再某個時刻通知線程A將其喚醒,但是如果線程B執行太早,在線程A還沒開始動的時候就已經執行完成,那麼線程A就會一直wait下去,等不到線程B來將其喚醒。這就是所謂的通知遺漏問題。
如果線程A在wait的時候配合變量判斷就可以解決這個問題。
//線程A: synchronized (proceedLock) { //while循環判斷,這裡不用if的原因是為了防止早期通知 while ( okToProceed == false ) { proceedLock.wait(); } } //線程B: synchronized (proceedLock) { //通知之前,將其設置為true,這樣即使出現通知遺漏的情況 //也不會使線程在wait出阻塞 okToProceed= true; proceedLock.notifyAll(); }
變量okToProceed在初始時設置為false,即讓線程A默認阻塞,等待線程B將其喚醒。如果線程B仍然在線程A還未動之前就已經結束了,但是已經將線程B等待的條件設置為true了,所以線程A是不會wait休眠的。
這樣就避免了通知遺漏問題。
前面已經說了每個Java對象都有一個內置的條件隊列,但是它又一個很明顯的缺陷:每個內置鎖只能有一個關聯的內置條件隊列!!!
可以在顯式鎖ReentrantLock上調用Lock.newCondition()方法獲得一個顯示的Condition條件隊列,Condition比內置條件隊列提供了更加豐富的功能:在每個鎖上可以創建多個顯示條件隊列,條件等待可以選擇可中斷或者不可中斷,等待也可以設置時限,此外還提供公平的和非公平的隊列操作。
在顯示條件隊列Condition中,與內置條件隊列的wait、notify、notifyAll相對應的方法分別是await、signal、signalAll。
下面用一個例子說明:例子給出了有界緩存的實現,在同一個顯式鎖上創建了兩個顯示條件隊列,一個表明緩存不滿的條件,一個表明緩存不空的條件。
public classConditionBoundBuffer{ protected final Lock lock = new ReentrantLock(); //緩存非滿的條件隊列 private final Condition notFullCond = lock.newCondition(); //緩存非空的條件隊列 private final Condition notEmptyCond = lock.newCondition(); @SuppressWarnings("unchecked") private final T[] items = (T[])new Object[100]; private int tail,head,count; public void put(T x) throws Exception { lock.lock(); try { //當緩存滿的時候,阻塞等待在緩存非滿的條件隊列上,並釋放鎖 while(count == items.length) notFullCond.await(); items[tail] = x; if(++tail == items.length) tail = 0; ++count; //喚醒等待在緩存非空條件隊列上的一個線程,並釋放鎖 notEmptyCond.signal(); }finally { lock.unlock(); } } public T take() throws InterruptedException { lock.lock(); try { //當緩存為空時,阻塞等待在緩存非空的條件隊列上,並釋放鎖 while(count == 0) notEmptyCond.await(); Tx = items[head]; items[head] = null; if(++head == items.length) head = 0; --count; //喚醒等待在緩存非滿條件隊列上的一個線程,並釋放鎖 notFullCond.signal(); return x; }finally { lock.unlock(); } } }
java.util.concurrent包中含有一些同步工具類,提供一些實用的線程間同步功能。
可阻塞隊列BlockingQueue拓展了Queue,增加了可阻塞的插入和獲取等操作
public interface BlockingQueue原理::BlockingQueue是線程安全容器,並且具備阻塞特性,其內部通過ReentrantLock實現線程安全,通過Condition實現阻塞和喚醒。extends Queue { //放入元素,若有空間容納則返回true,否則拋出IllegalStateException異常 boolean add(E e); //放入元素,若有空間容納則返回true,否則返回false boolean offer(E e); //放入元素,若有空間容納則返回true,否則阻塞等待 void put(E e) throws InterruptedException; //檢索並移除隊首元素,若不能立刻取到則阻塞等待 E take() throws InterruptedException; //檢索並移除隊首元素,若不能立刻取到則等待,超時後返回null Epoll(long timeout, TimeUnit unit) throws InterruptedException; }
應用:通過put和take方法,很容易實現線程間協同,比如典型的生產者-消費者模式。
可中斷:和Thread.sleep()、Object.wait()、Thread.join()等阻塞接口一樣,BlockingQueue.put()/take()可響應中斷
下面是幾個BlockingQueue接口的實現類:
(1)ArrayBlockingQueue:基於數組的阻塞隊列實現,大小固定,其構造函數必須指定int參數來指明隊列大小,內部元素以FIFO(先進先出)順序存儲,常用於實現有界緩存。
(2)LinkedBlockingQueue:基於鏈表的阻塞隊列實現,大小不固定,若其構造函數帶一個規定大小的參數,則生成的BlockingQueue有大小限制,若不帶大小參數,所生成的BlockingQueue的大小由Integer.MAX_VALUE來決定,內部元素以FIFO(先入先出)順序存儲。
(3)PriorityBlockingQueue:基於數組的阻塞隊列實現,但其所含對象的排序不是FIFO,而是依據對象的自然排序順序或者是構造函數的Comparator決定的順序。
(4)SynchronousQueue:特殊的BlockingQueue,對其的操作必須是放和取交替完成的。
讓相關線程在某一個點上等待,直到某一條件發生時,這些等待的線程才會繼續執行,即所有線程阻塞等待閉鎖的計數值減為0
打個比喻,閉鎖相當於一扇門,這扇門要按N次(N是閉鎖的初始計數值)才能打開,但是按門的線程不關心有多少線程在門外等待,只有門打開了,等待在門外的所有線程才能進去。
步驟1:初始化閉鎖(設定門要按幾次才能開)
CountDownLatch latch = new CountDownLatch(N);
步驟2:讓線程等待該閉鎖(在門外等待)
latch.await();
當等待的線程檢測到當前閉鎖計數器已經減為0(門打開),則繼續執行。
步驟3:閉鎖計數器減1(按1次門)
latch.countDown();
應用:一個線程等待N個線程全部完成任務
比如主線程需要所有圖片資源都准備好之後才能使用,所以開啟N個線程為其下載圖片資源,自己則初始化初始值為N的閉鎖並調用await()等待在這個閉鎖上,每個線程下載完圖片資源之後調用countDown()將閉鎖減一,最後一個下載線程減一之後閉鎖計數器變為0,此時等待閉鎖的主線程才開始繼續執行,使用已下載的圖片資源。
類似地可以實現N個線程等1個線程開門,1個線程等待1個線程開門等。
信號量用來控制同步訪問某個特定資源的線程的數量。
信號量的數目就代表資源數目,當申請一個信號量之後,表示資源數目減1,如果某個線程要申請信號量,但是該信號量數目已經為0了,改線程將會阻塞等待信號量的釋放。
步驟1:初始化信號量
Semaphore sem = new Semaphore(N); //N代表資源數目
步驟2:申請占用一個信號量
sem.acquire(); //信號量數值減1,如果信號量計數值已經為0,將阻塞等待
步驟3:釋放一個信號量
sem.release(); //信號量數值加1,標識資源使用完成,阻塞等待的線程被喚醒
應用:數據庫連接池管理
將可用的和被占用的數據庫連接分別管理在兩個集合中,獲取數據庫連接的函數會從可用連接集合獲取一個連接,並將連接轉移到另一個集合,釋放數據庫連接的函數將會把用完的連接放入可用連接集合。
我們不想在沒有數據庫連接可用時獲取連接的函數直接返回失敗,而是想阻塞等待。所以在獲取連接的函數中加入申請信號量的調用,在釋放數據庫連接的函數中加入釋放信號量的調用就可以了(注意數據庫連接池管理更好的方式可能是BlockingQueue,因為信號量初始值的數目是固定的,在這裡需要和數據庫連接池大小相同)。
0-1信號量:又稱為互斥信號量,有且僅有一個線程能夠獲取資源的獨占使用,或者函數的獨占訪問。
多個線程單獨執行,當所有線程都達到柵欄位置之後,才調度指定任務執行。
柵欄和閉鎖很像,區別在於:閉鎖是等待事件(閉鎖計數值變為0)發生,而柵欄是等待其他所有線程均達到柵欄位置。
步驟1:初始化柵欄
CyclicBarrier Barrier = newCyclicBarrier(count, runnableTask);
指定需要有count個線程到達柵欄點之後才能沖破柵欄,並調用runnableTask任務執行。
步驟2:線程中設置柵欄點
barrier.wait();
當設置了柵欄的所有線程都達到了這個柵欄位置之後,才調用runnableTask任務執行。
注意:從CyclicBarrier的名稱中可以看出,柵欄具備可循環特性,即所有線程沖破柵欄之後,如果該線程會循環繼續執行,那麼下次改柵欄仍然有效。