其他形式的同步
我們可使用類Monitor與類Thread中的某些函數,直接控制線程的同步,請看例1。
例1:
using namespace System;
using namespace System::Threading;
int main()
{
/*1*/ MessageBuffer^ m = gcnew MessageBuffer;
/*2a*/ ProcessMessages^ pm = gcnew ProcessMessages(m);
/*2b*/ Thread^ pmt = gcnew Thread(gcnew ThreadStart(pm,&ProcessMessages::ProcessMessagesEntryPoint));
/*2c*/ pmt->Start();
/*3a*/ CreateMessages^ cm = gcnew CreateMessages(m);
/*3b*/ Thread^ cmt = gcnew Thread(gcnew ThreadStart(cm, &CreateMessages::CreateMessagesEntryPoint));
/*3c*/ cmt->Start();
/*4*/ cmt->Join();
/*5*/ pmt->Interrupt();
/*6*/ pmt->Join();
Console::WriteLine("Primary thread terminating");
}
public ref class MessageBuffer
{
String^ messageText;
public:
void SetMessage(String^ s)
{
/*7*/ Monitor::Enter(this);
messageText = s;
/*8*/ Monitor::Pulse(this);
Console::WriteLine("Set new message {0}", messageText);
Monitor::Exit(this);
}
void ProcessMessages()
{
/*9*/ Monitor::Enter(this);
while (true)
{
try
{
/*10*/ Monitor::Wait(this);
}
catch (ThreadInterruptedException^ e)
{
Console::WriteLine("ProcessMessage interrupted");
return;
}
Console::WriteLine("Processed new message {0}", messageText);
}
Monitor::Exit(this);
}
};
public ref class CreateMessages
{
MessageBuffer^ msg;
public:
CreateMessages(MessageBuffer^ m)
{
msg = m;
}
void CreateMessagesEntryPoint()
{
for (int i = 1; i <= 5; ++i)
{
msg->SetMessage(String::Concat("M-", i.ToString()));
Thread::Sleep(2000);
}
Console::WriteLine("CreateMessages thread terminating");
}
};
public ref class ProcessMessages
{
MessageBuffer^ msg;
public:
ProcessMessages(MessageBuffer^ m)
{
msg = m;
}
void ProcessMessagesEntryPoint()
{
msg->ProcessMessages();
Console::WriteLine("ProcessMessages thread terminating");
}
};
在標記1中,創建一個MessageBuffer類型的共享緩沖區;接著在標記2a、2b、2c中,創建了一個線程用於處理放置於緩沖區中的每條信息;標記3a、3b和3c,也創建了一個線程,並在共享緩沖區中放置了連續的5條信息以便處理。這兩個線程已被同步,因此處理者線程必須等到有"東西"放入到緩沖區中,才可以進行處理,且在前一條信息被處理完之前,不能放入第二條信息。在標記4中,將一直等待,直到創建者線程完成它的工作。
當標記5執行時,處理者線程必須處理所有創建者線程放入的信息,因為使用了Thread::Interrupt讓其停止工作,並繼續等待標記6中調用的Thread::Join,這個函數允許調用線程阻塞它自己,直到其他線程結束。(一個線程可指定一個等待的最大時間,而不用無限等待下去。)
線程CreateMessages非常清晰明了,它向共享緩沖區中寫入了5條信息,並在每條信息之間等待2秒。為把一個線程掛起一個給定的時間(以毫秒計),我們調用了Thread::Sleep,在此,一個睡眠的線程可再繼續執行,原因在於運行時環境,而不是另一個線程。
線程ProcessMessages甚至更加簡單,因為它利用了類MessageBuffer來做它的所有工作。類MessageBuffer中的函數是被同步的,因此在同一時間,只有一個函數能訪問共享緩沖區。
主程序首先啟動處理者線程,這個線程會執行ProcessMessages,其將獲得父對象的同步鎖;然而,它立即調用了標記10中的Wait函數,這個函數將讓它一直等待,直到再次被告之運行,期間,它也交出了同步鎖,這樣,允許創建者線程得到同步鎖並執行SetMessage。一旦函數把新的信息放入到共享緩沖區中,就會調用標記8中的Pulse,其允許等待同步鎖的任意線程被喚醒,並繼續執行下去。但是,在SetMessage執行完成之前,這些都不可能發生,因為它在函數返回前都不可能交出同步鎖。如果情況一旦發生,處理者線程將重新得到同步鎖,並從標記10之後開始繼續執行。此處要說明的是,一個線程即可無限等待,也可等到一個指定的時間到達。插1是程序的輸出。
插1:
Set new message M-1
Processed new message M-1
Set new message M-2
Processed new message M-2
Set new message M-3
Processed new message M-3
Set new message M-4
Processed new message M-4
Set new message M-5
Processed new message M-5
CreateMessages thread terminating
ProcessMessage interrupted
ProcessMessages thread terminating
Primary thread terminating
請仔細留意,處理者線程啟動於創建者線程之前。如果以相反的順序啟動,將會在沒有處理者線程等待的情況下,添加第一條信息,此時,沒有可供喚醒處理者線程,當處理者線程運行到它的第一個函數調用Wait時,將會錯過第一條信息,且只會在第二條信息存儲時被喚醒。
管理線程
默認情況下,如果一個線程是前台線程,它將會一直執行下去,直到進入點函數結束,而不管它父類的生命期是多久;而在另一方面,後台線程則會在父類線程結束時自動結束。可通過設置Thread的IsBackground屬性,把一個線程配置為後台線程,用同樣的方法,也可把一個後台線程配置為前台線程。
一旦線程被啟動,它即為活躍線程,可通過檢查Thread的IsAlive屬性來判斷一個線程是否為活躍線程;通過調用Wait函數,並傳遞給它一個零毫秒,可使一個線程放棄剩余的CPU時間片;另外,線程還可通過CurrentThread::Thread::CurrentThread屬性得到其自己的Thread對象。
每個線程都有與之相關的優先級,運行時環境(即操作系統)通過它來調度線程的執行,可通過Thread::Priority屬性來設置或檢測線程的優先級,它的范圍從ThreadPriority::Lowest 到ThreadPriority::Highest,默認情況下,線程的優先級為ThreadPriority::Normal。另外,因為實現環境的不同,線程調度會有所不同,所以在控制線程方面,不應該過分依賴線程的優先級。
易變字段(域)
volatile這個限定類型告訴編譯器,可能會有多個線程控制或訪問它所指定的對象,尤其是,一個或多個線程可能將異步讀寫此變量。基本上,這個限定詞是強制編譯器在進行優化時不要那麼"激進"。
請看例2中的代碼段,在缺少volatile時,標記1中的代碼完全可以忽略,因為在標記2中立即就改寫了i的值;然而,指定了volatile後,編譯器則必須執行這兩行代碼。
例2:
volatile int i = 0;
/*1*/ i = 10;
/*2*/ i = 20;
/*3*/ if (i < 5 || i > 10) {
// ...
}
int copy = i;
/*4*/ if (copy < 5 || copy > 10) {
// ...
}
在標記3中,編譯器必須生成取回值i的代碼兩次,但是,在兩次取值過程中,數值都有可能改變。為確保我們測試的是同一個值,在此不得不以類似標記4的代碼來代替。通過把值i的一個快照存儲在一個非易變的變量中,我們就可以安全地多次使用這個值了--因為它的值不可能在"後台"改變。在此,使用volatile,可避免對特定類型變量的顯式異步訪問。
線程局部存儲
當編寫多線程應用程序時,只在特定的線程中使用特定的變量,這是一個非常好的習慣,請看例3的程序:
例3:
using namespace System;
using namespace System::Threading;
public ref class ThreadX
{
/*1*/ int m1;
/*2*/ static int m2 = 20;
/*3*/ [ThreadStatic] static int m3 = 30;
public:
ThreadX()
{
m1 = 10;
}
void TMain()
{
String^ threadName = Thread::CurrentThread->Name;
/*4*/ Monitor::Enter(ThreadX::typeid);
for (int i = 1; i <= 5; ++i)
{
++m1;
++m2;
++m3;
}
Console::WriteLine("Thread {0}: m1 = {1}, m2 = {2}, m3 = {3}",
threadName, m1, m2, m3);
Monitor::Exit(ThreadX::typeid);
}
};
int main()
{
/*5*/ Thread::CurrentThread->Name = "t0";
ThreadX^ o1 = gcnew ThreadX;
Thread^ t1 = gcnew Thread(gcnew ThreadStart(o1, &ThreadX::TMain));
t1->Name = "t1";
ThreadX^ o2 = gcnew ThreadX;
Thread^ t2 = gcnew Thread(gcnew ThreadStart(o2, &ThreadX::TMain));
t2->Name = "t2";
t1->Start();
/*6*/ (gcnew ThreadX)->TMain();
t2->Start();
t1->Join();
t2->Join();
}
m1是一個實例字段,所以每個ThreadX的實例都有一份各自的拷貝,且在父類對象的生命期中都會存在;而另一方面,m2是一個類字段,所以對類來說,不管有幾個類的實例,它只有單獨的一個,從理論上來說,它將會一直存在,直到程序結束。但這兩個字段都不是特定於某個線程的,如果以適當的構造,這兩種類型的字段都能被多個線程訪問。
簡單來說,線程局部存儲就是特定線程擁有的某段內存,這段內存在新線程創建時被分配,而在線程結束時被釋放,它結合了局部變量的私有性和靜態變量的持久性。通過指定ThreadStatic屬性,可把一個字段標記為線程局部類型,如例中的標記3所示,在成為靜態字段之後,m3甚至還能有一個初始化函數。
函數TMain為新線程的入口點,這個函數只是簡單地遞增這三個變量:m1、m2和m3,每回5次,並打印出它們當前的值。標記4中的同步鎖保證了在這些字段遞增或打印時,另一個線程不會同時訪問它們。
在標記5中,主線程把它的名字設置為t0,接著創建並啟動了兩個線程,另外,它也把TMain當作了一個普通函數直接調用,而不是作為創建的新線程的一部分來調用。程序的輸出請見插2。
插2:
Thread t0: m1 = 15, m2 = 25, m3 = 35
Thread t1: m1 = 15, m2 = 30, m3 = 5
Thread t2: m1 = 15, m2 = 35, m3 = 5
每個線程都有其自己的m1實例,它被初始化為10,所以在遞增5次之後,每個線程中的值都為15。而m2則有所不同,所有的三個線程都共享同一變量,所以這一變量被遞增了15次。
線程t1與t2在經過線程創建過程之後,每個都有其自己的m3,然而,這些線程局部變量會被賦予默認的零值,而不是在源代碼中初始化的30,注意了,在經過5次遞增之後,各個值均為5,而線程t0則有所不同,正如我們所看到的,這個線程不是由創建其他兩個線程同樣的機制創建的,所以,它的m3會接受顯式初始化的值30。同時也請注意標記6,TMain作為一個普通函數被調用,而不是作為創建的新線程的一部分。
原子性與互鎖操作
如果存在這樣一種情況:一個應用程序有多個線程並行運行,每個線程對某些共享的整形變量,都有寫操作--只是簡單地使用++把變量遞增1。這看起來似乎沒什麼問題,畢竟,還算像是一個原子性操作,但在多數系統中--至少從機器指令的角度來看,C++/CLI執行環境對所有整形類型,並不能普遍地保證無誤。
作為示例,例4中的程序有三個線程,每個線程都同時遞增一個共享的64位整形變量一千萬次,最後顯示出這個變量的最終值,從理論上說,應該共遞增了三千萬次。這個程序目前可以兩種方式運行:默認方式使用++操作符以非同步方式運行;而另一種方式,通過帶有命令行參數Y或y,這回使用了一個同步的庫遞增函數。
例4:
using namespace System;
using namespace System::Threading;
static bool interlocked = false;
const int maxCount = 10000000;
/*1*/ static long long value = 0;
void TMain()
{
if (interlocked)
{
for (int i = 1; i <= maxCount; ++i)
{
/*2*/ Interlocked::Increment(value);
}
}
else
{
for (int i = 1; i <= maxCount; ++i)
{
/*3*/ ++value;
}
}
}
int main(array<String^>^ argv)
{
if (argv->Length == 1)
{
if (argv[0]->Equals("Y") || argv[0]->Equals("y"))
{
interlocked = true;
}
}
/*4*/ Thread^ t1 = gcnew Thread(gcnew ThreadStart(&TMain));
Thread^ t2 = gcnew Thread(gcnew ThreadStart(&TMain));
Thread^ t3 = gcnew Thread(gcnew ThreadStart(&TMain));
t1->Start();
t2->Start();
t3->Start();
t1->Join();
t2->Join();
t3->Join();
Console::WriteLine("After {0} operations, value = {1}", 3 * maxCount, value);
}
當使用標准++操作符時,程序5次連續執行之後,輸出如插3所示,可看出,結果與正確答案相距甚遠,簡單估算,大概有17%至50%的遞增操作未正確完成;當程序運行於同步方式時--即使用Interlocked::Increment,所有的三千萬次遞增操作都正常完成,結果計算正確。
插3:
使用++操作符的輸出
After 30000000 operations, value = 14323443
After 30000000 operations, value = 24521969
After 30000000 operations, value = 20000000
After 30000000 operations, value = 24245882
After 30000000 operations, value = 25404963
使用Interlocked遞增函數的輸出
After 30000000 operations, value = 30000000
另外,補充一點,Interlocked類還有另一個decrement函數。