在 Java 編程的早期階段,位於 Oswego 市的紐約州立大學(SUNY) 的一位教授決定創建一個簡單的庫,以幫助開發人員構建可以更好地處理多線程情況的應用程序。這並不是說用現有的庫就不能實現,但是就像有了標准網絡庫一樣,用經過調試的、可信任的庫更容易自己處理多線程。在 Addision-Wesley 的一本相關書籍的幫助下,這個庫變得越來越流行了。最終,作者 Doug Lea 決定設法讓它成為 Java 平台的標准部分 —— JSR-166。這個庫最後變成了 Tiger 版本的 java.util.concurrent 包。在這篇新的 馴服 Tiger 技巧中,我們將探討 Collection Framework 中新的 Queue 接口、這個接口的非並發和並發實現、並發 Map 實現和專用於讀操作大大超過寫操作這種情況的並發 List 和 Set 實現。
介紹 Queue 接口
java.util 包為集合提供了一個新的基本接口: java.util.Queue 。雖然肯定可以在相對應的兩端進行添加和刪除而將 java.util.List 作為隊列對待,但是這個新的 Queue 接口提供了支持添加、刪除和檢查集合的更多方法,如下所示:
public boolean offer(Object element)
public Object remove()
public Object poll()
public Object element()
public Object peek()
基本上,一個隊列就是一個先入先出(FIFO)的數據結構。一些隊列有大小限制,因此如果想在一個滿的隊列中加入一個新項,多出的項就會被拒絕。這時新的 offer 方法就可以起作用了。它不是對調用 add() 方法拋出一個 unchecked 異常,而只是得到由 offer() 返回的 false。 remove() 和 poll() 方法都是從隊列中刪除第一個元素(head)。 remove() 的行為與 Collection 接口的版本相似,但是新的 poll() 方法在用空集合調用時不是拋出異常,只是返回 null。因此新的方法更適合容易出現異常條件的情況。後兩個方法 element() 和 peek() 用於在隊列的頭部查詢元素。與 remove() 方法類似,在隊列為空時, element() 拋出一個異常,而 peek() 返回 null。
使用基本隊列
在 Tiger 中有兩組 Queue 實現:實現了新 BlockingQueue 接口的和沒有實現這個接口的。我將首先分析那些沒有實現的。
在最簡單的情況下,原來有的 java.util.LinkedList 實現已經改造成不僅實現 java.util.List 接口,而且還實現 java.util.Queue 接口。可以將集合看成這兩者中的任何一種。清單 1 顯示將 LinkedList 作為 Queue 使用的一種方法:
清單 1. 使用 Queue 實現
Queue queue = new LinkedList();
queue.offer("One");
queue.offer("Two");
queue.offer("Three");
queue.offer("Four");
// Head of queue should be One
System.out.println("Head of queue is: " + queue.poll());
再復雜一點的是新的 java.util.AbstractQueue 類。這個類的工作方式類似於 java.util.AbstractList 和 java.util.AbstractSet 類。在創建自定義集合時,不用自己實現整個接口,只是繼承抽象實現並填入細節。使用 AbstractQueue 時,必須為方法 offer() 、 poll() 和 peek() 提供實現。像 add() 和 addAll() 這樣的方法修改為使用 offer() ,而 clear() 和 remove() 使用 poll() 。最後, element() 使用 peek() 。當然可以在子類中提供這些方法的優化實現,但是不是必須這麼做。而且,不必創建自己的子類,可以使用幾個內置的實現, 其中兩個是不阻塞隊列: PriorityQueue 和 ConcurrentLinkedQueue 。
PriorityQueue 和 ConcurrentLinkedQueue 類在 Collection Framework 中加入兩個具體集合實現。 PriorityQueue 類實質上維護了一個有序列表。加入到 Queue 中的元素根據它們的天然排序(通過其 java.util.Comparable 實現)或者根據傳遞給構造函數的 java.util.Comparator 實現來定位。將清單 2 中的 LinkedList 改變為 PriorityQueue 將會打印出 Four 而不是 One,因為按字母排列 —— 字符串的天然順序 —— Four 是第一個。 ConcurrentLinkedQueue 是基於鏈接節點的、線程安全的隊列。並發訪問不需要同步。因為它在隊列的尾部添加元素並從頭部刪除它們,所以只要不需要知道隊列的大小, ConcurrentLinkedQueue 對公共集合的共享訪問就可以工作得很好。收集關於隊列大小的信息會很慢,需要遍歷隊列。
使用阻塞隊列
新的 java.util.concurrent 包在 Collection Framework 中可用的具體集合類中加入了 BlockingQueue 接口和五個阻塞隊列類。假如不熟悉阻塞隊列概念,它實質上就是一種帶有一點扭曲的 FIFO 數據結構。不是立即從隊列中添加或者刪除元素,線程執行操作阻塞,直到有空間或者元素可用。 BlockingQueue 接口的 Javadoc 給出了阻塞隊列的基本用法,如清單 2 所示。生產者中的 put() 操作會在沒有空間可用時阻塞,而消費者的 take() 操作會在隊列中沒有任何東西時阻塞。
清單 2. 使用 BlockingQueue
class Producer implements Runnable {
private final BlockingQueue queue;
Producer(BlockingQueue q) { queue = q; }
public void run() {
try {
while(true) { queue.put(produce()); }
} catch (InterruptedException ex) { ... handle ...}
}
Object produce() { ... }
}
class Consumer implements Runnable {
private final BlockingQueue queue;
Consumer(BlockingQueue q) { queue = q; }
public void run() {
try {
while(true) { consume(queue.take()); }
} catch (InterruptedException ex) { ... handle ...}
}
void consume(Object x) { ... }
}
class Setup {
void main() {
BlockingQueue q = new SomeQueueImplementation();
Producer p = new Producer(q);
Consumer c1 = new Consumer(q);
Consumer c2 = new Consumer(q);
new Thread(p).start();
new Thread(c1).start();
new Thread(c2).start();
}
}
五個隊列所提供的各有不同:
ArrayBlockingQueue :一個由數組支持的有界隊列。
LinkedBlockingQueue :一個由鏈接節點支持的可選有界隊列。
PriorityBlockingQueue :一個由優先級堆支持的無界優先級隊列。
DelayQueue :一個由優先級堆支持的、基於時間的調度隊列。
SynchronousQueue :一個利用 BlockingQueue 接口的簡單聚集(rendezvous)機制。
前兩個類 ArrayBlockingQueue 和 LinkedBlockingQueue 幾乎相同,只是在後備存儲器方面有所不同, LinkedBlockingQueue 並不總是有容量界限。無大小界限的 LinkedBlockingQueue 類在添加元素時永遠不會有阻塞隊列的等待(至少在其中有 Integer.MAX_VALUE 元素之前不會)。
PriorityBlockingQueue 是具有無界限容量的隊列,它利用所包含元素的 Comparable 排序順序來以邏輯順序維護元素。可以將它看作 TreeSet 的可能替代物。例如,在隊列中加入字符串 One、Two、Three 和 Four 會導致 Four 被第一個取出來。對於沒有天然順序的元素,可以為構造函數提供一個 Comparator 。不過對 PriorityBlockingQueue 有一個技巧。從 iterator() 返回的 Iterator 實例不需要以優先級順序返回元素。如果必須以優先級順序遍歷所有元素,那麼讓它們都通過 toArray() 方法並自己對它們排序,像 Arrays.sort(pq.toArray()) 。
新的 DelayQueue 實現可能是其中最有意思(也是最復雜)的一個。加入到隊列中的元素必須實現新的 Delayed 接口(只有一個方法 —— long getDelay(java.util.concurrent.TimeUnit unit) )。因為隊列的大小沒有界限,使得添加可以立即返回,但是在延遲時間過去之前,不能從隊列中取出元素。如果多個元素完成了延遲,那麼最早失效/失效時間最長的元素將第一個取出。實際上沒有聽上去這樣復雜。清單 3 演示了這種新的阻塞隊列集合的使用:
清單 3. 使用 DelayQueue 實現
import java.util.*;
import java.util.concurrent.*;
public class Delay {
/**
* Delayed implementation that actually delays
*/
static class NanoDelay implements Delayed {
long trigger;
NanoDelay(long i) {
trigger = System.nanoTime() + i;
}
public int compareTo(Object y) {
long i = trigger;
long j = ((NanoDelay)y).trigger;
if (i < j) return -1;
if (i > j) return 1;
return 0;
}
public boolean equals(Object other) {
return ((NanoDelay)other).trigger == trigger;
}
public boolean equals(NanoDelay other) {
return ((NanoDelay)other).trigger == trigger;
}
public long getDelay(TimeUnit unit) {
long n = trigger - System.nanoTime();
return unit.convert(n, TimeUnit.NANOSECONDS);
}
public long getTriggerTime() {
return trigger;
}
public String toString() {
return String.valueOf(trigger);
}
}
public static void main(String args[]) throws InterruptedException {
Random random = new Random();
DelayQueue queue = new DelayQueue();
for (int i=0; i < 5; i++) {
queue.add(new NanoDelay(random.nextInt(1000)));
}
long last = 0;
for (int i=0; i < 5; i++) {
NanoDelay delay = (NanoDelay)(queue.take());
long tt = delay.getTriggerTime();
System.out.println("Trigger time: " + tt);
if (i != 0) {
System.out.println("Delta: " + (tt - last));
}
last = tt;
}
}
}
這個例子首先是一個內部類 NanoDelay ,它實質上將暫停給定的任意納秒(nanosecond)數,這裡利用了 System 的新 nanoTime() 方法。然後 main() 方法只是將 NanoDelay 對象放到隊列中並再次將它們取出來。如果希望隊列項做一些其他事情,就需要在 Delayed 對象的實現中加入方法,並在從隊列中取出後調用這個新方法。(請隨意擴展 NanoDelay 以試驗加入其他方法做一些有趣的事情。)顯示從隊列中取出元素的兩次調用之間的時間差。如果時間差是負數,可以視為一個錯誤,因為永遠不會在延遲時間結束後,在一個更早的觸發時間從隊列中取得項。
SynchronousQueue 類是最簡單的。它沒有內部容量。它就像線程之間的手遞手機制。在隊列中加入一個元素的生產者會等待另一個線程的消費者。當這個消費者出現時,這個元素就直接在消費者和生產者之間傳遞,永遠不會加入到阻塞隊列中。
使用 ConcurrentMap 實現
新的 java.util.concurrent.ConcurrentMap 接口和 ConcurrentHashMap 實現只能在鍵不存在時將元素加入到 map 中,只有在鍵存在並映射到特定值時才能從 map 中刪除一個元素。
有一個新的 putIfAbsent() 方法用於在 map 中進行添加。這個方法以要添加到 ConcurrentMap 實現中的鍵的值為參數,就像普通的 put() 方法,但是只有在 map 不包含這個鍵時,才能將鍵加入到 map 中。如果 map 已經包含這個鍵,那麼這個鍵的現有值就會保留。 putIfAbsent() 方法是原子的。如果不調用這個原子操作,就需要從適當的同步塊中調用清單 4 中的代碼:
清單 4. 等價的 putIfAbsent() 代碼
if (!map.containsKey(key)) {
return map.put(key, value);
} else {
return map.get(key);
}
像 putIfAbsent() 方法一樣,重載後的 remove() 方法有兩個參數 —— 鍵和值。在調用時,只有當鍵映射到指定的值時才從 map 中刪除這個鍵。如果不匹配,那麼就不刪除這個鍵,並返回 false。如果值匹配鍵的當前映射內容,那麼就刪除這個鍵。清單 5 顯示了這種操作的等價源代碼:
清單 5. 等價的 remove() 代碼
if (map.get(key).equals(value)) {
map.remove(key);
return true;
} else {
return false;
}
使用 CopyOnWriteArrayList 和 CopyOnWriteArraySet
在 Doug Lea 的 Concurrent Programming in Java一書的第 2 章第 2.4.4 節(請參閱 參考資料)中,對 copy-on-write 模式作了最好的描述。實質上,這個模式聲明了,為了維護對象的一致性快照,要依靠不可變性(immutability)來消除在協調讀取不同的但是相關的屬性時需要的同步。對於集合,這意味著如果有大量的讀(即 get() ) 和迭代,不必同步操作以照顧偶爾的寫(即 add() )調用。對於新的 CopyOnWriteArrayList 和 CopyOnWriteArraySet 類,所有可變的(mutable)操作都首先取得後台數組的副本,對副本進行更改,然後替換副本。這種做法保證了在遍歷自身更改的集合時,永遠不會拋出 ConcurrentModificationException 。遍歷集合會用原來的集合完成,而在以後的操作中使用更新後的集合。
這些新的集合, CopyOnWriteArrayList 和 CopyOnWriteArraySet ,最適合於讀操作通常大大超過寫操作的情況。一個最常提到的例子是使用監聽器列表。已經說過,Swing 組件還沒有改為使用新的集合。相反,它們繼續使用 javax.swing.event.EventListenerList 來維護它們的監聽器列表。
如清單 6 所示,集合的使用與它們的非 copy-on-write 替代物完全一樣。只是創建集合並在其中加入或者刪除元素。即使對象加入到了集合中,原來的 Iterator 也可以進行,繼續遍歷原來集合中的項。
清單 6. 展示一個 copy-on-write 集合
import java.util.*;
import java.util.concurrent.*;
public class CopyOnWrite {
public static void main(String args[]) {
List list1 = new CopyOnWriteArrayList(Arrays.asList(args));
List list2 = new ArrayList(Arrays.asList(args));
Iterator itor1 = list1.iterator();
Iterator itor2 = list2.iterator();
list1.add("New");
list2.add("New");
try {
printAll(itor1);
} catch (ConcurrentModificationException e) {
System.err.println("Shouldn't get here");
}
try {
printAll(itor2);
} catch (ConcurrentModificationException e) {
System.err.println("Will get here.");
}
}
private static void printAll(Iterator itor) {
while (itor.hasNext()) {
System.out.println(itor.next());
}
}
}
這個示例程序用命令行參數創建 CopyOnWriteArrayList 和 ArrayList 這兩個實例。在得到每一個實例的 Iterator 後,分別在其中加入一個元素。當 ArrayList 迭代因一個 ConcurrentModificationException 問題而立即停止時, CopyOnWriteArrayList 迭代可以繼續,不會拋出異常,因為原來的集合是在得到 iterator 之後改變的。如果這種行為(比如通知原來一組事件監聽器中的所有元素)是您需要的,那麼最好使用 copy-on-write 集合。如果不使用的話,就還用原來的,並保證在出現異常時對它進行處理。
結束語
在 J2SE 平台的 Tiger 版中有許多重要的增加。除了語言級別的改變,如一般性支持,這個庫也許是最重要的增加了,因為它會被最廣泛的用戶使用。不要忽視加入到平台中的其他包,像 Java Management Extensions (JMX),但是大多數其他重要的庫增強只針對范圍很窄的開發人員。但是這個庫不是。除了用於鎖定和原子操作的其他並發實用程序,這些類也會經常使用。盡早學習它們並利用它們所提供的功能。