程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> C語言 >> 關於C語言 >> C#算法之基於無鎖的C#並發隊列實現

C#算法之基於無鎖的C#並發隊列實現

編輯:關於C語言

最近開始學習無鎖編程,和傳統的基於Lock的算法相比,無鎖編程具有其獨特的優點,Angel Lucifer 的關於無鎖編程一文對此有詳細的描述。

無鎖編程的目標是在不使用Lock的前提下保證並發過程中共享數據的一致性,其主要的實現基礎是CAS 操作,也就是compare_and_swap,通過處理器提供的指令,可以原子地更新共享數據,並同時監測其他線 程的干擾,.Net中的對應實現是InterLocked.CompareExchange函數。

既然不使用Lock,那在無鎖編程中要時刻注意的是,代碼可能在任意語句中被中斷。如果是單個變量 ,我們可以使用 InterLocked.XXX 保證操作的原子性,但是如果有多個操作要完成的話,簡單地組合 InterLocked.XXX 是遠遠不夠的。通常的原則是對函數中用到的共享變量,先在代碼開始處用局部變量保 存它的內容,在後面更新共享變量時,使用前述變量來判斷其是否發生了改變,如果共享變量發生了改變 ,那麼我們可能需要重試,或者在某些可能的情況下,當前線程可以"幫助"其他更新中的線程完成更新。

從上面可以總結出無鎖算法的兩個基本特征:

1. 無鎖算法總是包含一個循環結構,以保證更新失敗後重試

2. 無鎖算法在更新共享變量時,總是使用CAS和原始值進行比較,以保證沒有沖突

下面是按照Michael-Scott算法實現的並發隊列,其中的Dequeue算法在IBM的非阻塞算法一文中有詳細 介紹。代碼如下:

Code

1public class ConcurrentLinkedQueue<T>
2{
3  private class Node<K>
4  {
5    internal K Item;
6    internal Node<K> Next;
7
8    public Node(K item, Node<K> next)
9    {
10      this.Item = item;
11      this.Next = next;
12    }
13  }
14
15  private Node<T> _head;
16  private Node<T> _tail;
17
18  public ConcurrentLinkedQueue()
19  {
20    _head = new Node<T>(default(T), null);
21    _tail = _head;
22  }
23
24  public bool IsEmpty
25  {
26    get { return (_head.Next == null); }
27  }
28
29  public void Enqueue(T item)
30  {
31    Node<T> newNode = new Node<T>(item, null);
32    while (true)
33    {
34      Node<T> curTail = _tail;
35      Node<T> residue = curTail.Next;
36
37      //判斷_tail是否被其他process改變
38      if (curTail == _tail)
39      {
40        //A 有其他rocess執行C成功,_tail應該指向新的節點
41        if (residue == null)
42        {
43          //C 其他process改變了tail節點,需要重新取tail節點
44          if (Interlocked.CompareExchange<Node<T>>(
45            ref curTail.Next, newNode, residue) == residue)
46          {
47            //D 嘗試修改tail
48            Interlocked.CompareExchange<Node<T>>(ref _tail, newNode, curTail);
49            return;
50          }
51        }
52        else
53        {
54          //B 幫助其他線程完成D操作
55          Interlocked.CompareExchange<Node<T>>(ref _tail, residue, curTail);
56        }
57      }
58    }
59  }
60
61  public bool TryDequeue(out T result)
62  {
63    Node<T> curHead;
64    Node<T> curTail;
65    Node<T> next;
66    do
67    {
68      curHead = _head;
69      curTail = _tail;
70      next = curHead.Next;
71      if (curHead == _head)
72      {
73        if (next == null) //Queue為空
74        {
75          result = default(T);
76          return false;
77        }
78        if (curHead == curTail) //Queue處於Enqueue第一個node的過程中
79        {
80          //嘗試幫助其他Process完成操作
81          Interlocked.CompareExchange<Node<T>>(ref _tail, next, curTail);
82        }
83        else
84        {
85          //取next.Item必須放到CAS之前
86          result = next.Item;
87          //如果_head沒有發生改變,則將_head指向next並退出
88          if (Interlocked.CompareExchange<Node<T>>(ref _head,
89            next, curHead) == curHead)
90            break;
91        }
92      }
93    }
94    while (true);
95    return true;
96  }
97}
98

根據自己的測試(雙核CPU),在輕度和中度爭用情況下,無鎖算法比基於鎖的算法性能好很多,在爭 用非常嚴重的情況下(100個並發線程以上/每CPU),基於鎖的算法性能開始顯示出優勢,因為一旦發生 爭用,基於鎖的算法會立刻切換到其他線程,而無鎖算法會進入下一次循環,導致CPU的占用。但是如此 嚴重的爭用在實際中並不多見,並且可以采用SpinWait的方法加以改進。基於鎖的算法在測試中曾經出現 過類似死鎖的現象,無鎖算法則完全沒有出過類似問題,另外,處理器核心越多,基於鎖的算法效率越差 。

從上面的算法實現中,可以體會到無鎖算法的優勢:在並發的多個線程中,總是有線程能夠推進,算 法總能在有限的循環次數內完成,並且在某些沖突的情況下,一個線程可以“幫助”其他線程完成被中斷 的工作,這些對提高吞吐量都有很大的作用。

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved