上一篇中我介紹了一種通過封閉Critical Section對象而方便的使用互斥鎖的方式,文中所有的例子是兩個線程對同一數據一讀一寫,因此需要讓它們在這裡互斥,不能同時訪問。而在實際情況中可能會有更復雜的情況出現,就是多個線程訪問同一數據,一部分是讀,一部分是寫。我們知道只有讀-寫或寫-寫同時進行時可能會出現問題,而讀-讀則可以同時進行,因為它們不會對數據進行修改,所以也有必要在C++中封裝一種方便的允許讀-讀並發、讀-寫與寫-寫互斥的鎖。要實現這種鎖,使用臨界區就很困難了,不如改用內核對象,這裡我使用的是互斥量(Mutex)。
總體的結構與上一篇中的類似,都是寫出一個對鎖進行封裝的基類,再寫一個用於調用加、解鎖函數的類,通過對第二個類的生命周期的管理實現加鎖和解鎖。這裡涉及到兩個新問題,一是加鎖、解鎖動作都有兩種,一種是加/解讀鎖,一種是加/解寫鎖;二是為了允許讀-讀並發,這裡只聲明一個Mutex是不夠的,必須要聲明多個Mutex,而且有多少個Mutex就同時允許多少個讀線程並發,之所以這麼說,是因為我們要使用的API函數是WaitForMultipleObjects。
WaitForMultipleObjects函數的功能就是等待對象狀態被設置,MSDN中對它的說明為:
Waits until one or all of the specified objects are in the signaled state or the time-out interval elapses.
這是個很好用的函數,我們可以用它來等待某個或某幾個對象,並且允許設置超時時間,等待成功時與超時時返回的值是不同的。如果返回的值比WAIT_ABANDONED小則表示等待成功。“等待成功”對於不同類型的內核對象有不同的意義,例如對於進程或線程對象,等待成功就表示進程或線程執行結束了;對於互斥量對象,則表示此對象現在不被任何其他線程擁有,並且一旦等待成功,當前線程即擁有了此互斥量,其他線程則不能同時擁有,直接調用ReleaseMutex函數主動釋放互斥量。
與WaitForMultipleObjects類似的還有一個函數WaitForSingleObject,它的功能比較簡單,只針對單一個對象,而WaitForMultipleObjects可以同時等待多個對象,並且可以設置是否等待所有對象。
上一篇文章中用的InstanceLockBase類裡面封裝了一個Critical Section對象,這裡則要封裝一組Mutex的Handle,那麼這一組是多少個呢?它應該由使用此類的程序中定義,例如可以用動態數組的方法:
//基類:
class RWLockBase //表示Read/Write Lock
...{
HANDLE* handles;
protected:
RWLockBase(int handleCount) ...{ handles = new HANDLE[handleCount]; }
…
};
//子類:
class MyClass: public RWLockBase
...{
MyClass(): RWLockBase(3) ...{}
…
};
這確實是個不錯的辦法,通過在子類構造函數的初始化段中調用基類構造函數並傳參,使得這個動態數組得以正確初始化,不過這樣看著不太爽,子類必須兩次出現“RWLockBase”一詞,能不能像InstanceLockBase那樣只要繼承了就好呢?答案是肯定的,只要用C++模板即可:
template <int maxReadCount>
class RWLockBase
...{
HANDLE handles[maxReadCount];
…
};
使用模板附帶這麼一個好處,因為模板參數是在編譯期可以確定的,所以無需再用動態數組,直接在棧上分配即可。而使用模板引出一個新問題,就是相應的Lock類(RWLock)在構造時傳的對象指針時的類型聲明,直接寫成RWLock(RWLockBase* pObj)肯定是不行的,因為必須指定模板參數,並且其值還必須與聲明RWLockBase時所指定的值一致才行,從而客戶端代碼就必須兩次指定模板參數值,不爽!解決的辦法也是有一個,就是把RWLockBase變成夾層類,為它再聲明一個基類,讓RWLock接收的是基類指針,並把Lock、Unlock等函數放在基類中,聲明為純虛函數,實現寫在夾層類中:
class _RWLockBase
...{
friend class RWLock;
protected:
virtual DWORD ReadLock(int timeout) = 0;
virtual void ReadUnlock(int handleIndex) = 0;
virtual DWORD WriteLock(int timeout) = 0;
virtual void WriteUnlock() = 0;
};
模板類RWLockBase從_RWLockBase繼承,並對四個函數寫出實現:
template <int maxReadCount = 3> //這裡給一個缺省參數,盡量減少客戶端代碼量
class RWLockBase: public _RWLockBase
...{
HANDLE handles[maxReadCount];
DWORD ReadLock(int timeout) //加讀鎖,只要等到一個互斥量返回即可
...{
return ::WaitForMultipleObjects(maxReadCount, handles, FALSE, timeout);
}
void ReadUnlock(int handleIndex) //解讀鎖,釋放已獲得的互斥量
...{
::ReleaseMutex(handles[handleIndex]);
}
DWORD WriteLock(int timeout) //加寫鎖,等到所有互斥量,從而與其他所有線程互斥
...{
return ::WaitForMultipleObjects(maxReadCount, handles, TRUE, timeout);
}
void WriteUnlock() //解寫鎖,釋放所有的互斥量
...{
for(int i = 0; i < maxReadCount; i++)
::ReleaseMutex(handles[i]);
}
protected:
WLockBase() //構造函數,初始化每個互斥量
..{
for(int i = 0; i < maxReadCount; i++)
handles[i] = ::CreateMutex(0, FALSE, 0);
}
~RWLockBase() //析構函數,銷毀對象
...{
for(int i = 0; i < maxReadCount; i++)
::CloseHandle(handles[i]);
}
};
而相應的鎖類也會稍復雜一些:
class RWLock
...{
bool lockSuccess; //因為有可能超時,需要保存是否等待成功
int readLockHandleIndex; //對於讀鎖,需要知道獲得的是哪個互斥量
_RWLockBase* _pObj; //目標對象基類指針
public:
//這裡通過第二個參數決定是加讀鎖還是寫鎖,第三個參數為超時的時間
RWLock(_RWLockBase* pObj, bool readLock = true, int timeout = 3000)
...{
_pObj = pObj;
lockSuccess = FALSE;
readLockHandleIndex = -1;
if(NULL == _pObj)
return;
if(readLock) //讀鎖
...{
DWORD retval = _pObj->ReadLock(timeout);
if(retval < WAIT_ABANDONED) //返回值小於WAIT_ABANDONED表示成功
...{ //其值減WAIT_OBJECT_0就是數組下標
readLockHandleIndex = retval - WAIT_OBJECT_0;
lockSuccess = TRUE;
}
}
else
...{
WORD retval = _pObj->WriteLock(timeout);
if(retval < WAIT_ABANDONED) //寫鎖時獲得了所有互斥量,無需保存下標
lockSuccess = TRUE;
}
}
~RWLock()
...{
if(NULL == _pObj)
return;
if(readLockHandleIndex > -1)
_pObj->ReadUnlock(readLockHandleIndex);
else
_pObj->WriteUnlock();
}
bool IsLockSuccess() const ...{ return lockSuccess; }
};
這樣一來,讀/寫鎖的類也就完成了,使用時與InstanceLock類似:
1、被鎖對象從RWLockBase<>類繼承
2、需要加讀鎖時,聲明一個RWLock實例,並指出要加的是讀鎖
3、需要加寫鎖時,聲明一個RWLock實例,並指出要加的是寫鎖
這裡還是要多說兩句,雖然使用純虛函數結合模板類,使得客戶端代碼量減到最少,但性能上有一些影響,因為聲明了虛函數,則實例中必然存在4個字節的VPTR,調用虛函數時則要查找VTABLE,空間和時間上都有微小的犧牲。而如果不使用模板類,則沒有虛函數的代價,但也有犧牲:不使用模板類則需要使用動態數組,動態數組本身需要程序運行時在堆上分配,這也需要時間;指向動態數組的指針也需要占用內存,所以空間上的開鎖是一樣的,時間上雖然動態分配內存需要的時間應該比虛函數的調用要慢一點,但初始化只需要一次,總體來說也是值得的。所以最終要使用哪一種,就看具體需要了。
這裡也給出一個實驗。這裡所用的被鎖類也上一篇類似,簡單的從RWLockBase類繼承:
class MyClass2: public RWLockBase<>
...{};
MyClass2 mc2;
看看兩個線程函數:
//讀線程
DWORD CALLBACK ReadThreadProc(LPVOID param)
...{
int i = (int)param;
RWLock lock(&mc2); //加讀鎖
if(lock.IsLockSuccess()) //如果加鎖成功
{
Say("read thread %d started", i); //為了代碼短一些,假設Say函數有這種能力
Sleep(1000);
Say("read thread %d ended", i);
}
else //加鎖超時,則顯示超時信息
Say("read thread %d timeout", i);
return 0;
}
//寫線程
DWORD CALLBACK WriteThreadProc(LPVOID param)
...{
int i = (int)param;
RWLock lock(&mc2, false); //加寫鎖。
if(lock.IsLockSuccess())
...{
Say("write thread %d started", i);
Sleep(600);
Say("write thread %d ended", i);
}
else
Say("write thread %d timeout", i);
return 0;
}
主線程:
int i;
for(i = 0; i < 5; i++)
::CreateThread(0, 0, ReadThreadProc, (LPVOID)i, 0, 0);
for(i = 0; i < 5; i++)
::CreateThread(0, 0, WriteThreadProc, (LPVOID)i, 0, 0);
程序共開10個線程,5個讀5個寫。從RWLockBase類繼承時我們使用了默認的模板參數,所以最多同時允許3個讀線程。程序的運行結果如下:
001 [15:07:28.484]read thread 0 started
002 [15:07:28.484]read thread 1 started
003 [15:07:28.484]read thread 2 started
004 [15:07:29.484]read thread 0 ended
005 [15:07:29.484]read thread 3 started
006 [15:07:29.484]read thread 1 ended
007 [15:07:29.484]read thread 4 started
008 [15:07:29.484]read thread 2 ended
009 [15:07:30.484]read thread 3 ended
010 [15:07:30.484]read thread 4 ended
011 [15:07:30.484]write thread 0 started
012 [15:07:31.078]write thread 0 ended
013 [15:07:31.078]write thread 1 started
014 [15:07:31.484]write thread 2 timeout
015 [15:07:31.484]write thread 3 timeout
016 [15:07:31.484]write thread 4 timeout
017 [15:07:31.687]write thread 1 ended
前三行三個讀線程取得讀鎖,之後等一秒(第4-8行),三個讀線程都結束了,並且余下的兩個讀線程取得讀鎖,雖然這時剩下了一個互斥量沒有使用,但因為其他的線程都請求加寫鎖,寫鎖與其他所有線程互斥,所以還不能取得寫鎖。再過一秒(第9-11行),後來的兩個取得讀鎖的線程也結束了,則第一個寫線程取得寫鎖。600毫秒之後(第12-13行)第一個寫線程結束,第二個寫線程開始。400毫秒之後(第14-16行)余下的三個寫線程都超時了,再後第二個寫線程也結束了。