上篇講到WinCE下的MSMQ安裝,這篇講述一下MSMQ在.NET Compact Framework下的開發。
所謂MQ就是Message Queue,即消息隊列。消息隊列可以作為不同應用程序之間,甚至不同機器之間通信的渠道。在消息隊列下進行通信的內容稱為消息(Message),在C#程序下Message就是對象。
MSMQ就是Microsoft公司提供的MQ服務程序。MQ服務程序負責管理消息隊列,保證消息在消息隊列這一渠道下能無誤地發送到對端。MQ支持離線交易,有時候消息會緩存在MQ服務程序中,當接收方在線的時候再提取消息。這一特性使得MQ可以廣泛使用在移動領域,因為移動應用的網絡不能保證7×24的長連接。
生成隊列
在CF.net下開發MQ,需要引用System.Messaging庫。
using System.Messaging;
/// <summary>
/// Service side of the sample: owns the private queue
/// <c>.\Private$\MQServiceQueue$</c> on the local machine and starts an
/// asynchronous receive on it.
/// </summary>
/// <remarks>
/// The receive callback <c>MessageListenerEventHandler</c> is referenced here but is
/// not part of this listing.
/// </remarks>
public class MQService
{
    // ".\" addresses the local machine; "Private$\" selects the private-queue namespace.
    private const string mMachinePrefix = @".\";
    private const string mPrivateQueueNamePrefix = mMachinePrefix + @"Private$\";
    private const string mServiceQueuePath = mPrivateQueueNamePrefix + "MQServiceQueue$";

    // Queue object bound to mServiceQueuePath; assigned by InitServiceQueue.
    private MessageQueue mServiceQueue;

    /// <summary>
    /// Attaches to the service queue (creating it first when it does not exist),
    /// sets a string-only XML formatter, hooks the receive callback and begins an
    /// asynchronous receive.
    /// </summary>
    /// <remarks>
    /// A <see cref="MessageQueueException"/> from any step is printed to the console
    /// and not rethrown.
    /// </remarks>
    private void InitServiceQueue()
    {
        try
        {
            if (MessageQueue.Exists(mServiceQueuePath))
            {
                // Queue already present: only a local queue object is needed.
                mServiceQueue = new MessageQueue(mServiceQueuePath);
            }
            else
            {
                // Single-argument Create: no transactional flag is passed.
                mServiceQueue = MessageQueue.Create(mServiceQueuePath);
                mServiceQueue.Close();
            }

            // Only string bodies are expected on this queue.
            mServiceQueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(string) });
            mServiceQueue.ReceiveCompleted += new ReceiveCompletedEventHandler(MessageListenerEventHandler);

            // Start listening; the callback runs when a message arrives.
            mServiceQueue.BeginReceive();
            // NOTE(review): Close() is issued while the receive is still pending -
            // confirm it does not cancel the receive on the target platform.
            mServiceQueue.Close();
        }
        catch (MessageQueueException queueError)
        {
            Console.WriteLine(queueError.Message);
        }
    }
}
using System.Messaging;
/// <summary>
/// Service side of the sample: owns the private queue
/// <c>.\Private$\MQServiceQueue$</c> on the local machine and starts an
/// asynchronous receive on it.
/// </summary>
/// <remarks>
/// The receive callback <c>MessageListenerEventHandler</c> is referenced here but is
/// not part of this listing.
/// </remarks>
public class MQService
{
    // The path separators are required: a private queue path has the form
    // MachineName\Private$\QueueName, and ".\" stands for the local machine.
    // Without the backslashes the path collapses to ".Private$MQServiceQueue$",
    // which is not a valid queue path.
    private const string mMachinePrefix = @".\";
    private const string mPrivateQueueNamePrefix = mMachinePrefix + @"Private$\";
    private const string mServiceQueuePath = mPrivateQueueNamePrefix + "MQServiceQueue$";

    // Queue object bound to mServiceQueuePath; assigned by InitServiceQueue.
    private MessageQueue mServiceQueue;

    /// <summary>
    /// Attaches to the service queue (creating it first when it does not exist),
    /// sets a string-only XML formatter, hooks the receive callback and begins an
    /// asynchronous receive.
    /// </summary>
    /// <remarks>
    /// A <see cref="MessageQueueException"/> from any step is printed to the console
    /// and not rethrown.
    /// </remarks>
    private void InitServiceQueue()
    {
        try
        {
            // check to make sure the message queue does not exist already
            if (!MessageQueue.Exists(mServiceQueuePath))
            {
                // Create the queue. The single-argument overload is used, so no
                // transactional flag is passed.
                mServiceQueue = MessageQueue.Create(mServiceQueuePath);
                mServiceQueue.Close();
            }
            else
            {
                mServiceQueue = new MessageQueue(mServiceQueuePath);
            }

            // Only string bodies are expected on this queue.
            Type[] types = new Type[1];
            types[0] = typeof(string);
            mServiceQueue.Formatter = new XmlMessageFormatter(types);
            mServiceQueue.ReceiveCompleted += new ReceiveCompletedEventHandler(MessageListenerEventHandler);

            // Begin the asynchronous receive operation.
            mServiceQueue.BeginReceive();
            // NOTE(review): Close() is issued while the receive is still pending -
            // confirm it does not cancel the receive on the target platform.
            mServiceQueue.Close();
        }
        catch (MessageQueueException MQException)
        {
            // Report any queue error (for example an invalid queue path).
            Console.WriteLine(MQException.Message);
        }
    }
}
在建立Q之前先檢查該Q是否存在,如果存在就生成Q的處理對象,如果不存在就先在隊列管理器建立這個Q。建立Q的時候,輸入參數為一個string,這個string可以為path(路徑),FormatName或者Label。使用path的相對廣泛,在例子中使用path作為輸入參數。Path由MachineName和QueueName組成,建立的Q可以分為Public,Private,Journal和DeadLetter。使用廣泛的是Public和Private,Public的Q由MachineName和QueueName組成,格式如MachineName\QueueName,而Private的Q的格式為MachineName\Private$\QueueName,比Public的Q多了一個標識Private$,在例子中使用了Private的Q。路徑“.\”指的是本地機器。
Formatter屬性十分重要,它定義了消息體的格式。所謂消息體的格式,就是通過這個Q通信的消息的數據類型。一個Q可以傳遞多個不同的數據類型,需要在Type數組中進行定義,然後賦值給Formatter。
ReceiveCompleted事件用來注冊接收處理函數,當Q接收到消息後,使用注冊的函數進行處理。使用ReceiveCompleted注冊處理函數以後,必須調用BeginReceive讓這個Q進入異步接收狀態。
下面講述MQ應用中兩種常見的應用模式,第一種為請求回應模式,第二種為注冊廣播模式。
請求回應模式
public class MQService
和請求回應模式相比,MQService使用容器保存所有注冊的客戶端的Q,當需要notify的時候遍歷所有客戶端Q進行廣播。MQClient建立廣播Q,然後注冊函數ClientQueueReceiveCompleted處理廣播事件。MQ的應用能把Observer模式擴展到跨進程和跨系統的場景,消息訂閱廣播機制可以借助MQ和Observer模式來實現。
{
// Path prefix for the local machine (".\").
private const string mMachinePrefix = @".\";
// Prefix shared by all private queue paths on the local machine.
private const string mPrivateQueueNamePrefix = mMachinePrefix + @"Private$\";
// Full path of the queue on which the service receives requests.
private const string mServiceQueuePath = mPrivateQueueNamePrefix + "MQServiceQueue$";
// Queue object for mServiceQueuePath; assigned in InitServiceQueue.
private System.Messaging.MessageQueue mServiceQueue;
/// <summary>
/// Attaches to the service queue (creating it first when it does not exist),
/// sets a string-only XML formatter, hooks MessageListenerEventHandler as the
/// receive callback and begins an asynchronous receive.
/// </summary>
/// <remarks>
/// A System.Messaging.MessageQueueException from any step is printed to the
/// console and not rethrown.
/// </remarks>
private void InitServiceQueue()
{
    try
    {
        if (System.Messaging.MessageQueue.Exists(mServiceQueuePath))
        {
            // Queue already present: only a local queue object is needed.
            mServiceQueue = new System.Messaging.MessageQueue(mServiceQueuePath);
        }
        else
        {
            // Single-argument Create: no transactional flag is passed.
            mServiceQueue = System.Messaging.MessageQueue.Create(mServiceQueuePath);
            mServiceQueue.Close();
        }

        // Requests are plain strings, so string is the only target type.
        mServiceQueue.Formatter = new System.Messaging.XmlMessageFormatter(new Type[] { typeof(string) });
        mServiceQueue.ReceiveCompleted += new System.Messaging.ReceiveCompletedEventHandler(MessageListenerEventHandler);

        // Start listening; the callback fires when a request arrives.
        mServiceQueue.BeginReceive();
        // NOTE(review): Close() is issued while the receive is still pending -
        // confirm it does not cancel the receive on the target platform.
        mServiceQueue.Close();
    }
    catch (System.Messaging.MessageQueueException queueError)
    {
        Console.WriteLine(queueError.Message);
    }
}
/// <summary>
/// ReceiveCompleted callback of the service queue. Completes the pending receive,
/// answers the two known requests on the sender's response queue and then starts
/// the next asynchronous receive.
/// </summary>
/// <param name="sender">The queue that raised the event.</param>
/// <param name="e">Event data carrying the async result of the finished receive.</param>
/// <remarks>
/// "mq_reques_1" is answered with the string "mq_respond_1" and "mq_reques_2" with
/// the boolean <c>true</c>; any other body is ignored. A failure while replying is
/// logged and does not stop the listener; a failure to complete the receive itself
/// ends listening, as before.
/// </remarks>
private void MessageListenerEventHandler(object sender, System.Messaging.ReceiveCompletedEventArgs e)
{
    // The queue that raised the event.
    System.Messaging.MessageQueue mq = (System.Messaging.MessageQueue)sender;

    System.Messaging.Message msg;
    try
    {
        // End the asynchronous receive operation.
        msg = mq.EndReceive(e.AsyncResult);
    }
    catch (System.Messaging.MessageQueueException ex)
    {
        // The receive itself failed; do not re-arm the listener.
        Console.WriteLine(ex.Message);
        return;
    }

    try
    {
        string request = (msg.Body == null) ? null : msg.Body.ToString();

        // Map the request onto its reply; null means "nothing to send".
        object reply = null;
        if (request == "mq_reques_1")
        {
            reply = "mq_respond_1";
        }
        else if (request == "mq_reques_2")
        {
            reply = true;
        }

        if (reply != null)
        {
            System.Messaging.MessageQueue replyQueue = msg.ResponseQueue;
            if (replyQueue == null)
            {
                // The sender did not set ResponseQueue, so there is nowhere to answer.
                Console.WriteLine("Request dropped: message carries no ResponseQueue.");
            }
            else
            {
                replyQueue.Send(reply);
            }
        }
    }
    catch (System.Messaging.MessageQueueException ex)
    {
        // A reply that cannot be delivered must not take the service down.
        Console.WriteLine(ex.Message);
    }

    try
    {
        // Restart the asynchronous receive operation.
        mq.BeginReceive();
    }
    catch (System.Messaging.MessageQueueException ex)
    {
        Console.WriteLine(ex.Message);
    }
}
}
/// <summary>
/// Client side of the request/response sample. Sends a request to the service
/// queue and waits for the answer on its own private queue
/// <c>.\Private$\MQClientQueue$</c>.
/// </summary>
public class MQClient
{
    // ".\" addresses the local machine; "Private$\" selects the private-queue namespace.
    private const string mMachinePrefix = @".\";
    private const string mPrivateQueueNamePrefix = mMachinePrefix + @"Private$\";
    private const string mServiceQueuePath = mPrivateQueueNamePrefix + "MQServiceQueue$";
    private const string mClientQueuePath = mPrivateQueueNamePrefix + "MQClientQueue$";

    // Queue the requests are sent to.
    private System.Messaging.MessageQueue mServiceQueue;
    // Queue the service answers on.
    private System.Messaging.MessageQueue mClientQueue;

    /// <summary>
    /// Binds to the service queue and attaches to the client queue, creating the
    /// client queue first when it does not exist. The client queue is configured to
    /// deserialize string and bool bodies.
    /// </summary>
    /// <remarks>
    /// A System.Messaging.MessageQueueException from any step is printed to the
    /// console and not rethrown.
    /// </remarks>
    public void InitQueues()
    {
        try
        {
            mServiceQueue = new System.Messaging.MessageQueue(mServiceQueuePath);

            if (System.Messaging.MessageQueue.Exists(mClientQueuePath))
            {
                mClientQueue = new System.Messaging.MessageQueue(mClientQueuePath);
            }
            else
            {
                // Single-argument Create: no transactional flag is passed.
                mClientQueue = System.Messaging.MessageQueue.Create(mClientQueuePath);
                mClientQueue.Close();
            }

            // The service replies with either a string or a bool.
            Type[] replyTypes = new Type[] { typeof(string), typeof(bool) };
            mClientQueue.Formatter = new System.Messaging.XmlMessageFormatter(replyTypes);
            mClientQueue.Close();
        }
        catch (System.Messaging.MessageQueueException queueError)
        {
            Console.WriteLine(queueError.Message);
        }
    }

    /// <summary>
    /// Sends the "mq_reques_1" request to the service and prints the body of the
    /// reply, waiting at most four seconds for it.
    /// </summary>
    /// <remarks>
    /// The client queue is purged before sending, so messages already waiting there
    /// are discarded. Queue errors are printed to the console.
    /// </remarks>
    private void SendRequest()
    {
        try
        {
            System.Messaging.Message request = new System.Messaging.Message("mq_reques_1");
            // Tell the service where to deliver its answer.
            request.ResponseQueue = mClientQueue;

            mClientQueue.Purge();
            mServiceQueue.Send(request);

            TimeSpan replyTimeout = TimeSpan.FromSeconds(4);
            System.Messaging.Message reply = mClientQueue.Receive(replyTimeout);
            Console.WriteLine(reply.Body.ToString());
        }
        catch (System.Messaging.MessageQueueException queueError)
        {
            Console.WriteLine(queueError.Message);
        }
    }
}
MQService是服務程序,負責服務隊列".\Private$\MQServiceQueue$"的建立和管理。當有新消息發送到該服務隊列時,MessageListenerEventHandler函數就會被回調,取出消息進行分析處理並發送返回。返回是通過client原先建立的Q進行的,不是通過原服務Q返回,因為MQ的隊列是單向的。MQClient負責客戶端隊列".\Private$\MQClientQueue$"的建立,在發送請求的時候把客戶端隊列賦值到ResponseQueue屬性裡,讓服務程序可以返回到這個客戶端的隊列裡面,同時在等待返回的時候有超時控制。
注冊廣播模式
注冊廣播模式是Observer模式的一種應用,Observer模式可參見《實用設計模式之一--Observer模式》。
客戶端可以往服務端注冊關心的消息,服務端通過MQ自動廣播消息到客戶端。
// Path prefix for the local machine (".\").
private const string mMachinePrefix = @".\";
// Prefix shared by all private queue paths on the local machine.
private const string mPrivateQueueNamePrefix = mMachinePrefix + @"Private$\";
// Full path of the queue on which the service receives register/unregister messages.
private const string mServiceQueuePath = mPrivateQueueNamePrefix + "MQServiceQueue$";
// Queue object for mServiceQueuePath; assigned in InitServiceQueue.
private System.Messaging.MessageQueue mServiceQueue;
// Registered clients: message label -> the response queue supplied at registration.
private Dictionary<string, MessageQueue> mmClientQueues = new Dictionary<string, MessageQueue>();
/// <summary>
/// Prepares the service queue for the register/broadcast sample: attaches to it
/// (creating it when missing), installs a string-only XML formatter, subscribes
/// MessageListenerEventHandler and begins an asynchronous receive.
/// </summary>
/// <remarks>
/// A System.Messaging.MessageQueueException from any step is printed to the
/// console and not rethrown.
/// </remarks>
private void InitServiceQueue()
{
    try
    {
        bool queueExists = System.Messaging.MessageQueue.Exists(mServiceQueuePath);
        if (queueExists)
        {
            mServiceQueue = new System.Messaging.MessageQueue(mServiceQueuePath);
        }
        else
        {
            // Single-argument Create: no transactional flag is passed.
            mServiceQueue = System.Messaging.MessageQueue.Create(mServiceQueuePath);
            mServiceQueue.Close();
        }

        // Register/unregister commands are plain strings.
        Type[] bodyTypes = { typeof(string) };
        mServiceQueue.Formatter = new System.Messaging.XmlMessageFormatter(bodyTypes);
        mServiceQueue.ReceiveCompleted += new System.Messaging.ReceiveCompletedEventHandler(MessageListenerEventHandler);

        // Start listening; the callback fires when a command arrives.
        mServiceQueue.BeginReceive();
        // NOTE(review): Close() is issued while the receive is still pending -
        // confirm it does not cancel the receive on the target platform.
        mServiceQueue.Close();
    }
    catch (System.Messaging.MessageQueueException queueError)
    {
        Console.WriteLine(queueError.Message);
    }
}
/// <summary>
/// ReceiveCompleted callback of the service queue. Completes the pending receive,
/// applies a register or unregister command to the client table and then starts
/// the next asynchronous receive.
/// </summary>
/// <param name="sender">The queue that raised the event.</param>
/// <param name="e">Event data carrying the async result of the finished receive.</param>
/// <remarks>
/// "mq_register_1" stores the message's ResponseQueue under the message label;
/// registering the same label again replaces the stored queue. "mq_unregister_1"
/// removes the label and purges that client's queue; an unknown label is ignored.
/// A failure while handling a command is logged and does not stop the listener; a
/// failure to complete the receive itself ends listening, as before.
/// NOTE(review): the client table is modified here without synchronization -
/// confirm Notify cannot run concurrently with this callback.
/// </remarks>
private void MessageListenerEventHandler(object sender, System.Messaging.ReceiveCompletedEventArgs e)
{
    // The queue that raised the event.
    System.Messaging.MessageQueue mq = (System.Messaging.MessageQueue)sender;

    System.Messaging.Message msg;
    try
    {
        // End the asynchronous receive operation.
        msg = mq.EndReceive(e.AsyncResult);
    }
    catch (System.Messaging.MessageQueueException ex)
    {
        // The receive itself failed; do not re-arm the listener.
        Console.WriteLine(ex.Message);
        return;
    }

    try
    {
        string command = (msg.Body == null) ? null : msg.Body.ToString();
        string clientId = msg.Label;

        if (command == "mq_register_1")
        {
            System.Messaging.MessageQueue clientQueue = msg.ResponseQueue;
            if (clientQueue == null)
            {
                // Without a response queue there is nothing to broadcast to.
                Console.WriteLine("Registration dropped: message carries no ResponseQueue.");
            }
            else
            {
                // Indexer assignment instead of Add: a repeated registration
                // replaces the entry instead of throwing ArgumentException.
                mmClientQueues[clientId] = clientQueue;
            }
        }
        else if (command == "mq_unregister_1")
        {
            System.Messaging.MessageQueue clientQueue;
            if (mmClientQueues.TryGetValue(clientId, out clientQueue))
            {
                // Remove first so a failing Purge cannot leave a stale registration.
                mmClientQueues.Remove(clientId);
                clientQueue.Purge();
            }
        }
    }
    catch (System.Messaging.MessageQueueException ex)
    {
        // A problem with one client's queue must not take the service down.
        Console.WriteLine(ex.Message);
    }

    try
    {
        // Restart the asynchronous receive operation.
        mq.BeginReceive();
    }
    catch (System.Messaging.MessageQueueException ex)
    {
        Console.WriteLine(ex.Message);
    }
}
/// <summary>
/// Broadcasts <paramref name="str"/> to the queue of every registered client.
/// </summary>
/// <param name="str">The message body to send to each client queue.</param>
/// <remarks>
/// Delivery is attempted for every client: a System.Messaging.MessageQueueException
/// raised for one client is printed to the console and the broadcast continues with
/// the next one, matching the error handling used by the other queue operations in
/// this class.
/// </remarks>
private void Notify(string str)
{
    // Enumerating an empty table is a no-op, so no Count check is needed.
    foreach (MessageQueue clientQueue in mmClientQueues.Values)
    {
        try
        {
            clientQueue.Send(str);
        }
        catch (System.Messaging.MessageQueueException ex)
        {
            // One unreachable client must not block the remaining ones.
            Console.WriteLine(ex.Message);
        }
    }
}
}
/// <summary>
/// Client side of the register/broadcast sample. Registers its private queue
/// <c>.\Private$\MQClientQueue$</c> with the service and prints every string the
/// service broadcasts to that queue.
/// </summary>
public class MQClient
{
    // ".\" addresses the local machine; "Private$\" selects the private-queue namespace.
    private const string mMachinePrefix = @".\";
    private const string mPrivateQueueNamePrefix = mMachinePrefix + @"Private$\";
    private const string mServiceQueuePath = mPrivateQueueNamePrefix + "MQServiceQueue$";
    private const string mClientQueuePath = mPrivateQueueNamePrefix + "MQClientQueue$";

    // Queue the register/unregister commands are sent to.
    private System.Messaging.MessageQueue mServiceQueue;
    // Queue on which broadcasts from the service arrive.
    private System.Messaging.MessageQueue mClientQueue;

    /// <summary>
    /// Binds to the service queue, attaches to the client queue (creating it when it
    /// does not exist), configures it for string and bool bodies and begins an
    /// asynchronous receive handled by ClientQueueReceiveCompleted.
    /// </summary>
    /// <remarks>
    /// A System.Messaging.MessageQueueException from any step is printed to the
    /// console and not rethrown.
    /// </remarks>
    public void InitQueues()
    {
        try
        {
            mServiceQueue = new System.Messaging.MessageQueue(mServiceQueuePath);

            // check to make sure the message queue does not exist already
            if (!System.Messaging.MessageQueue.Exists(mClientQueuePath))
            {
                // Create the queue. The single-argument overload is used, so no
                // transactional flag is passed.
                mClientQueue = System.Messaging.MessageQueue.Create(mClientQueuePath);
                mClientQueue.Close();
            }
            else
            {
                mClientQueue = new System.Messaging.MessageQueue(mClientQueuePath);
            }

            Type[] types = new Type[2];
            types[0] = typeof(string);
            types[1] = typeof(bool);
            mClientQueue.Formatter = new System.Messaging.XmlMessageFormatter(types);

            // Subscribe the handler, then start the asynchronous receive so the
            // handler is invoked when a broadcast arrives.
            mClientQueue.ReceiveCompleted +=
                new System.Messaging.ReceiveCompletedEventHandler(ClientQueueReceiveCompleted);
            mClientQueue.BeginReceive();
            // NOTE(review): Close() is issued while the receive is still pending -
            // confirm it does not cancel the receive on the target platform.
            mClientQueue.Close();
        }
        catch (System.Messaging.MessageQueueException MQException)
        {
            // Report any queue error (for example an invalid queue path).
            Console.WriteLine(MQException.Message);
        }
    }

    /// <summary>
    /// Registers this client with the service by sending "mq_register_1", labelled
    /// "client1", with the client queue as the response queue.
    /// </summary>
    private void RegisterService()
    {
        try
        {
            System.Messaging.Message message = new System.Messaging.Message("mq_register_1");
            message.Label = "client1";
            message.ResponseQueue = mClientQueue;
            mServiceQueue.Send(message);
        }
        catch (System.Messaging.MessageQueueException MQException)
        {
            Console.WriteLine(MQException.Message);
        }
    }

    /// <summary>
    /// Unregisters this client by sending "mq_unregister_1", labelled "client1",
    /// then waits 500 ms and purges the client queue.
    /// </summary>
    private void UnregisterService()
    {
        try
        {
            System.Messaging.Message message = new System.Messaging.Message("mq_unregister_1");
            message.Label = "client1";
            mServiceQueue.Send(message);

            // Presumably gives the service time to process the unregistration before
            // the remaining broadcasts are discarded. Fully qualified so the class
            // does not depend on a using directive for System.Threading.
            System.Threading.Thread.Sleep(500);
            mClientQueue.Purge();
        }
        catch (System.Messaging.MessageQueueException MQException)
        {
            Console.WriteLine(MQException.Message);
        }
    }

    /// <summary>
    /// ReceiveCompleted callback of the client queue: completes the pending receive,
    /// prints the body when it is a string and starts the next asynchronous receive.
    /// </summary>
    /// <param name="source">The object that raised the event (not used).</param>
    /// <param name="asyncResult">Event data carrying the async result of the finished receive.</param>
    private void ClientQueueReceiveCompleted(Object source,
        System.Messaging.ReceiveCompletedEventArgs asyncResult)
    {
        try
        {
            // End the asynchronous receive operation.
            System.Messaging.Message message =
                mClientQueue.EndReceive(asyncResult.AsyncResult);
            if (message.Body is string)
            {
                Console.WriteLine(message.Body.ToString());
            }
        }
        catch (System.Messaging.MessageQueueException e)
        {
            Console.WriteLine
                (String.Format(System.Globalization.CultureInfo.CurrentCulture,
                "Failed to receive Message: {0} ", e.ToString()));
        }

        try
        {
            // Begin the next asynchronous receive. Guarded because an exception
            // escaping this callback would be unhandled on the callback thread.
            mClientQueue.BeginReceive();
        }
        catch (System.Messaging.MessageQueueException e)
        {
            Console.WriteLine(e.Message);
        }
    }
}