ArrayBlockingQueue:基於數組實現的一個阻塞隊列,在創建ArrayBlockingQueue對象時必須制定容量大小。並且可以指定公平性與非公平性,默認情況下為非公平的,即不保證等待時間最長的隊列最優先能夠訪問隊列。
LinkedBlockingQueue:基於鏈表實現的一個阻塞隊列,在創建LinkedBlockingQueue對象時如果不指定容量大小,則默認大小為Integer.MAX_VALUE,每次插入後都將動態地創建鏈接節點。
PriorityBlockingQueue:以上2種隊列都是先進先出隊列,而PriorityBlockingQueue卻不是,它會按照元素的優先級對元素進行排序,按照優先級順序出隊,每次出隊的元素都是優先級最高的元素,依據對象的自然排序順序或者是構造函數所帶的Comparator決定的順序。注意,此阻塞隊列為無界阻塞隊列,即容量沒有上限(通過源碼就可以知道,它沒有容器滿的信號標志),前面2種都是有界隊列。
DelayQueue:基於PriorityQueue,一種延時阻塞隊列,DelayQueue中的元素只有當其指定的延遲時間到了,才能夠從隊列中獲取到該元素。DelayQueue也是一個無界隊列,因此往隊列中插入數據的操作(生產者)永遠不會被阻塞,而只有獲取數據的操作(消費者)才會被阻塞。
SynchronousQueue:特殊的BlockingQueue,對其的操作必須是放和取交替完成的。其中每個插入操作必須等待另一個線程的對應移除操作 ,反之亦然。
BlockingQueue主要方法:
拋出異常 特殊值 阻塞 超時 插入add(e)
offer(e)
put(e)
offer(e, time, unit)
移除
remove()
poll()
take()
poll(time, unit)
檢查
element()
peek()
不可用
不可用
對於非阻塞隊列,一般情況下建議使用offer、poll和peek三個方法,不建議使用add和remove方法。因為使用offer、poll和peek三個方法可以通過返回值判斷操作成功與否,而使用add和remove方法卻不能達到這樣的效果。注意,非阻塞隊列中的方法都沒有進行同步措施。
以ArrayBlockingQueue為例,查看其源代碼,其中主要包含以下對象:
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { private static final long serialVersionUID = -817911632652898426L; /** 數組對象,用於放置對象 */ final Object[] items; /** put, offer, or add方法放入數組的索引 */ int putIndex; /** take, poll, peek or remove方法取出數據的數組索引 */ int takeIndex; /** queue隊列的總數 */ int count; /**可重入鎖,控制並發*/ final ReentrantLock lock; /** 非空信號量,可以取數*/ private final Condition notEmpty; /** 非滿信號量,可以放數 */ private final Condition notFull; }
下面主要介紹下put()和take()方法,來觀察其同步的實現:
1 public void put(E e) throws InterruptedException { 2 checkNotNull(e); 3 final ReentrantLock lock = this.lock; 4 lock.lockInterruptibly(); 5 try { 6 while (count == items.length) 7 notFull.await(); 8 insert(e); 9 } finally { 10 lock.unlock(); 11 } 12 }
1 public E take() throws InterruptedException { 2 final ReentrantLock lock = this.lock; 3 lock.lockInterruptibly(); 4 try { 5 while (count == 0) 6 notEmpty.await(); 7 return extract(); 8 } finally { 9 lock.unlock(); 10 } 11 }
大家應該明白了阻塞隊列的實現原理,事實它和我們用Object.wait()、Object.notify()和非阻塞隊列實現生產者-消費者的思路類似,只不過它把這些工作一起集成到了阻塞隊列中實現。並且在前面Condition中我們也模擬實現了一個阻塞隊列,實現與其大同小異。
1:啟動兩個線程實現互斥等待:
1 public class BlockingQueueTest { 2 public static void main(String[] args) { 3 final BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(3); 4 for (int i = 0; i < 2; i++) { 5 new Thread(new Runnable() { 6 @Override 7 public void run() { 8 while (true) { 9 System.out.println("Thread "+Thread.currentThread().getName()+"正在准備放入數據"); 10 try { 11 //模擬線程的放數速度 12 Thread.sleep(new Random().nextInt(1000)); 13 } catch (InterruptedException e) { 14 // TODO Auto-generated catch block 15 e.printStackTrace(); 16 } 17 try { 18 queue.put(1); 19 } catch (InterruptedException e) { 20 // TODO Auto-generated catch block 21 e.printStackTrace(); 22 } 23 System.out.println("Thread "+Thread.currentThread().getName()+"放入數據,此時隊列中的數據為:"+queue.size()); 24 } 25 } 26 }).start(); 27 new Thread(new Runnable() { 28 @Override 29 public void run() { 30 while (true) { 31 System.out.println("Thread "+Thread.currentThread().getName()+"正在取得數據"); 32 try { 33 //模擬線程的去數速度 34 Thread.sleep(100); 35 } catch (InterruptedException e) { 36 // TODO Auto-generated catch block 37 e.printStackTrace(); 38 } 39 try { 40 queue.take(); 41 } catch (InterruptedException e) { 42 // TODO Auto-generated catch block 43 e.printStackTrace(); 44 } 45 System.out.println("Thread "+Thread.currentThread().getName()+"取得數據,此時隊列中的數據為:"+queue.size()); 46 } 47 } 48 }).start(); 49 } 50 51 } 52 }
2:前面介紹傳統線程通信中,主線程和子線程交替運行,現在以阻塞隊列來實現。
1 public class BlockingQueueCommunication { 2 public static void main(String[] args) { 3 final Business business = new Business(); 4 new Thread(new Runnable() { 5 6 @Override 7 public void run() { 8 // TODO Auto-generated method stub 9 for (int i = 0; i < 50; i++) { 10 try { 11 business.sub(i); 12 } catch (InterruptedException e) { 13 // TODO Auto-generated catch block 14 e.printStackTrace(); 15 } 16 } 17 } 18 }).start(); 19 for (int i = 0; i < 50; i++) { 20 try { 21 business.main(i); 22 } catch (InterruptedException e) { 23 // TODO Auto-generated catch block 24 e.printStackTrace(); 25 } 26 } 27 } 28 static class Business{ 29 BlockingQueue<Integer> queue1 = new ArrayBlockingQueue<Integer>(1); 30 BlockingQueue<Integer> queue2 = new ArrayBlockingQueue<Integer>(1); 31 { 32 try { 33 queue2.put(1);//保證queue2阻塞 34 } catch (InterruptedException e) { 35 // TODO Auto-generated catch block 36 e.printStackTrace(); 37 } 38 } 39 40 public void main(int i) throws InterruptedException{ 41 queue1.put(1);//阻塞queue1 42 for (int j = 0; j < 100; j++) { 43 System.out.println("main thread is looping of "+j +" in " + i); 44 } 45 queue2.take();//喚醒queue2 46 } 47 public void sub(int i) throws InterruptedException{ 48 queue2.put(1);//阻塞queue2 49 for (int j = 0; j < 10; j++) { 50 System.out.println("sub thread is looping of "+j +" in " + i); 51 } 52 queue1.take();//喚醒queue1 53 } 54 } 55 }
BlockingQueue實現了線程同步,不可在方法中再次加入同步限制,否則會出現死鎖。
3:在API中有一個阻塞對象實現生產者和消費者的例子
1 class Producer implements Runnable { 2 private final BlockingQueue queue; 3 Producer(BlockingQueue q) { queue = q; } 4 public void run() { 5 try { 6 while(true) { queue.put(produce()); } 7 } catch (InterruptedException ex) { ... handle ...} 8 } 9 Object produce() { ... } 10 } 11 12 class Consumer implements Runnable { 13 private final BlockingQueue queue; 14 Consumer(BlockingQueue q) { queue = q; } 15 public void run() { 16 try { 17 while(true) { consume(queue.take()); } 18 } catch (InterruptedException ex) { ... handle ...} 19 } 20 void consume(Object x) { ... } 21 } 22 23 class Setup { 24 void main() { 25 BlockingQueue q = new SomeQueueImplementation(); 26 Producer p = new Producer(q); 27 Consumer c1 = new Consumer(q); 28 Consumer c2 = new Consumer(q); 29 new Thread(p).start(); 30 new Thread(c1).start(); 31 new Thread(c2).start(); 32 } 33 }
使用阻塞隊列代碼要簡單得多,不需要再單獨考慮同步和線程間通信的問題。
在並發編程中,一般推薦使用阻塞隊列,這樣實現可以盡量地避免程序出現意外的錯誤。
阻塞隊列使用最經典的場景就是socket客戶端數據的讀取和解析,讀取數據的線程不斷將數據放入隊列,然後解析線程不斷從隊列取數據解析。還有其他類似的場景,只要符合生產者-消費者模型的都可以使用阻塞隊列。
參考資料:http://www.cnblogs.com/dolphin0520/p/3932906.html
javaAPI