在敘述並發Stack前,我們先來了解下非線程安全的Stack。
Stack是一種線性數據結構,只能訪問它的一端來存儲或讀取數據。Stack很像餐廳中的一疊盤子:將 新盤子堆在最上面,並從最上面取走盤子。最後一個堆在上面的盤子第一個被取走。因此Stack也被稱為 後進先出結構(LIFO)。
Stack有兩種實現方式:數組和列表。下面我們分別用這兩種方式來實現一個簡單的Stack。采用數組 實現代碼如下:
using System;
using System.Collections.Generic;
namespace Lucifer.DataStructure
{
public class ArrayStack<T>
{
private T[] array;
private int topOfStack;
private const int defaultCapacity = 16;
public ArrayStack() : this(defaultCapacity)
{
}
public ArrayStack(int initialCapacity)
{
if (initialCapacity < 0)
throw new ArgumentOutOfRangeException();
array = new T[initialCapacity];
topOfStack = -1;
}
/// <summary>
/// 查看Stack是否為空
/// </summary>
public bool IsEmpty()
{
return topOfStack == -1;
}
/// <summary>
/// 進棧
/// </summary>
public void Push(T item)
{
if (this.Count == array.Length)
{
T[] newArray = new T[this.Count == 0 ? defaultCapacity : 2 * array.Length];
Array.Copy(array, 0, newArray, 0, this.Count);
array = newArray;
}
this.array[++topOfStack] = item;
}
/// <summary>
/// 出棧
/// </summary>
public T Pop()
{
if (this.IsEmpty())
throw new InvalidOperationException("The stack is empty.");
T popped = array[topOfStack];
array[topOfStack] = default(T);
topOfStack--;
return popped;
}
/// <summary>
/// 返回棧頂,但不刪除棧頂的值。
/// </summary>
public T Peek()
{
if (this.IsEmpty())
throw new InvalidOperationException("The stack is empty.");
return array[topOfStack];
}
/// <summary>
/// Stack內元素的數量
/// </summary>
public int Count
{
get
{
return this.topOfStack + 1;
}
}
}
}
如上面的代碼所示,ArrayStack<T>定義了兩個數據成員: array用來存儲棧的數據項並在需要 時擴展;topOfStack則定位當前棧頂的索引。如果是空棧,該索引值為-1。唯一值得一提的是Push操作。 棧的每個操作時間復雜度都是O(1)。但是Push操作在數組滿載的時候會引起一個數組加倍的操作,這將花 費O(N)的時間。如果這個操作經常發生的話,我們需要考慮改進。然而,實際上這個操作很少發生,因為 包含N個元素的數組加倍只有在至少N/2次不包含數組加倍的Push後才會發生一次。因此,我們可以把加倍 的O(N)代價均談到N/2次簡單的Push操作上,這樣平均每個Push操作的代價只增加了一小點。此外,我們 沒有讓它繼承IEnumerable<T>和ICollection<T>接口,這樣做的目的是避免其他的細節實現 淹沒了我們的主題。.NET的Stack<T>采用的就是我們上述的方法,但繼承的是 IEnumerable<T>,ICollection,IEnumerable。而且它的默認容量是4,而我們這裡的 ArrayStack<T>的默認容量是16。我認為在大內存的現在,16應該是比較合理的數字。D語言的實現 請看http://code.google.com/p/d-phoenix/source/browse/trunk/source/system/collections/Stack.d 。
除了使用數組實現以外,我們還可以使用鏈表實現。鏈表的優勢在於額外的空間僅僅是一個項的引用 。而數組實現所用的額外空間則等於空余的數組項的個數。
為了使鏈表實現可以與數組實現有競爭力,我們必須能夠以常量時間執行鏈表的基本操作。要做到這 點很容易,因為對鏈表的改變僅僅在於鏈表兩端的數據項。具體實現代碼如下:
public class ListStack<T>
{
private ListNode<T> topOfStack;
public bool IsEmpty()
{
return topOfStack == null;
}
public void Push(T item)
{
topOfStack = new ListNode<T>(item, topOfStack);
}
public T Pop()
{
if (this.IsEmpty())
throw new InvalidOperationException("The stack is empty.");
T popped = topOfStack.item;
topOfStack = topOfStack.next;
return popped;
}
public T Peek()
{
if (IsEmpty())
throw new InvalidOperationException("The stack is empty.");
return topOfStack.item;
}
class ListNode<T>
{
internal T item;
internal ListNode<T> next;
public ListNode(T initItem, ListNode<T> initNext)
{
this.item = initItem;
this.next = initNext;
}
public ListNode(T initItem)
: this(initItem, null)
{
}
}
}
這兩種實現的時間復雜度都是O(1)。因此,它們都相當快速,不會成為任何算法的瓶頸。從這點上來 看,使用哪種方式實現都無所謂。
使用數組實現可能比使用鏈表實現稍快一些,尤其是在能夠准確估評估所需要的容量時。如果估計正 確,就不會有數組加倍操作。此外,數組提供的順序訪問通常比由動態內存分配的非順序訪問要快。但是 數組實現也存在著浪費額外內存空間的缺點。這是一個時間-空間取捨的問題。
接下來我們將使用我們在《並發數據結構:迷人的原子》中學習到的CAS原語來構造一個Lock-Free堆棧 。因為CAS原語最多只能交換64Bit,如果采取數組實現方式,幾乎很難實現。因此我們采取鏈表的實現方 式。這只要在需要進行同步的地方采用CAS原語交換就可以了。具體實現代碼如下:
public class LockFreeStack<T>
{
private ListNode<T> topOfStack;
public bool IsEmpty()
{
return topOfStack == null;
}
public void Push(T item)
{
ListNode<T> newTopOfStack = new ListNode<T>(item);
ListNode<T> oldTopOfStack;
do
{
oldTopOfStack = topOfStack;
newTopOfStack.next = oldTopOfStack;
}
while (Interlocked.CompareExchange<ListNode<T>>(ref topOfStack, newTopOfStack, oldTopOfStack) != oldTopOfStack);
}
/// <summary>
/// 考慮到在多線程環境中,這裡不拋出異常。我們需要人為判斷其是否為空,即 ! TryPop() or result != null
/// </summary>
/// <returns></returns>
public bool TryPop(out T result)
{
ListNode<T> oldTopOfStack;
ListNode<T> newTopOfStack;
do
{
oldTopOfStack = topOfStack;
if (oldTopOfStack == null)
{
result = default(T);
return false;
}
newTopOfStack = topOfStack.next;
}
while(Interlocked.CompareExchange<ListNode<T>>(ref topOfStack, newTopOfStack, oldTopOfStack) != oldTopOfStack);
result = oldTopOfStack.item;
return true;
}
public bool TryPeek(out T result)
{
ListNode<T> head = topOfStack;
if (head == null)
{
result = default(T);
return false;
}
result = head.item;
return true;
}
/* *****************************************
* 簡單的單向鏈表實現
* *****************************************/
class ListNode<T>
{
internal T item;
internal ListNode<T> next;
public ListNode(T initItem, ListNode<T> initNext)
{
this.item = initItem;
this.next = initNext;
}
public ListNode(T initItem)
: this(initItem, null)
{
}
}
}
我們現在已經知道CAS原語有個ABA問題(具體請參考並發數據結構:迷人的原子)。那麼我們上面的 Lock-free代碼有沒有這個問題?這需要我們了解它的本質。CAS比較的其實是一個內存地址,這跟內存回 收機制有著莫大的關聯。C/C++的內存回收策略使得某些時候內存會被重復使用。比方,我們剛剛刪除了 某個類型的實例,如果在此時又有該類型的實例被創建。那麼很有可能這個實例的內存地址就是我們剛剛 被刪除的類型實例的地址。這樣ABA問題就出現了。凡是牽涉到顯式內存管理的地方,我們都要考慮會不 會導致ABA問題。所以用C/C++編寫Lock-free代碼相當的麻煩,我們可能會用CAS2原語或者Hazard Pointers來解決此類問題。但是.NET是有GC的。GC在這裡很好的幫助了我們。關於GC的詳細描述,請參考 《CLR via C#》第20章。這裡簡單描寫下。.NET的托管堆上維護著一個指針,我們稱之為NextObjPtr,它 表示下一個新建對象分配時在托管堆中所處的位置。在.NET中,我們只要new一個對象,NextObjPtr就會 返回對象的內存地址,並且會再次指示下一個新建對象分配時在托管堆中所處的位置。內存回收時,GC有 Mark/Clear以及壓縮階段。此外,.NET的GC還有分代機制。
通過上面的描述,我們就可以知道TryPop()不會有ABA問題。因為它壓根就沒有內存分配和回收。而只 是已經在內存中的對象位置變換而已(這些對象的內存地址肯定不同)。唯一需要考慮的是Push()操作。它 有ABA問題嗎?答案是否定的。因為我們在每次Push操作中只分配新對象,而不刪除老對象。所有等待回 收的對象全部交給GC處理。那麼假如TryPop和Push操作前後交替進行,會發生ABA問題嗎?我的看法是有 可能,但極難發生,發生了也極難重現。但我在http://msdn2.microsoft.com/zh- cn/magazine/cc163427.aspx裡看到的說法是不會發生該問題,而在 http://www.research.ibm.com/people/m/michael/ieeetpds-2004.pdf裡講的是有可能發生。我比較傾向 於後者,因為MSDN的那篇Paper沒有講出個所以然來,但後者也語焉不詳。不過.NET的並行庫中的 ConcurrentStack<T>和Java中還是這樣實現了。因為Lock-Free編碼很難證明其正確性,我們權且 相信它是安全的。如果哪位達人了解的話,在下虛心請教。還望不吝賜教。
在輕度到中度的爭用情況下,上面的代碼比基於鎖的代碼性能高出很多,大概會有3~4倍。因為 CAS 的多數時間都在第一次嘗試時就成功,而發生爭用時的開銷也不涉及線程掛起和上下文切換,只多了幾個 循環迭代。沒有爭用的 CAS 要比沒有爭用的鎖便宜得多,而爭用的 CAS 比爭用的鎖獲取涉及更短的延遲 。
在高度爭用的情況下(即有多個線程不斷爭用一個內存位置的時候),基於鎖的算法開始提供比非阻 塞算法更好的吞吐率,因為當線程阻塞時,它就會停止爭用,耐心地等候輪到自己,從而避免了進一步爭 用。但是,這麼高的爭用程度並不常見,因為多數時候,線程會把線程本地的計算與爭用共享數據的操作 分開,從而給其他線程使用共享數據的機會。(這麼高的爭用程度也表明需要重新檢查算法,朝著更少共 享數據的方向努力。)此外,CAS涉及的內存分配和回收也是阻礙性能的一大因素。
請注意,上述代碼是理想的並發形式:無需阻止其他線程存取數據,只需抱著會在爭用中“勝出”的 信念即可。如果事實證明無法“勝出”,我們將會遇到一些變化不定的問題,例如活鎖。
所以我們將在下一集引入一個新的並發數據結構:SpinLock來構造更好的並發堆棧。