自以為是的多線程(二),自以為是多線程
上一篇大家已經知道了,線程與線程之間的調度,是不可控的,那當我們去寫多線程程序的時候,一定要將線程是亂序的這一點考慮進去,若不然就會出現線程安全問題。
為什麼這樣講呢?因為當程序出現多個線程在運行的時候,你無法確定到底是哪一個線程在執行,可能A執行一行代碼,這個時候切換到B執行一行代碼,然後又切換回A再執行一行代碼,這都是有可能出現,不要以為我的代碼短,就那麼一兩行就不需要上鎖,多線程程序一定要嚴謹。
那如何保證嚴謹呢?
就是當你的程序在使用共享資源的時候,就是當多個線程都有可能調用到同一個變量或是訪問同一塊內存的時候,一定要保證這段代碼的線性執行,比如我有以下代碼:

![]()
public class DbActionQueue : IDisposable
{
public Queue<Action> _transQueue;
private Thread _thread;
private bool _isDispose = false;
private static readonly object _syncObject = new object();
private readonly object _syncQueueObject = new object();
private static DbActionQueue _instance;
public static DbActionQueue Instance
{
get
{
if (_instance == null)
{
lock (_syncObject)
{
if (_instance == null)
{
_instance = new DbActionQueue();
}
}
}
return _instance;
}
}
private DbActionQueue()
{
if (_transQueue == null)
{
_transQueue = new Queue<Action>();
}
if (_thread == null)
{
_thread = new Thread(Thread_Work)
{
IsBackground = true
};
}
_thread.Start();
}
public void Push(Action action)
{
if (_transQueue == null) throw new ArgumentNullException("dbActionQueue is not init");
lock (_syncQueueObject)
{
_transQueue.Enqueue(action);
}
}
public void Thread_Work()
{
while (!_isDispose)
{
Action[] items = null;
if (_transQueue != null && _transQueue.Count > 0)
{
lock (_syncQueueObject)
{
items = new Action[_transQueue.Count];
_transQueue.CopyTo(items, 0);
_transQueue.Clear();
}
}
if (items != null && items.Length > 0)
{
foreach (var item in items)
{
try
{
item.Invoke();
}
catch (Exception ex)
{
LogHelper.Write(string.Format("DbActionQueue error. | Exception.StackTrace:{0}", ex.StackTrace), ex);
}
}
}
Thread.Sleep(1);
}
}
public void Dispose()
{
_isDispose = true;
_thread.Join();
}
}
View Code
我在Enqueue的時候上了鎖,在Clear的時候也上了鎖,這裡有一個地方需要說一下,就是當你要對塊邏輯進行操作上鎖的時候,一定要鎖的是同一個對象,否則是沒有任何意義的。為什麼在這裡上鎖,假如我不上鎖,會有什麼問題?
不上鎖的情況下,首當其沖的是丟數據問題,當我有一個線程執行完了copyto這行代碼以後,有一個線程執行了Enqueue,這個時候,我當前線程會繼續跑Clear,就會把Enqueue的數據清理掉,那就相當於丟掉了一條數據。
假如代碼稍微變更一下:

![]()
while (!_isDispose)
{
Action item = null;
lock (_syncObject)
{
if (_transQueue != null && _transQueue.Count > 0)
{
item = _transQueue.Dequeue();
}
}
item.Invoke();
}
View Code
我們會發現,邏輯的執行代碼.invoke()放在了lock外面,這個地方上篇博客已經說過了,因為lock會導致的一系列問題,假如我是單條單條的取出的情況下,不上鎖可不可以?
不可以的,因為當你一個隊列在Enqueue的時候又在跑Dequeue的話,這個隊列會出現類似數據庫的髒讀,幻讀等不可預知的bug。不過可以通過換成ConcurrentQueue來解決這個問題,但是有一點要說一下,如果是批量取的情況下,換成ConcurrentQueue依然會出現上述所說的丟數據的問題,因為線程調度不可控,至於ConcurrentQueue的線程安全是通過原子鎖還是自旋鎖這個並沒有特別的文獻說明,這裡就不做探討。這裡還有一點要說一下,批量取是為了避免頻繁的lock,具體一次批量取多少條,你可以自己控制,我這裡是一次取完,你可以控制成一次取10條,20條,50條等。
我們會發現因為線程調度不可控這樣的一個前提,導致當我們多個線程之間要協作的時候,就會變的異常難以控制,所以在做程序設計的時候,請盡可能的避免多線程協作這種情況發生,如果一定發生了的話,一定不要理所當然的認為自己的代碼會按自己的理解執行,給大家舉一個例子:
代碼大致意思是,有一個網絡模塊,接收到客戶端的消息後,分配某個線程的隊列裡面去,然後該線程處理完以後,丟給發送線程,核心代碼如下:

![]()
protected virtual void ReceiveCallback(string ip, int port, string url, bool isLargePack, IntPtr streamHandle, long streamSize, IntPtr bodyData, int bodySize, IntPtr responseHandle)
{
//初始化一個線程等待事件(信號燈)
AutoResetEvent autoEvent = null;
//開啟異步處理的情況下(因為這個模塊支持同步和異步)
if (!this._isSync)
{
autoEvent = new AutoResetEvent(false);
}
//從streamHandler裡面讀取數據
var data = Read2Byte(streamHandle, bodyData, streamSize, bodySize, isLargePack);
//轉換成內部協議數據(Bson)
var obj = BsonHelper.ToObject<Communication>(data);
//一個Action<Communication, IntPtr, object>
if (Received != null)
{
Received.Invoke(obj, responseHandle, autoEvent);
}
//阻塞,一直到收到信號
if (autoEvent != null)
{
autoEvent.WaitOne(this._timeOut);
}
}
View Code
Receive.Invoke 這個地方Receive是一個Action,代碼如下:

![]()
public void InvokeCommand(Communication obj, IntPtr connect, object e)
{
//數據完整性判斷
if (obj == null || string.IsNullOrEmpty(obj.Command))
{
obj = new Communication
{
Command = "ErrorCommand",
Body = new Newtonsoft.Json.Linq.JObject()
};
obj.Body["Token"] = Guid.NewGuid().ToString();
}
var unit = new InternelUnit
{
Event = e,
Packet = obj,
Connection = connect
};
//是否同步
if (this._isSync)
{
this.RequestCallBack(unit);
}
else
{
//放入業務處理隊列
RequestQueueManage.Instance.Push(unit);
}
}
View Code
這兩段代碼的意思是,網絡模塊接受到消息以後,丟給線程隊列。那由於生存周期控制,導致RequestHandler這個句柄,只在這個方法體裡面有效,如果該方法體結束,則句柄被釋放。於是我們就有了,Push到線程隊列裡面以後,做了一個信號的WaitOne的處理。就是希望等到發送線程處理完以後,再釋放這個信號,代碼如下:

![]()
public void ResponseCallBack(InternelUnit unit)
{
//該包是否要入丟包池
if (unit.IsInLastPackPool)
{
Core.LostPacketPool.LostPacketPool.Instance.Push(ref unit);
}
//按協議轉換成byte[]
var repBson = BsonHelper.ToBson(unit.Packet);
//是否開啟加密
if (this._isEncrypt)
{
repBson = EncryptHelper.Decrypt(repBson, repBson.Length);
}
//發送
Network.NetworkHelper.Send(unit.Connection, repBson, unit.Id);
//是否開啟異步
if (!_isSync)
{
//釋放信號
(unit.Event as System.Threading.AutoResetEvent).Set();
}
}
View Code
這整段代碼,在大部分情況下是不會有問題的,但是由於剛剛我們說到的,線程調度不可控,於是我們無法保證,在Receive.Invoke()以後,代碼繼續向下執行,執行了WaitOne(),如果在Receive.Invoke以後,程序就切換到了,業務處理線程,那就有可能出現,先執行了Set()釋放了信號,然後再執行WaitOne(),就會出現死鎖,不過好在我們有做超時控制,並不會出現絕對的死鎖(不過也相差無幾了)。
所以這段程序這樣寫,就是一個不嚴謹的程序,會出現很多莫名其妙的超時。那當程序確實需要多線程之間協作的時候,請盡可能的用callback的方式來進行處理,而且控制好生命周期,盡可能的避免資源得不到釋放。
再舉個比較常見的投票的例子:

![]()
//從緩存獲取文章對象
var article = CacheHelper.Get(articleid);
給點贊的+1
article.Up++
寫回緩存,由於引用技術關系,所以如果緩存是你自己控制在你的程序內部的話(比如Dictionary),這一步是可以省略的。
//CacheHelper.Set(articleid, article);
View Code
很簡單的一個計數器的代碼,但是由於當多個用戶同時點贊的話,程序就有可能把數據加錯(原因不再贅述)。於是我們便有了加lock的打算,代碼如下:
lock(object){
投票計數器+1
}
這裡有一個地方要注意,就是如果功底不夠的話,盡量不要lock(this),因為這裡的this指的是當前實例,而多個線程裡面可能會有多個實例,那麼lock的就不是同一個對象了。
這個時候你的代碼看起來就沒啥問題了,可是如果你的程序是部署在多台機器上面的,那麼數據加錯的問題就依然會出現,對吧。因為兩台機器上面lock的並不是同一個對象,這個時候可能就需要使用DB,或者是引入一個第三方的中間件(例如redis等),需要有一個地方作為一個唯一的中心控制,你才能保證數據的一致性,那還有一種做法,就是對articleid取模,讓同一片文章點贊的操作,轉到同一台機器上面去操作這樣也可。
同理,當我們在做DB到緩存的處理時,也是這樣,比如我們有以下代碼,
var list = CacheHelper.Get(key);
if(list == null){
list = GetListFromDB(xxx);
}
return list;
這一段代碼的問題就是,當GetListFromDB()的時候,數據發生了變化,那可能多台機器拿到的list,就會不一樣。你可能就需要做一些定時同步的處理了。如果多個線程一直讀的時候,又會出現,多個線程同時去DB拿數據的情況發生,這不是我們想看到的,於是我們便加Lock
var list = CacheHelper.Get(key);
if(list == null){
lock(object){
list = CacheHelper.Get(key);
if(list == null){
list = GetListFromDB(xxx);
}
}
}
return list;
為什麼會有雙重判斷?因為在你lock的時候,之前的那個線程可能已經讀取到數據了,這樣就可以避免當多個線程運行到這裡的時候,由於已經判斷了的原因,導致多個線程依然去DB取數據。由於從DB取數據比較緩慢,所以這裡依然會有像我們上篇所講到的那樣,不斷的線程調度,鎖定,切換的這樣一個循環。所以盡量慎用lock。
線程安全的問題主要就是線程調度不可控的問題,我們需要盡可能的保證自己對共享資源處理的地方是block的,能夠線性執行。