java多線程環境中應用HashMap,主要有以下幾種選擇:使用線程安全的java.util.Hashtable作為替代使用java.util.Collections.synchronizedMap方法,將已有的HashMap對象包裝為線程安全的。使用java.util.concurrent.ConcurrentHashMap類作為替代,它具有非常好的性能。
而以上幾種方法在實現的具體細節上,都或多或少地用到了互斥鎖。互斥鎖會造成線程阻塞,降低運行效率,並有可能產生死鎖、優先級翻轉等一系列問題。
CAS(Compare And Swap)是一種底層硬件提供的功能,它可以將判斷並更改一個值的操作原子化。
Java中的原子操作
在java.util.concurrent.atomic包中,Java為我們提供了很多方便的原子類型,它們底層完全基於CAS操作。
例如我們希望實現一個全局公用的計數器,那麼可以:
代碼如下:
privateAtomicInteger counter =newAtomicInteger(3);
publicvoidaddCounter() {
for(;;) {
intoldValue = counter.get();
intnewValue = oldValue +1;
if(counter.compareAndSet(oldValue, newValue))
return;
}
}
其中,compareAndSet方法會檢查counter現有的值是否為oldValue,如果是,則將其設置為新值newValue,操作成功並返回true;否則操作失敗並返回false。
當計算counter新值時,若其他線程將counter的值改變,compareAndSwap就會失敗。此時我們只需在外面加一層循環,不斷嘗試這個過程,那麼最終一定會成功將counter值+1。(其實AtomicInteger已經為常用的+1/-1操作定義了 incrementAndGet與decrementAndGet方法,以後我們只需簡單調用它即可)
除了AtomicInteger外,java.util.concurrent.atomic包還提供了AtomicReference和AtomicReferenceArray類型,它們分別代表原子性的引用和原子性的引用數組(引用的數組)。
無鎖鏈表的實現
在實現無鎖HashMap之前,讓我們先來看一下比較簡單的無鎖鏈表的實現方法。
以插入操作為例:
首先我們需要找到待插入位置前面的節點A和後面的節點B。
然後新建一個節點C,並使其next指針指向節點B。(見圖1)
最後使節點A的next指針指向節點C。(見圖2)
但在操作中途,有可能其他線程在A與B直接也插入了一些節點(假設為D),如果我們不做任何判斷,可能造成其他線程插入節點的丟失。(見圖3)我們可以利用CAS操作,在為節點A的next指針賦值時,判斷其是否仍然指向B,如果節點A的next指針發生了變化則重試整個插入操作。大致代碼如下:
代碼如下:
privatevoidlistInsert(Node head, Node c) {
for(;;) {
Node a = findInsertionPlace(head), b = a.next.get();
c.next.set(b);
if(a.next.compareAndSwap(b,c))
return;
}
}
(Node類的next字段為AtomicReference<Node>類型,即指向Node類型的原子性引用)
無鎖鏈表的查找操作與普通鏈表沒有區別。而其刪除操作,則需要找到待刪除節點前方的節點A和後方的節點B,利用CAS操作驗證並更新節點A的next指針,使其指向節點B。
無鎖HashMap的難點與突破
HashMap主要有插入、刪除、查找以及ReHash四種基本操作。一個典型的HashMap實現,會用到一個數組,數組的每項元素為一個節點的鏈表。對於此鏈表,我們可以利用上文提到的操作方法,執行插入、刪除以及查找操作,但對於ReHash操作則比較困難。
如圖4,在ReHash過程中,一個典型的操作是遍歷舊表中的每個節點,計算其在新表中的位置,然後將其移動至新表中。期間我們需要操縱3次指針:
將A的next指針指向D
將B的next指針指向C
將C的next指針指向E
而這三次指針操作必須同時完成,才能保證移動操作的原子性。但我們不難看出,CAS操作每次只能保證一個變量的值被原子性地驗證並更新,無法滿足同時驗證並更新三個指針的需求。
於是我們不妨換一個思路,既然移動節點的操作如此困難,我們可以使所有節點始終保持有序狀態,從而避免了移動操作。在典型的HashMap實現中,數組的長度始終保持為2i,而從Hash值映射為數組下標的過程,只是簡單地對數組長度執行取模運算(即僅保留Hash二進制的後i位)。當ReHash時,數組長度加倍變為2i+1,舊數組第j項鏈表中的每個節點,要麼移動到新數組中第j項,要麼移動到新數組中第j+2i項,而它們的唯一區別在於Hash值第i+1位的不同(第i+1位為0則仍為第j項,否則為第j+2i項)。
如圖5,我們將所有節點按照Hash值的翻轉位序(如1101->1011)由小到大排列。當數組大小為8時,2、18在一個組內;3、 11、27在另一個組內。每組的開始,插入一個哨兵節點,以方便後續操作。為了使哨兵節點正確排在組的最前方,我們將正常節點Hash的最高位(翻轉後變為最低位)置為1,而哨兵節點不設置這一位。
當數組擴容至16時(見圖6),第二組分裂為一個只含3的組和一個含有11、27的組,但節點之間的相對順序並未改變。這樣在ReHash時,我們就不需要移動節點了。
實現細節
由於擴容時數組的復制會占用大量的時間,這裡我們采用了將整個數組分塊,懶惰建立的方法。這樣,當訪問到某下標時,僅需判斷此下標所在塊是否已建立完畢(如果沒有則建立)。
另外定義size為當前已使用的下標范圍,其初始值為2,數組擴容時僅需將size加倍即可;定義count代表目前HashMap中包含的總節點個數(不算哨兵節點)。
初始時,數組中除第0項外,所有項都為null。第0項指向一個僅有一個哨兵節點的鏈表,代表整條鏈的起點。初始時全貌見圖7,其中淺綠色代表當前未使用的下標范圍,虛線箭頭代表邏輯上存在,但實際未建立的塊。
初始化下標操作
數組中為null的項都認為處於未初始化狀態,初始化某個下標即代表建立其對應的哨兵節點。初始化是遞歸進行的,即若其父下標未初始化,則先初始化其父下標。(一個下標的父下標是其移除最高二進制位後得到的下標)大致代碼如下:
代碼如下:
privatevoidinitializeBucket(intbucketIdx) {
intparentIdx = bucketIdx ^ Integer.highestOneBit(bucketIdx);
if(getBucket(parentIdx) ==null)
initializeBucket(parentIdx);
Node dummy =newNode();
dummy.hash = Integer.reverse(bucketIdx);
dummy.next =newAtomicReference<>();
setBucket(bucketIdx, listInsert(getBucket(parentIdx), dummy));
}
其中getBucket即封裝過的獲取數組某下標內容的方法,setBucket同理。listInsert將從指定位置開始查找適合插入的位置插入給定的節點,若鏈表中已存在hash相同的節點則返回那個已存在的節點;否則返回新插入的節點。
插入操作
首先用HashMap的size對鍵的hashCode取模,得到應插入的數組下標。
然後判斷該下標處是否為null,如果為null則初始化此下標。
構造一個新的節點,並插入到適當位置,注意節點中的hash值應為原hashCode經過位翻轉並將最低位置1之後的值。
將節點個數計數器加1,若加1後節點過多,則僅需將size改為size*2,代表對數組擴容(ReHash)。
查找操作
找出待查找節點在數組中的下標。
判斷該下標處是否為null,如果為null則返回查找失敗。
從相應位置進入鏈表,順次尋找,直至找出待查找節點或超出本組節點范圍。
刪除操作
找出應刪除節點在數組中的下標。
判斷該下標處是否為null,如果為null則初始化此下標。
找到待刪除節點,並從鏈表中刪除。(注意由於哨兵節點的存在,任何正常元素只被其唯一的前驅節點所引用,不存在被前驅節點與數組中指針同時引用的情況,從而不會出現需要同時修改多個指針的情況)
將節點個數計數器減1。