這篇文章將使用經典的生產者消費者的例子來進一步鞏固java多線程通信,介紹使用阻塞隊列來簡化程序
下面是一個經典的生產者消費者的例子:
假設使用緩沖區存儲整數,緩沖區的大小是受限制的。緩沖區提供write(int)方法將一個整數添加到緩沖區,還體統read()方法從緩沖區中讀取並刪除一個整數。為了同步操作,使用具有兩個條件的鎖,notEmpty(緩沖區非空)和notFull(緩沖區未滿)。當任務相緩沖區添加一個int時,如果緩沖區是滿的,那麼任務將等待notFull狀態,當任務從緩沖區總刪除一個int時,如果緩沖區是空的,那麼任務將等待notEmpty狀態。
import java.util.LinkedList; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class ConsumerProducer { private static Buffer buffer = new Buffer(); public static void main(String[] args) { // 創建線程池 ExecutorService executor = Executors.newFixedThreadPool(2); executor.execute(new Producertask()); executor.execute(new Consumertask()); executor.shutdown(); } private static class Producertask implements Runnable { @Override public void run() { try { int i = 1; while (true) { System.out.println("Producer writes " + i); buffer.write(i++); Thread.sleep((int) (Math.random() * 10000)); } } catch (InterruptedException e) { e.printStackTrace(); } } } private static class Consumertask implements Runnable { public void run() { try { while (true) { System.out.println("\t\t\tConsumer reads " + buffer.read()); Thread.sleep((int) (Math.random() * 10000)); } } catch (InterruptedException e) { e.printStackTrace(); } } } private static class Buffer { private static final int CAPACITY = 1; // buffer size LinkedList<Integer> queue = new LinkedList<Integer>(); // 創建鎖 private static Lock lock = new ReentrantLock(); // 創建兩個條件 private static Condition notEmpty = lock.newCondition(); private static Condition notFull = lock.newCondition(); public void write(int value) { lock.lock(); // 請求鎖 try { while (queue.size() == CAPACITY) { System.out.println("Wait for notFull condition"); notFull.await(); } queue.offer(value); notEmpty.signal(); // notEmpty條件信號 } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); // 釋放鎖 } } @SuppressWarnings("finally") public int read() { int value = 0; lock.lock(); try { while (queue.isEmpty()) { System.out.println("\t\t\tWait for notEmpty condition"); notEmpty.await(); } value = queue.remove(); notFull.signal(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); return value; } } } }
阻塞列隊
阻塞列隊在試圖想一個滿列隊添加元素或這從空列隊刪除元素時會導致線程阻塞。BlockQueue接口擴展java.util.queue,並且提供同步的put和take方法想列隊頭部添加元素,以及從列隊尾刪除元素
java支持三個具體的阻塞列隊ArrayBlockingQueue、LinkedblockingQueue 和 PriorityBlockingQueue ,他們都在java.util.concurrent包中。
ArrayBlockingQueue
一個由數組支持的有界阻塞隊列。此隊列按 FIFO(先進先出)原則對元素進行排序。隊列的頭部 是在隊列中存在時間最長的元素。隊列的尾部 是在隊列中存在時間最短的元素。新元素插入到隊列的尾部,隊列獲取操作則是從隊列頭部開始獲得元素。
這是一個典型的“有界緩存區”,固定大小的數組在其中保持生產者插入的元素和使用者提取的元素。一旦創建了這樣的緩存區,就不能再增加其容量。試圖向已滿隊列中放入元素會導致操作受阻塞;試圖從空隊列中提取元素將導致類似阻塞。
此類支持對等待的生產者線程和使用者線程進行排序的可選公平策略。默認情況下,不保證是這種排序。然而,通過將公平性 (fairness) 設置為 true 而構造的隊列允許按照 FIFO 順序訪問線程。公平性通常會降低吞吐量,但也減少了可變性和避免了“不平衡性”。
LinkedBlockingQueue
一個基於已鏈接節點的、范圍任意的 blocking queue。此隊列按 FIFO(先進先出)排序元素。隊列的頭部 是在隊列中時間最長的元素。隊列的尾部 是在隊列中時間最短的元素。新元素插入到隊列的尾部,並且隊列獲取操作會獲得位於隊列頭部的元素。鏈接隊列的吞吐量通常要高於基於數組的隊列,但是在大多數並發應用程序中,其可預知的性能要低。
可選的容量范圍構造方法參數作為防止隊列過度擴展的一種方法。如果未指定容量,則它等於Integer.MAX_VALUE
。除非插入節點會使隊列超出容量,否則每次插入後會動態地創建鏈接節點。
PriorityBlockingQueue
一個無界阻塞隊列,它使用與類 PriorityQueue
相同的順序規則,並且提供了阻塞獲取操作。雖然此隊列邏輯上是無界的,但是資源被耗盡時試圖執行 add 操作也將失敗(導致 OutOfMemoryError)。此類不允許使用 null 元素。依賴自然順序的優先級隊列也不允許插入不可比較的對象(這樣做會導致拋出 ClassCastException)。
使用ArrayBlockingQueue簡化後的代碼如下:
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class ConsumerProducerUsingBlockingQueue { private static ArrayBlockingQueue<Integer> buffer = new ArrayBlockingQueue<Integer>(2); public static void main(String[] args) { // 創建線程池 ExecutorService executor = Executors.newFixedThreadPool(2); executor.execute(new Producertask()); executor.execute(new Consumertask()); executor.shutdown(); } private static class Producertask implements Runnable { @Override public void run() { try { int i = 1; while (true) { System.out.println("Producer writes " + i); buffer.put(i++); Thread.sleep((int) (Math.random() * 10000)); } } catch (InterruptedException e) { e.printStackTrace(); } } } private static class Consumertask implements Runnable { public void run() { try { while (true) { System.out.println("\t\t\tConsumer reads " + buffer.take()); Thread.sleep((int) (Math.random() * 10000)); } } catch (InterruptedException e) { e.printStackTrace(); } } } }
可以看到,代碼減少了一半,主要是因為ArrayBlockingQueue中已經實現了同步,所以無需手動編碼