A thread-safe version of HashMap.
1) Basic idea: the one large hash table is subdivided into smaller hash tables, the Segments;
2) Reads do not lock; a write locks only the Segment it falls into rather than the whole map. Hashtable, by contrast, makes every method compete for the single lock on the Hashtable object, which keeps its concurrency low;
3) Segments are constructed lazily (all except segments[0]) to reduce the memory needed at initialization. The Unsafe class provides the functionality of AtomicReferenceArrays with one less level of indirection. Writes to Segment.table elements and to HashEntry.next use the lighter-weight "lazySet" form (Unsafe.putOrderedObject); they always happen inside a lock whose subsequent unlock publishes them, which preserves the ordering of table updates. The earlier ConcurrentHashMap relied heavily on final fields, which avoided some volatile reads but required more memory at initialization;
4) The concurrency level is set by the concurrencyLevel parameter. Too large wastes memory and traversal time, too small increases contention; if only one thread writes and all other threads read, set it to 1;
5) An iterator reflects the state of the ConcurrentHashMap at or after the moment it was created and never throws ConcurrentModificationException; it is meant to be used once, within a single thread (a usage sketch follows this list);
6) Rehashing is avoided as far as possible.
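A minimal usage sketch of the segment-based implementation described above, with illustrative capacity and concurrency values, showing an explicit concurrencyLevel and the weakly consistent iterator:

import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class SegmentedMapDemo {
    public static void main(String[] args) {
        // initialCapacity = 64, loadFactor = 0.75, concurrencyLevel = 8:
        // in the segment-based implementation this gives 8 segments,
        // each starting with a table of 64 / 8 = 8 buckets.
        Map<String, Integer> map = new ConcurrentHashMap<String, Integer>(64, 0.75f, 8);
        map.put("a", 1);
        map.put("b", 2);

        // The iterator is weakly consistent: it reflects the state at or after
        // its creation and never throws ConcurrentModificationException,
        // even if the map is modified during iteration.
        Iterator<Map.Entry<String, Integer>> it = map.entrySet().iterator();
        map.put("c", 3); // concurrent modification is allowed
        while (it.hasNext()) {
            Map.Entry<String, Integer> e = it.next();
            System.out.println(e.getKey() + " -> " + e.getValue());
        }
    }
}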
Segment
final Segment<K,V>[] segments; // declared final

static final class Segment<K,V> extends ReentrantLock implements Serializable {
    // Maximum number of tryLock attempts during prescan before the segment blocks.
    // On multiprocessors a bounded number of retries warms the cache while locating nodes.
    static final int MAX_SCAN_RETRIES =
        Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;

    // The per-segment table; its elements are accessed via entryAt/setEntryAt,
    // which provide volatile reads and writes.
    transient volatile HashEntry<K,V>[] table;

    // Number of elements in this segment; read under lock or via other volatile means.
    transient int count;

    // Total number of mutations in this segment.
    // Even if it overflows, it is still usable for the checks in isEmpty() and size().
    // Read under lock or via other volatile means.
    transient int modCount;

    // Resize threshold: once exceeded, the segment's table is rehashed.
    transient int threshold;

    // The load factor; identical for all segments, but duplicated here
    // to avoid a link back to the outer object.
    final float loadFactor;

    Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
        this.loadFactor = lf;
        this.threshold = threshold;
        this.table = tab;
    }
}

static final class HashEntry<K,V> {
    // hash and key are final; value and next are volatile
    final int hash;
    final K key;
    volatile V value;
    volatile HashEntry<K,V> next;

    HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
        this.hash = hash;
        this.key = key;
        this.value = value;
        this.next = next;
    }

    // Ordered (lazySet-style) write of next via UNSAFE.putOrderedObject
    final void setNext(HashEntry<K,V> n) {
        UNSAFE.putOrderedObject(this, nextOffset, n);
    }

    // Unsafe mechanics
    static final sun.misc.Unsafe UNSAFE;
    static final long nextOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class k = HashEntry.class;
            nextOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("next"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}
Default values with the no-argument constructor:
segments.length = 16
segmentShift = 28
segmentMask = 15
Segment.table.length = 2
Segment.loadFactor = 0.75
Segment.threshold = 1
// Constructor with initial capacity, load factor and concurrency level.
// initialCapacity: used to size each Segment's table
// loadFactor: load factor of each Segment
// concurrencyLevel: used to size the segments array, i.e. the concurrency level
public ConcurrentHashMap(int initialCapacity,
                         float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    if (concurrencyLevel > MAX_SEGMENTS)
        concurrencyLevel = MAX_SEGMENTS;
    // Find power-of-two sizes best matching arguments
    int sshift = 0;
    int ssize = 1;
    while (ssize < concurrencyLevel) {
        ++sshift;
        ssize <<= 1;
    }
    this.segmentShift = 32 - sshift; // shift used to locate a segment
    this.segmentMask = ssize - 1;    // mask used to locate a segment, i.e. segments.length - 1;
                                     // the high bits of the key's hash pick the segment index
    if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;
    int c = initialCapacity / ssize; // initial capacity / segments.length, used to size each Segment's table
    if (c * ssize < initialCapacity)
        ++c;
    int cap = MIN_SEGMENT_TABLE_CAPACITY;
    while (cap < c)
        cap <<= 1;                   // per-segment table capacity
    // create segments and segments[0]
    Segment<K,V> s0 =
        new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                         (HashEntry<K,V>[])new HashEntry[cap]);
    // kept for serialization compatibility with earlier versions
    Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
    UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
    this.segments = ss;
}

// Constructor with initial capacity and load factor; default concurrency level 16
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
    this(initialCapacity, loadFactor, DEFAULT_CONCURRENCY_LEVEL);
}

// Constructor with initial capacity; default load factor 0.75
public ConcurrentHashMap(int initialCapacity) {
    this(initialCapacity, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}

// No-arg constructor; default initial capacity 16
public ConcurrentHashMap() {
    this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}

/**
 * Creates a new map with the same mappings as the given map.
 * The map is created with a capacity of 1.5 times the number
 * of mappings in the given map or 16 (whichever is greater),
 * and a default load factor (0.75) and concurrencyLevel (16).
 *
 * @param m the map
 */
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
    this(Math.max((int) (m.size() / DEFAULT_LOAD_FACTOR) + 1,
                  DEFAULT_INITIAL_CAPACITY),
         DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
    putAll(m);
}
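The defaults listed earlier (segments.length = 16, segmentShift = 28, segmentMask = 15, table.length = 2, threshold = 1) fall out of the constructor arithmetic above. A standalone sketch that mirrors that arithmetic for the default arguments (the constant name is copied only for readability):

public class DefaultSizingDemo {
    // mirrors the JDK constant MIN_SEGMENT_TABLE_CAPACITY
    static final int MIN_SEGMENT_TABLE_CAPACITY = 2;

    public static void main(String[] args) {
        int initialCapacity = 16, concurrencyLevel = 16;
        float loadFactor = 0.75f;

        // Round concurrencyLevel up to a power of two -> segments.length
        int sshift = 0, ssize = 1;
        while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1; }
        int segmentShift = 32 - sshift;   // 32 - 4 = 28
        int segmentMask = ssize - 1;      // 16 - 1 = 15

        // Per-segment table capacity: at least initialCapacity / ssize,
        // rounded up to a power of two, never below 2.
        int c = initialCapacity / ssize;  // 16 / 16 = 1
        if (c * ssize < initialCapacity) ++c;
        int cap = MIN_SEGMENT_TABLE_CAPACITY;
        while (cap < c) cap <<= 1;        // stays 2

        System.out.println("segments.length = " + ssize);        // 16
        System.out.println("segmentShift    = " + segmentShift); // 28
        System.out.println("segmentMask     = " + segmentMask);  // 15
        System.out.println("table.length    = " + cap);          // 2
        System.out.println("threshold       = " + (int)(cap * loadFactor)); // 1
    }
}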
1) When the number of key/value pairs in a Segment exceeds the threshold and the table length is still below MAXIMUM_CAPACITY = 1 << 30, the table is resized to twice its capacity;
2) The maximum table capacity is MAXIMUM_CAPACITY = 1 << 30;
3) Once the table has reached MAXIMUM_CAPACITY, further puts simply link new entries into the corresponding bucket; the number of key/value pairs is no longer limited;
4) After a resize, every key/value pair of the old table must have its bucket recomputed in the new table, i.e. the hash table is rehashed. Some old nodes can be reused because their next links remain valid (see the index sketch after the rehash code below).
// rehash
@SuppressWarnings("unchecked")
private void rehash(HashEntry<K,V> node) {
    // Double the table size. Because the table length is a power of two,
    // during the rehash each element either keeps its index or moves by a
    // power-of-two offset. Old nodes whose next links are unchanged can be
    // reused; at the default threshold only about 1/6 of the entries need
    // to be cloned. Entries are read with plain array indexing, because the
    // subsequent volatile write of table publishes the result.
    HashEntry<K,V>[] oldTable = table;
    int oldCapacity = oldTable.length;
    int newCapacity = oldCapacity << 1;          // doubled; still a power of two
    threshold = (int)(newCapacity * loadFactor); // new resize threshold
    HashEntry<K,V>[] newTable =
        (HashEntry<K,V>[]) new HashEntry[newCapacity];
    int sizeMask = newCapacity - 1;
    for (int i = 0; i < oldCapacity ; i++) {
        HashEntry<K,V> e = oldTable[i];          // plain array read
        if (e != null) {
            HashEntry<K,V> next = e.next;
            int idx = e.hash & sizeMask;
            if (next == null)                    //  Single node on list
                newTable[idx] = e;
            else {
                HashEntry<K,V> lastRun = e;
                int lastIdx = idx;
                for (HashEntry<K,V> last = next;
                     last != null;
                     last = last.next) {
                    int k = last.hash & sizeMask;
                    if (k != lastIdx) {
                        lastIdx = k;
                        lastRun = last;
                    }
                }
                newTable[lastIdx] = lastRun;     // reuse the old tail run whose new index is the same; its next links are unchanged
                // Clone remaining nodes
                for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                    V v = p.value;
                    int h = p.hash;
                    int k = h & sizeMask;
                    HashEntry<K,V> n = newTable[k];
                    newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                }
            }
        }
    }
    int nodeIndex = node.hash & sizeMask; // add the new node
    node.setNext(newTable[nodeIndex]);
    newTable[nodeIndex] = node;
    table = newTable;
}
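To see why a trailing run of old nodes can be reused, note that with a power-of-two table each entry's new index is either its old index or its old index plus oldCapacity. A small standalone sketch with arbitrarily chosen sample hashes:

public class RehashIndexDemo {
    public static void main(String[] args) {
        int oldCapacity = 8;
        int newCapacity = oldCapacity << 1;
        int[] sampleHashes = {3, 11, 5, 21, 29};

        for (int h : sampleHashes) {
            int oldIdx = h & (oldCapacity - 1);
            int newIdx = h & (newCapacity - 1);
            // newIdx is either oldIdx or oldIdx + oldCapacity, depending on
            // the bit of h at position log2(oldCapacity).
            System.out.println("hash=" + h + " old=" + oldIdx + " new=" + newIdx);
        }
    }
}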
Determining the Segment index within the segments array and the bucket index within a Segment's table
The key's hashCode is re-hashed to produce the hash value; its high bits select the Segment index and its low bits select the bucket index (a worked example follows the code below):
// Applies a supplemental hash to the key's hashCode, spreading both the high
// and the low bits. Because segments.length and Segment.table.length are both
// powers of two, hashCodes that do not differ in their upper or lower bits
// would otherwise collide.
private int hash(Object k) {
    int h = hashSeed;

    if ((0 != h) && (k instanceof String)) {
        return sun.misc.Hashing.stringHash32((String) k);
    }

    h ^= k.hashCode();

    // Spread bits to regularize both segment and index locations,
    // using variant of single-word Wang/Jenkins hash.
    h += (h <<  15) ^ 0xffffcd7d;
    h ^= (h >>> 10);
    h += (h <<   3);
    h ^= (h >>>  6);
    h += (h <<   2) + (h << 14);
    return h ^ (h >>> 16);
}

// High bits of the hash select the index within segments
int j = (hash >>> segmentShift) & segmentMask;
// Low bits of the hash select the bucket index
int index = (tab.length - 1) & hash;
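A worked example with the default geometry (segmentShift = 28, segmentMask = 15, per-segment table of length 2) and an arbitrary spread hash value:

public class IndexDemo {
    public static void main(String[] args) {
        // Default geometry: 16 segments, per-segment table of length 2.
        int segmentShift = 28, segmentMask = 15, tableLength = 2;

        int hash = 0xA93F17C6; // an arbitrary spread hash value

        // High bits pick the segment, low bits pick the bucket inside it.
        int segmentIndex = (hash >>> segmentShift) & segmentMask; // top 4 bits -> 0xA = 10
        int bucketIndex  = (tableLength - 1) & hash;              // lowest bit -> 0

        System.out.println("segment = " + segmentIndex + ", bucket = " + bucketIndex);
    }
}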
put — steps:
1) Compute the hash from the key's hashCode;
2) Use the high bits of the hash to locate the Segment;
3) Delegate the put to that Segment;
4) Acquire that Segment's lock, spinning with tryLock first;
5) Use the low bits of the hash to locate the bucket in the Segment's table;
6) Traverse the bucket looking for an entry with the same key (same reference, or equal hash and equal key); if found, replace its value and return; otherwise create a new HashEntry from the key, value and hash and link it at the head of the bucket;
7) Release the Segment's lock.
public V put(K key, V value) {
    Segment<K,V> s;
    if (value == null)
        throw new NullPointerException();
    int hash = hash(key);
    int j = (hash >>> segmentShift) & segmentMask;
    if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
         (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
        s = ensureSegment(j);
    return s.put(key, hash, value, false);           // delegate to the located Segment
}

// Segment.put
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    HashEntry<K,V> node = tryLock() ? null :
        scanAndLockForPut(key, hash, value); // spin for the lock, warming up the bucket traversal
    V oldValue;
    try {
        HashEntry<K,V>[] tab = table;
        int index = (tab.length - 1) & hash;        // bucket index
        HashEntry<K,V> first = entryAt(tab, index); // volatile read of the table element
        for (HashEntry<K,V> e = first;;) {
            if (e != null) {                        // first look for an existing entry
                K k;
                // same key reference, or equal hash and equal key
                if ((k = e.key) == key ||
                    (e.hash == hash && key.equals(k))) {
                    oldValue = e.value;
                    if (!onlyIfAbsent) {
                        e.value = value;
                        ++modCount;
                    }
                    break;
                }
                e = e.next;
            }
            else {
                if (node != null)
                    node.setNext(first);
                else
                    node = new HashEntry<K,V>(hash, key, value, first);
                int c = count + 1;
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    rehash(node);
                else
                    setEntryAt(tab, index, node);   // volatile-style write of the table element
                ++modCount;
                count = c;
                oldValue = null;
                break;
            }
        }
    } finally {
        unlock();
    }
    return oldValue;
}

// Volatile read of a segments element; if it is missing, create the Segment
// and publish it with CAS.
@SuppressWarnings("unchecked")
private Segment<K,V> ensureSegment(int k) {
    final Segment<K,V>[] ss = this.segments;
    long u = (k << SSHIFT) + SBASE; // raw offset
    Segment<K,V> seg;
    // volatile read of the segments element; if null, build one
    if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
        Segment<K,V> proto = ss[0]; // use segment 0 as prototype
        int cap = proto.table.length;
        float lf = proto.loadFactor;
        int threshold = (int)(cap * lf);
        HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
        if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
            == null) { // recheck whether it is still null
            Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
            while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) // publish with CAS
                   == null) {
                if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                    break;
            }
        }
    }
    return seg;
}

// While trying to acquire the Segment lock, traverse the bucket selected by
// the hash; if the node is absent, speculatively create it and return it once
// the lock is held. Keys are only compared with equals here; the outcome does
// not matter, the traversal merely warms the cache.
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
    HashEntry<K,V> first = entryForHash(this, hash);
    HashEntry<K,V> e = first;
    HashEntry<K,V> node = null;
    int retries = -1; // negative while locating node
    while (!tryLock()) {
        HashEntry<K,V> f; // to recheck first below
        if (retries < 0) {
            if (e == null) {
                if (node == null) // speculatively create node
                    node = new HashEntry<K,V>(hash, key, value, null);
                retries = 0;
            }
            else if (key.equals(e.key)) // not a real comparison, just traversal warm-up
                retries = 0;
            else
                e = e.next;
        }
        else if (++retries > MAX_SCAN_RETRIES) {
            lock();
            break;
        }
        else if ((retries & 1) == 0 &&
                 (f = entryForHash(this, hash)) != first) {
            e = first = f; // re-traverse if entry changed
            retries = -1;
        }
    }
    return node;
}

// Volatile read of a table element
@SuppressWarnings("unchecked")
static final <K,V> HashEntry<K,V> entryAt(HashEntry<K,V>[] tab, int i) {
    return (tab == null) ? null :
        (HashEntry<K,V>) UNSAFE.getObjectVolatile
        (tab, ((long)i << TSHIFT) + TBASE);
}

// Ordered (volatile-style) write of a table element
static final <K,V> void setEntryAt(HashEntry<K,V>[] tab, int i,
                                   HashEntry<K,V> e) {
    UNSAFE.putOrderedObject(tab, ((long)i << TSHIFT) + TBASE, e);
}
remove — steps:
1) Compute the hash from the key's hashCode;
2) Use the high bits of the hash to locate the Segment;
3) Delegate the remove to that Segment;
4) Acquire that Segment's lock, spinning with tryLock first;
5) Use the low bits of the hash to locate the bucket in the Segment's table;
6) Traverse the bucket; if an entry with the same key (same reference, or equal hash and equal key) exists, unlink it;
7) Release the Segment's lock.
public V remove(Object key) {
    int hash = hash(key);
    Segment<K,V> s = segmentForHash(hash);
    return s == null ? null : s.remove(key, hash, null);
}

// Segment.remove
final V remove(Object key, int hash, Object value) {
    if (!tryLock())
        scanAndLock(key, hash);
    V oldValue = null;
    try {
        HashEntry<K,V>[] tab = table;
        int index = (tab.length - 1) & hash;
        HashEntry<K,V> e = entryAt(tab, index);
        HashEntry<K,V> pred = null;
        while (e != null) {
            K k;
            HashEntry<K,V> next = e.next;
            if ((k = e.key) == key ||
                (e.hash == hash && key.equals(k))) {
                V v = e.value;
                if (value == null || value == v || value.equals(v)) {
                    if (pred == null)
                        setEntryAt(tab, index, next); // removing the first node of the bucket: volatile-style write of the table element
                    else
                        pred.setNext(next);           // unlink by rewriting next: ordered write of next
                    ++modCount;
                    --count;
                    oldValue = v;
                }
                break;
            }
            pred = e;
            e = next;
        }
    } finally {
        unlock();
    }
    return oldValue;
}
get — steps:
1) Compute the hash from the key's hashCode;
2) Use the high bits of the hash to locate the Segment;
3) Use the low bits of the hash to locate the bucket in the Segment's table;
4) Traverse the bucket; if an entry with the same key (same reference, or equal hash and equal key) exists, return its value; otherwise return null.
public V get(Object key) {
    Segment<K,V> s; // manually integrate access methods to reduce overhead
    HashEntry<K,V>[] tab;
    int h = hash(key);
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
    if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
        (tab = s.table) != null) {
        for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                 (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
             e != null; e = e.next) {
            K k;
            if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                return e.value;
        }
    }
    return null;
}
// Whether the ConcurrentHashMap is empty.
// Uses the segments' modCounts so the answer refers to one moment in time.
public boolean isEmpty() {
    long sum = 0L;
    final Segment<K,V>[] segments = this.segments;
    for (int j = 0; j < segments.length; ++j) {
        Segment<K,V> seg = segmentAt(segments, j);
        if (seg != null) {
            if (seg.count != 0)
                return false;
            sum += seg.modCount;
        }
    }
    if (sum != 0L) { // recheck unless no modifications
        for (int j = 0; j < segments.length; ++j) {
            Segment<K,V> seg = segmentAt(segments, j);
            if (seg != null) {
                if (seg.count != 0)
                    return false;
                sum -= seg.modCount;
            }
        }
        if (sum != 0L) // modCount overflow does not matter: sum == 0L means no modification happened in between
            return false;
    }
    return true;
}

// Total number of key/value pairs in the ConcurrentHashMap.
// Uses the segments' modCounts to detect concurrent modification.
// Tries up to RETRIES_BEFORE_LOCK unlocked passes; if they keep disagreeing, locks every segment.
public int size() {
    // Try a few times to get accurate count. On failure due to
    // continuous async changes in table, resort to locking.
    final Segment<K,V>[] segments = this.segments;
    int size;
    boolean overflow; // true if size overflows 32 bits
    long sum;         // sum of modCounts
    long last = 0L;   // previous sum
    int retries = -1; // first iteration isn't retry
    try {
        for (;;) {
            if (retries++ == RETRIES_BEFORE_LOCK) {
                for (int j = 0; j < segments.length; ++j)
                    ensureSegment(j).lock(); // force creation
            }
            sum = 0L;
            size = 0;
            overflow = false;
            for (int j = 0; j < segments.length; ++j) {
                Segment<K,V> seg = segmentAt(segments, j);
                if (seg != null) {
                    sum += seg.modCount;
                    int c = seg.count;
                    if (c < 0 || (size += c) < 0)
                        overflow = true;
                }
            }
            if (sum == last)
                break;
            last = sum;
        }
    } finally {
        if (retries > RETRIES_BEFORE_LOCK) {
            for (int j = 0; j < segments.length; ++j)
                segmentAt(segments, j).unlock();
        }
    }
    return overflow ? Integer.MAX_VALUE : size;
}
HashIterator is the base iterator class. It traverses the whole ConcurrentHashMap from back to front, with only ordinary performance; KeyIterator, ValueIterator and EntryIterator all extend it:
abstract class HashIterator {
    int nextSegmentIndex;
    int nextTableIndex;
    HashEntry<K,V>[] currentTable;
    HashEntry<K,V> nextEntry;
    HashEntry<K,V> lastReturned;

    HashIterator() {
        nextSegmentIndex = segments.length - 1;
        nextTableIndex = -1;
        advance();
    }

    /**
     * Set nextEntry to first node of next non-empty table
     * (in backwards order, to simplify checks).
     */
    final void advance() {
        for (;;) {
            if (nextTableIndex >= 0) {
                if ((nextEntry = entryAt(currentTable,
                                         nextTableIndex--)) != null) // walk backwards to the next non-null bucket
                    break;
            }
            else if (nextSegmentIndex >= 0) {
                Segment<K,V> seg = segmentAt(segments, nextSegmentIndex--); // walk backwards to the next non-null Segment
                if (seg != null && (currentTable = seg.table) != null)
                    nextTableIndex = currentTable.length - 1;
            }
            else
                break;
        }
    }

    final HashEntry<K,V> nextEntry() {
        HashEntry<K,V> e = nextEntry;
        if (e == null)
            throw new NoSuchElementException();
        lastReturned = e; // cannot assign until after null check
        if ((nextEntry = e.next) == null)
            advance();
        return e;
    }

    public final boolean hasNext() { return nextEntry != null; }
    public final boolean hasMoreElements() { return nextEntry != null; }

    public final void remove() {
        if (lastReturned == null)
            throw new IllegalStateException();
        ConcurrentHashMap.this.remove(lastReturned.key);
        lastReturned = null;
    }
}
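HashIterator.remove() delegates to ConcurrentHashMap.remove(lastReturned.key), so entries can be removed while iterating. A minimal usage sketch:

import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;

public class IteratorRemoveDemo {
    public static void main(String[] args) {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<String, Integer>();
        map.put("keep", 1);
        map.put("drop", 2);

        // Removing through the iterator is supported: it forwards to
        // ConcurrentHashMap.remove, which locks only the affected segment.
        for (Iterator<String> it = map.keySet().iterator(); it.hasNext(); ) {
            if (it.next().equals("drop"))
                it.remove();
        }
        System.out.println(map); // {keep=1}
    }
}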
1) The lengths of segments and of Segment.table are both powers of two;
2) The hash function re-hashes the key's hashCode, spreading both its high and its low bits;
3) The high bits of the hash select the index within segments, the low bits select the bucket index (a quick demonstration follows the code below);
// High bits of the hash select the index within segments
int j = (hash >>> segmentShift) & segmentMask;
// Low bits of the hash select the bucket index
int index = (tab.length - 1) & hash;
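As a quick check of the power-of-two masking trick, the sketch below (arbitrary sample hashes) shows that for non-negative hashes masking with (len - 1) gives the same result as the much costlier hash % len when len is a power of two:

public class PowerOfTwoMaskDemo {
    public static void main(String[] args) {
        int len = 16; // power of two
        for (int hash : new int[]{7, 16, 33, 1000003}) {
            // Masking keeps the low bits; for non-negative hashes this equals hash % len.
            System.out.println((hash & (len - 1)) + " == " + (hash % len));
        }
    }
}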
Its concurrency shows in two respects:
1) concurrency across Segments;
2) concurrent reads and writes within a Segment.
For concurrency across Segments: 1) elements of segments are read concurrently; 2) every operation is delegated to the owning Segment.
int h = hash(key); // hash value
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; // offset of the segment index
s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u); // volatile read of the segments element at that index
For concurrency within a Segment: 1) writer threads contend for the same Segment lock, while reader threads do not lock;
2) volatile and final semantics guarantee visibility between concurrent writers and between concurrent readers and writers (a small concurrency sketch follows the code below).
public V get(Object key) {
    Segment<K,V> s; // manually integrate access methods to reduce overhead
    HashEntry<K,V>[] tab;
    int h = hash(key);
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
    if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
        (tab = s.table) != null) {                       // volatile read of table
        for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                 (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE); // volatile read of the table element
             e != null; e = e.next) {                    // volatile read of next
            K k;
            if ((k = e.key) == key || (e.hash == h && key.equals(k))) // key and hash are final
                return e.value;                          // volatile read of value
        }
    }
    return null;
}

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    HashEntry<K,V> node = tryLock() ? null :
        scanAndLockForPut(key, hash, value);
    V oldValue;
    try {
        HashEntry<K,V>[] tab = table;                    // volatile read of table
        int index = (tab.length - 1) & hash;
        HashEntry<K,V> first = entryAt(tab, index);      // volatile read of the table element
        for (HashEntry<K,V> e = first;;) {
            if (e != null) {
                K k;
                if ((k = e.key) == key ||
                    (e.hash == hash && key.equals(k))) { // key and hash are final
                    oldValue = e.value;
                    if (!onlyIfAbsent) {
                        e.value = value;                 // volatile write of value
                        ++modCount;
                    }
                    break;
                }
                e = e.next;                              // volatile read of next
            }
            else {
                if (node != null)
                    node.setNext(first);
                else
                    node = new HashEntry<K,V>(hash, key, value, first);
                int c = count + 1;
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    rehash(node);
                else
                    setEntryAt(tab, index, node);        // ordered write of the table element; weakly consistent
                ++modCount;
                count = c;
                oldValue = null;
                break;
            }
        }
    } finally {
        unlock();
    }
    return oldValue;
}
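A small sketch of both forms of concurrency, assuming non-overlapping integer keys so the final size is deterministic; the segment assignment of individual keys is illustrative only:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

public class ConcurrencyDemo {
    public static void main(String[] args) throws InterruptedException {
        final ConcurrentHashMap<Integer, Integer> map =
                new ConcurrentHashMap<Integer, Integer>(16, 0.75f, 4);
        final CountDownLatch done = new CountDownLatch(2);

        // Two writers: keys that fall into different segments are inserted in
        // parallel; keys in the same segment serialize on that segment's lock.
        for (int t = 0; t < 2; t++) {
            final int base = t * 10000;
            new Thread(new Runnable() {
                public void run() {
                    for (int i = 0; i < 10000; i++)
                        map.put(base + i, i);
                    done.countDown();
                }
            }).start();
        }

        // A reader runs without locking and sees a weakly consistent view.
        while (done.getCount() > 0) {
            map.get(42); // never blocks on the writers
        }
        done.await();
        System.out.println("size = " + map.size()); // 20000
    }
}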
During put, if the segment read back is null, it is constructed lazily:
if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
     (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
    s = ensureSegment(j);
This reduces the memory needed at initialization.
Once an instance of a class with final fields finishes construction, and provided its this reference did not escape before the constructor completed, those final fields are visible to all concurrently running threads. This can eliminate some volatile reads, but at the cost of more memory at initialization time.
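A minimal sketch of the final-field guarantee referred to above; the class and field names are illustrative, and the spin loop is only a demo convenience (the JMM does not bound when the racy publication becomes visible):

public class FinalFieldDemo {
    static class Holder {
        final int x;  // guaranteed visible to any thread that sees a fully constructed Holder
        int y;        // plain field: no such guarantee under racy publication
        Holder(int x, int y) { this.x = x; this.y = y; }
    }

    static Holder shared; // deliberately non-volatile

    public static void main(String[] args) {
        new Thread(new Runnable() {
            public void run() {
                shared = new Holder(42, 42); // racy publication, no synchronization
            }
        }).start();

        Holder h;
        while ((h = shared) == null) { /* spin until the reference is observed */ }
        // Per the JMM, h.x must read 42 (final field, and `this` did not escape
        // the constructor); h.y has no such guarantee, although it usually reads 42.
        System.out.println(h.x + " " + h.y);
    }
}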