CWorkerThread 抽象類
CWorkerThread 是一個由 CWorkerThreadAPPSpecifc、CWorkerThreadRoundRobin 和 CWorkerThreadAssembly 繼承的抽象類。無論如何處理消息,隊列的大部分處理是相同的,所以 CWorkerThread 類提供了這一功能。這個類提供了抽象方法(必須被實際方法替代)以管理資源和處理消息。
類的工作再一次通過 Start、Stop、Pause 和 Continue 方法來實現。在 Start 方法中引用了輸入和錯誤隊列。在 .Net 框架中,消息由 System.Messaging 名稱空間處理:
// 嘗試打開隊列,並設置默認的讀寫屬性
MessageQueue mqInput = new MessageQueue(sInputQueue);
mqInput.MessageReadPropertyFilter.Body = true;
mqInput.MessageReadPropertyFilter.APPSpecific = true;
MessageQueue mqError = new MessageQueue(sErrorQueue);
// 如果使用 MSMQ COM,則將格式化程序設置為 ActiveX
mqInput.Formatter = new ActiveXMessageFormatter();
mqError.Formatter = new ActiveXMessageFormatter();
一旦定義了消息隊列引用,即會創建一個線程用於實際的處理函數(稱為 ProcessMessages)。在 .Net 框架中,使用 System.Threading 名稱空間很容易實現線程處理:
procMessage = new Thread(new ThreadStart(ProcessMessages));
procMessage.Start();
ProcessMessages 函數是基於 Boolean 值的處理循環。當數值設為 False,處理循環將終止。因此,線程對象的 Stop 方法只設置這一 Boolean 值,然後關閉打開的消息隊列,並加入帶有主線程的線程:
// 加入服務線程和處理線程
bRun = false;
procMessage.Join();
// 關閉打開的消息隊列
mqInput.Close();
mqError.Close();
Pause 方法只設置一個 Boolean 值,使處理線程休眠半秒鐘:
if (bPause)
Thread.Sleep(500);
最後,每一個 Start、Stop、Pause 和 Continue 方法將調用抽象的 OnStart、OnStop、OnPause 和 OnContinue 方法。這些抽象方法為實現的類提供了掛鉤,以捕獲和釋放所需的資源。
ProcessMessages 循環具有如下基本結構:
1、接收 Message。
2、如果 Message 具有成功的 Receive,則調用抽象 ProcessMessage 方法。
3、如果 Receive 或 ProcessMessage 失敗,將 Message 發送至錯誤隊列中。
Message mInput;
try
{
// 從隊列中讀取,並等候 1 秒
mInput = mqInput.Receive(new TimeSpan(0,0,0,1));
}
catch (MessageQueueException mqe)
{
// 將消息設置為 null
mInput = null;
// 查看錯誤代碼,了解是否超時
if (mqe.ErrorCode != (-1072824293) ) //0xC00E001B
{
// 如果未超時,發出一個錯誤並記錄錯誤號
LogError("Error: " + mqe.Message);
throw mqe;
}
}
if (mInput != null)
{
// 得到一個要處理的消息,調用處理消息抽象方法
try
{
ProcessMessage(mInput);
}
// 捕獲已知異常狀態的錯誤
catch (CWorkerThreadException ex)
{
ProcessError(mInput, ex.Terminate);
}
// 捕獲未知異常,並調用 Terminate
catch
{
ProcessError(mInput, true);
}
}
ProcessError 方法將錯誤的消息發送至錯誤隊列。另外,它也可能引發異常來終止線程。如果ProcessMessage 方法引發了終止錯誤或 CWorkerThreadException 類型,它將執行此操作。