最近開始學習無鎖編程,和傳統的基於Lock的算法相比,無鎖編程具有其獨特的優點,Angel Lucifer 的關於無鎖編程一文對此有詳細的描述。
無鎖編程的目標是在不使用Lock的前提下保證並發過程中共享數據的一致性,其主要的實現基礎是CAS 操作,也就是compare_and_swap,通過處理器提供的指令,可以原子地更新共享數據,並同時監測其他線 程的干擾,.Net中的對應實現是InterLocked.CompareExchange函數。
既然不使用Lock,那在無鎖編程中要時刻注意的是,代碼可能在任意語句中被中斷。如果是單個變量 ,我們可以使用 InterLocked.XXX 保證操作的原子性,但是如果有多個操作要完成的話,簡單地組合 InterLocked.XXX 是遠遠不夠的。通常的原則是對函數中用到的共享變量,先在代碼開始處用局部變量保 存它的內容,在後面更新共享變量時,使用前述變量來判斷其是否發生了改變,如果共享變量發生了改變 ,那麼我們可能需要重試,或者在某些可能的情況下,當前線程可以"幫助"其他更新中的線程完成更新。
從上面可以總結出無鎖算法的兩個基本特征:
1. 無鎖算法總是包含一個循環結構,以保證更新失敗後重試
2. 無鎖算法在更新共享變量時,總是使用CAS和原始值進行比較,以保證沒有沖突
下面是按照Michael-Scott算法實現的並發隊列,其中的Dequeue算法在IBM的非阻塞算法一文中有詳細 介紹。代碼如下:
Code
1public class ConcurrentLinkedQueue<T>
2{
3 private class Node<K>
4 {
5 internal K Item;
6 internal Node<K> Next;
7
8 public Node(K item, Node<K> next)
9 {
10 this.Item = item;
11 this.Next = next;
12 }
13 }
14
15 private Node<T> _head;
16 private Node<T> _tail;
17
18 public ConcurrentLinkedQueue()
19 {
20 _head = new Node<T>(default(T), null);
21 _tail = _head;
22 }
23
24 public bool IsEmpty
25 {
26 get { return (_head.Next == null); }
27 }
28
29 public void Enqueue(T item)
30 {
31 Node<T> newNode = new Node<T>(item, null);
32 while (true)
33 {
34 Node<T> curTail = _tail;
35 Node<T> residue = curTail.Next;
36
37 //判斷_tail是否被其他process改變
38 if (curTail == _tail)
39 {
40 //A 有其他rocess執行C成功,_tail應該指向新的節點
41 if (residue == null)
42 {
43 //C 其他process改變了tail節點,需要重新取tail節點
44 if (Interlocked.CompareExchange<Node<T>>(
45 ref curTail.Next, newNode, residue) == residue)
46 {
47 //D 嘗試修改tail
48 Interlocked.CompareExchange<Node<T>>(ref _tail, newNode, curTail);
49 return;
50 }
51 }
52 else
53 {
54 //B 幫助其他線程完成D操作
55 Interlocked.CompareExchange<Node<T>>(ref _tail, residue, curTail);
56 }
57 }
58 }
59 }
60
61 public bool TryDequeue(out T result)
62 {
63 Node<T> curHead;
64 Node<T> curTail;
65 Node<T> next;
66 do
67 {
68 curHead = _head;
69 curTail = _tail;
70 next = curHead.Next;
71 if (curHead == _head)
72 {
73 if (next == null) //Queue為空
74 {
75 result = default(T);
76 return false;
77 }
78 if (curHead == curTail) //Queue處於Enqueue第一個node的過程中
79 {
80 //嘗試幫助其他Process完成操作
81 Interlocked.CompareExchange<Node<T>>(ref _tail, next, curTail);
82 }
83 else
84 {
85 //取next.Item必須放到CAS之前
86 result = next.Item;
87 //如果_head沒有發生改變,則將_head指向next並退出
88 if (Interlocked.CompareExchange<Node<T>>(ref _head,
89 next, curHead) == curHead)
90 break;
91 }
92 }
93 }
94 while (true);
95 return true;
96 }
97}
98
根據自己的測試(雙核CPU),在輕度和中度爭用情況下,無鎖算法比基於鎖的算法性能好很多,在爭 用非常嚴重的情況下(100個並發線程以上/每CPU),基於鎖的算法性能開始顯示出優勢,因為一旦發生 爭用,基於鎖的算法會立刻切換到其他線程,而無鎖算法會進入下一次循環,導致CPU的占用。但是如此 嚴重的爭用在實際中並不多見,並且可以采用SpinWait的方法加以改進。基於鎖的算法在測試中曾經出現 過類似死鎖的現象,無鎖算法則完全沒有出過類似問題,另外,處理器核心越多,基於鎖的算法效率越差 。
從上面的算法實現中,可以體會到無鎖算法的優勢:在並發的多個線程中,總是有線程能夠推進,算 法總能在有限的循環次數內完成,並且在某些沖突的情況下,一個線程可以“幫助”其他線程完成被中斷 的工作,這些對提高吞吐量都有很大的作用。