由於最近需要用多線程處理一些問題,一開始我用了 .NET 默認的 ThreadPool,但感覺不是很適合,於是自己實現了一個簡單的 ThreadPool。
寫得比較簡單,有興趣的朋友可以一起看看,共同改進。
代碼主要由ThreadPoolEx,WorkItem,WorkQueue組成。
ThreadPoolEx
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;

namespace NetDragon.ThreadPoolEx
{
    /// <summary>
    /// A small thread pool that creates background worker threads on demand,
    /// up to <see cref="MaxThreadCount"/>, and lets idle workers exit once they
    /// have been idle longer than <see cref="IdleTimeout"/> seconds, as long as
    /// more than <see cref="MinThreadCount"/> workers remain registered.
    /// </summary>
    public class ThreadPoolEx
    {
        private readonly WorkQueue _workQueue = new WorkQueue();

        // Serialises every check-then-act sequence on _threadTable/_threadCount
        // (grow, retire, unregister). The synchronized Hashtable wrapper only
        // makes individual calls atomic, not "check Count, then add/remove".
        private readonly object _poolLock = new object();

        /// <summary>Upper bound on the number of registered worker threads.</summary>
        public int MaxThreadCount = 10;

        /// <summary>Idle workers are only retired while more than this many are registered.</summary>
        public int MinThreadCount = 2;

        // Maps each live worker Thread to the UTC time it last started or finished work.
        private readonly Hashtable _threadTable = null;

        // Running total of threads ever created; only used to number thread names.
        private int _threadCount = 0;

        // Number of workers currently executing a work item.
        private int _inUseWorkThread = 0;

        /// <summary>Idle time, in seconds, after which a worker becomes eligible to exit.</summary>
        public double IdleTimeout = 10;

        /// <summary>Creates a pool with at most 10 threads, a floor of 2 and a 2 second idle timeout.</summary>
        public ThreadPoolEx() : this(10, 2, 2)
        {
        }

        /// <summary>Creates a pool with the given limits.</summary>
        /// <param name="maxThreadCouont">Maximum number of worker threads (parameter spelling kept for source compatibility).</param>
        /// <param name="minThreadCount">Number of workers below which idle workers are not retired.</param>
        /// <param name="idleTimeout">Idle time in seconds before a worker may exit.</param>
        public ThreadPoolEx(int maxThreadCouont, int minThreadCount, int idleTimeout)
        {
            MaxThreadCount = maxThreadCouont;
            MinThreadCount = minThreadCount;
            IdleTimeout = idleTimeout;

            _threadTable = Hashtable.Synchronized(new Hashtable(MaxThreadCount));
        }

        /// <summary>Queues a callback for execution on a pool thread.</summary>
        /// <param name="waitCallback">The callback to run.</param>
        /// <param name="objParams">State object passed to the callback.</param>
        /// <exception cref="ArgumentNullException"><paramref name="waitCallback"/> is null.</exception>
        public void QueueUserWorkItem(WaitCallback waitCallback, object objParams)
        {
            // Fail at the call site instead of letting a worker hit (and swallow)
            // a NullReferenceException later.
            if (waitCallback == null)
            {
                throw new ArgumentNullException("waitCallback");
            }

            EnqueueWorkItem(waitCallback, objParams);
        }

        // Wraps the callback in a WorkItem, queues it and grows the pool when
        // busy workers plus queued items outnumber the registered workers.
        private void EnqueueWorkItem(WaitCallback waitCallback, object objParams)
        {
            WorkItem workItem = new WorkItem()
            {
                WorkCallback = waitCallback,
                ObjParams = objParams
            };

            _workQueue.Push(workItem);

            // Taken under the same lock that idle workers use to retire, so a
            // worker cannot unregister between this check and the thread start
            // and leave the freshly queued item without anyone to run it.
            lock (_poolLock)
            {
                // CompareExchange(ref x, 0, 0) is a fenced read of the counter.
                int busy = Interlocked.CompareExchange(ref _inUseWorkThread, 0, 0);

                if (busy + _waitWorkItem > _threadTable.Count)
                {
                    StartThread();
                }
            }
        }

        // Creates, registers and starts one more worker unless the pool is full.
        private void StartThread()
        {
            lock (_poolLock)
            {
                if (_threadTable.Count >= MaxThreadCount)
                {
                    return;
                }

                ++_threadCount;

                Thread thread = new Thread(ProcessWorkItems);
                thread.IsBackground = true;
                thread.Name = "ThreadPoolEx #" + _threadCount;
                thread.Priority = ThreadPriority.Normal;

                // Register before Start so the worker always finds its own entry.
                _threadTable[thread] = DateTime.UtcNow;

                try
                {
                    thread.Start();
                }
                catch
                {
                    // A thread that never started must not stay registered as live.
                    _threadTable.Remove(thread);
                    throw;
                }
            }
        }

        // Worker loop: run queued items; when the queue is empty, poll every
        // 100 ms and exit once this thread has timed out and may be retired.
        private void ProcessWorkItems()
        {
            try
            {
                while (true)
                {
                    WorkItem workItem = _workQueue.Pop();

                    if (workItem == null)
                    {
                        if (CurThreadIsTimeOut() && TryRetireCurrentThread())
                        {
                            break;
                        }

                        Thread.Sleep(100);
                        continue;
                    }

                    Interlocked.Increment(ref _inUseWorkThread);

                    try
                    {
                        workItem.Execute();
                    }
                    catch (Exception ex)
                    {
                        // A failing work item must not kill the worker, but it
                        // should not disappear without a trace either.
                        System.Diagnostics.Trace.TraceError("ThreadPoolEx work item failed: " + ex);
                    }
                    finally
                    {
                        Interlocked.Decrement(ref _inUseWorkThread);

                        // Idle time counts from the end of the last item, so a
                        // long-running item does not make the worker look timed
                        // out the instant it finishes.
                        _threadTable[Thread.CurrentThread] = DateTime.UtcNow;
                    }
                }
            }
            catch (ThreadAbortException)
            {
                Thread.ResetAbort();
            }
            finally
            {
                // This thread is ending whichever way the loop was left, so its
                // entry always goes; removing an already removed key is a no-op.
                lock (_poolLock)
                {
                    _threadTable.Remove(Thread.CurrentThread);
                }
            }
        }

        // Atomically unregisters the calling worker if the pool is above its
        // minimum size and nothing is queued. Returns true when the caller
        // should exit its loop.
        private bool TryRetireCurrentThread()
        {
            lock (_poolLock)
            {
                if (_threadTable.Count <= MinThreadCount || _waitWorkItem > 0)
                {
                    return false;
                }

                _threadTable.Remove(Thread.CurrentThread);
                return true;
            }
        }

        // True when the calling worker's last recorded activity is more than
        // IdleTimeout seconds in the past. UTC is used so daylight-saving or
        // time-zone changes cannot distort the measured interval.
        private bool CurThreadIsTimeOut()
        {
            DateTime lastAliveTime = (DateTime)_threadTable[Thread.CurrentThread];

            double waitSeconds = (DateTime.UtcNow - lastAliveTime).TotalSeconds;

            return waitSeconds > IdleTimeout;
        }

        // Number of work items still waiting in the queue.
        private int _waitWorkItem
        {
            get
            {
                return _workQueue.Count;
            }
        }

        /// <summary>Number of worker threads currently registered with the pool.</summary>
        public int ThreadCount
        {
            get
            {
                return _threadTable.Count;
            }
        }
    }
}
WorkQueue
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace NetDragon.ThreadPoolEx
{
    /// <summary>
    /// FIFO queue of <see cref="WorkItem"/> objects in which every access to
    /// the underlying <see cref="Queue{T}"/> happens under one lock.
    /// </summary>
    class WorkQueue
    {
        // Per-instance lock: a static lock would make unrelated queues contend.
        private readonly object _syncRoot = new object();

        private readonly Queue<WorkItem> _workQueue = new Queue<WorkItem>();

        /// <summary>Removes and returns the oldest item, or null when the queue is empty.</summary>
        public WorkItem Pop()
        {
            lock (_syncRoot)
            {
                if (_workQueue.Count > 0)
                {
                    return _workQueue.Dequeue();
                }

                return null;
            }
        }

        /// <summary>Appends an item to the end of the queue.</summary>
        /// <param name="workItem">The item to queue.</param>
        /// <exception cref="ArgumentNullException"><paramref name="workItem"/> is null.</exception>
        public void Push(WorkItem workItem)
        {
            // Pop reports "empty" as null, so a queued null could never be
            // told apart from an empty queue.
            if (workItem == null)
            {
                throw new ArgumentNullException("workItem");
            }

            // Queue<T> is not safe for concurrent use; Enqueue has to take the
            // same lock as Dequeue or the two can corrupt the queue.
            lock (_syncRoot)
            {
                _workQueue.Enqueue(workItem);
            }
        }

        /// <summary>Number of items currently queued.</summary>
        public int Count
        {
            get
            {
                lock (_syncRoot)
                {
                    return _workQueue.Count;
                }
            }
        }
    }
}
WorkItem
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;

namespace NetDragon.ThreadPoolEx
{
    /// <summary>
    /// Pairs a <see cref="WaitCallback"/> with the state object it is invoked with.
    /// </summary>
    class WorkItem
    {
        /// <summary>Delegate that <see cref="Execute"/> invokes.</summary>
        public WaitCallback WorkCallback;

        /// <summary>Argument handed to <see cref="WorkCallback"/> by <see cref="Execute"/>.</summary>
        public object ObjParams;

        /// <summary>Invokes <see cref="WorkCallback"/>, passing <see cref="ObjParams"/>.</summary>
        public void Execute()
        {
            WaitCallback callback = WorkCallback;
            object state = ObjParams;

            callback(state);
        }
    }
}
WorkQueue
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace NetDragon.ThreadPoolEx
{
    /// <summary>
    /// Lock-protected FIFO queue of <see cref="WorkItem"/> objects.
    /// </summary>
    class WorkQueue
    {
        // Instance-scoped and readonly: a static lock object is shared by every
        // WorkQueue, so independent queues would block each other.
        private readonly object _syncRoot = new object();

        private readonly Queue<WorkItem> _workQueue = new Queue<WorkItem>();

        /// <summary>Dequeues the oldest item; returns null when nothing is queued.</summary>
        public WorkItem Pop()
        {
            lock (_syncRoot)
            {
                if (_workQueue.Count > 0)
                {
                    return _workQueue.Dequeue();
                }

                return null;
            }
        }

        /// <summary>Adds an item to the tail of the queue.</summary>
        /// <param name="workItem">The item to queue.</param>
        /// <exception cref="ArgumentNullException"><paramref name="workItem"/> is null.</exception>
        public void Push(WorkItem workItem)
        {
            // A queued null would look exactly like "queue empty" when popped.
            if (workItem == null)
            {
                throw new ArgumentNullException("workItem");
            }

            lock (_syncRoot)
            {
                _workQueue.Enqueue(workItem);
            }
        }

        /// <summary>Number of items currently queued.</summary>
        public int Count
        {
            get
            {
                // Read under the same lock as Push/Pop so the value is never
                // observed in the middle of an Enqueue or Dequeue.
                lock (_syncRoot)
                {
                    return _workQueue.Count;
                }
            }
        }
    }
}
45
本文配套源碼