程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> 更多編程語言 >> WebSphere >> 使用Java API處理WebSphere MQ大消息

使用Java API處理WebSphere MQ大消息

編輯:WebSphere

WebSphere MQ 中處理大消息的方法

使用過 WebSphere MQ 的讀者都知道,WebSphere MQ 對處理的單條消息的大小是有限制的,目前支持的最大消息是100M,而且,隨著消息大小的增大,WebSphere MQ 處理的性能也會隨之下降。從最佳實踐來說,WebSphere MQ 傳輸大小為幾K的消息其效率是最高的。那如何使 WebSphere MQ 能高效的處理大消息呢?

WebSphere MQ 提供了處理大消息的兩種方法:消息分片和消息分組。下面我們來看在使用 Java API 編寫 WebSphere MQ 程序時如何實現消息分片和分組。

消息分片

消息分片的做法是把應用上一個大的邏輯消息分割成一個一個小的片段,每一個片段作為一個 WebSphere MQ 消息獨立傳輸,通過 MQMD 中 GroupId、 MsgSeqNumber 和 Offset 3 個屬性來標識,起始消息的 Offset 值為 0,而最後一個消息則會使用如下標記標識這是最後一個片段:MQMF_LAST_SEGMENT。

具體從實現上來說,消息分片可以細分為兩種模式:一是由隊列管理器自動實現消息的分片和組裝;二是由應用程序來實現消息的分片和組裝。下面我們將詳細向您介紹這兩種實現方式。

隊列管理器自動實現的消息分片

顧名思義,隊列管理器自動實現的消息分片就是由隊列管理器來完成消息的分片和組裝。對應用程序來說,不管是發送方還是接收方,它所處理的還是一個完整的大消息,只是在程序中通過設置一些標識來指示隊列管理器切分消息後再傳輸。所以,這種方式適用的場合為,出於傳輸效率的考慮,WebSphere MQ 不適宜傳輸大消息,而應用程序可以處理大消息,允許占用大塊內存。而且,此種方式對編寫應用程序來說也比較簡單。

實際在使用 Java API 編程時,對於發送方,發送消息時需要設置消息的 messageFlags 如下:

Msg.messageFlags = MQC.MQMF_SEGMENTATION_ALLOWED;

對於接收方,接收消息時需要設置 MQGetMessageOptions:

MQGetMessageOptions gmo = new MQGetMessageOptions ();

gmo.options = MQC.MQGMO_COMPLETE_MSG;

隊列管理器自動實現消息分片的部分代碼如清單 1,您可以下載詳細的示例程序代碼。

清單 1 隊列管理器自動實現消息分片

QMgrSegSender.java:
   int openOptions = MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING;
   myQMgr = new MQQueueManager ("QM1");
   myQueue = myQMgr.accessQueue("TESTQ", openOptions);
   MQMessage myMsg = new MQMessage ();
   myMsg.messageFlags = MQC.MQMF_SEGMENTATION_ALLOWED;
   MQPutMessageOptions pmo = new MQPutMessageOptions ();
   String strMsg = "";
   for (int i=0;i<=100;i++)
     strMsg = strMsg + "Hello";
   myMsg.write(strMsg.getBytes());
   myQueue.put(myMsg,pmo);
   System.out.println("Put message:\n" + strMsg);
   myQueue.close();
   myQMgr.disconnect();
QMgrSegReceiver.java:
   int openOptions = MQC.MQOO_INPUT_SHARED | MQC.MQOO_FAIL_IF_QUIESCING;
   myQMgr = new MQQueueManager ("QM1");
   myQueue = myQMgr.accessQueue("TESTQ", openOptions);
   MQMessage myMsg = new MQMessage ();
   MQGetMessageOptions gmo = new MQGetMessageOptions ();
   gmo.options = MQC.MQGMO_COMPLETE_MSG;
   myQueue.get(myMsg, gmo);
   byte[] b = new byte[myMsg.getMessageLength()];
   myMsg.readFully(b);
   String strMsg = new String(b);
   System.out.println("Got message:\n" + strMsg);
   myQueue.close();
   myQMgr.disconnect();

程序功能介紹:

(1)QMgrSegSender 程序是構造一個長度為505字節的消息並把它寫入隊列 TESTQ 中。

為使 MQ 不能傳輸505字節的消息,可以修改隊列 TESTQ 的屬性“最大消息長度(MAXMSGL)”為100。

執行結果如下圖 1 所示,該消息被隊列管理器自動分割成6個片段消息:

圖 1 在 WebSphere MQ 資源管理器中浏覽分片消息

(2)QMgrSegReceiver 程序是從隊列 TESTQ 中讀取一個消息。

我們觀察執行的結果是它把隊列中6個片段消息組成一個完整的大消息取出。

使用隊列管理器自動實現消息分片對應用開發人員來講比較簡單,但是需要確保程序在內存使用等方面可以處理完整的大消息。

應用程序實現的消息分片

應用程序實現消息分片是指,在發送方應用程序中事先把大消息切分成多個片段,每一個片段作為一個獨立的消息,寫入到隊列中;在接收方應用程序中,每一個片段作為一個獨立的消息被取出,由程序把所有的消息片段組裝成一個完整的消息。這種模式適用的場合為,WebSphere MQ 和應用程序兩者都不方便處理這麼大的單個消息。

一般在發送方程序實現中,我們是把所有的消息片段放在一個同步點中發送,所以需要設置 MQPutMessageOptions 為 MQPMO_SYNCPOINT;同時,我們推薦使用選項 MQPMO_LOGICAL_ORDER,這意味著隊列管理器自動維護每個消息片段的偏移量(Offset),否則,需要應用程序自身來設置:

MQPutMessageOptions pmo = new MQPutMessageOptions ();

pmo.options = MQC.MQPMO_LOGICAL_ORDER + MQC.MQPMO_SYNCPOINT;

對於每一個消息片段,我們還應標識這是一個消息片段(MQMF_SEGMENT):

myMsg.messageFlags = MQC.MQMF_SEGMENT;

對於最後一個消息片段,也需要設置特殊標識(MQMF_LAST_SEGMENT):

myMsg.messageFlags = MQC.MQMF_LAST_SEGMENT;

同樣的,在接收方程序中,我們也是把所有的消息片段放在一個同步點中接收,所以需要設置 MQGetMessageOptions 為 MQGMO_SYNCPOINT;同時,我們也設置 MQGMO_LOGICAL_ORDER 來保證所有的消息片段是按邏輯順序被取出;另外,我們還需設置所有的消息片段都到達後才處理的選項(MQGMO_ALL_SEGMENTS_AVAILABLE),這是為了防止萬一由於異常導致消息片段丟失而引起程序無限等待的情形:

MQGetMessageOptions gmo = new MQGetMessageOptions ();

gmo.options = MQC.MQGMO_LOGICAL_ORDER + MQC.MQGMO_SYNCPOINT + MQC.MQGMO_ALL_SEGMENTS_AVAILABLE;

由於我們是按邏輯順序來取消息片段的,所以設置循環取消息的時候,只要遇到某一個消息片段是最後一個的標識,我們就認為已經取到了完整的消息。如果沒有設置按照邏輯順序來取消息片段,則需要應用程序根據消息序列號、偏移量、是否是最後一個消息片段等標識來判斷是否已經取到完整的消息。

應用程序實現消息分片的部分代碼如清單 2,您可以下載詳細的示例代碼:

清單 2 應用程序實現消息分片

AppSegSender.java 
   int openOptions = MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING;
   myQMgr = new MQQueueManager ("QM1");
   myQueue = myQMgr.accessQueue("TESTQ", openOptions);
   for(int i=0;i<3;i++)
   {
     MQMessage myMsg = new MQMessage ();
     MQPutMessageOptions pmo = new MQPutMessageOptions ();
     pmo.options = MQC.MQPMO_LOGICAL_ORDER + MQC.MQPMO_SYNCPOINT;
     if (i<2)
       myMsg.messageFlags = MQC.MQMF_SEGMENT;
     else
       myMsg.messageFlags = MQC.MQMF_LAST_SEGMENT;
     String strMsg = "Hello" + i;
     myMsg.write(strMsg.getBytes());
     myQueue.put(myMsg,pmo);
     System.out.println("Put message '" + strMsg + "'! ");
   }
   myQMgr.commit();
   myQueue.close();
   myQMgr.disconnect();
AppSegReceiver.java 
   int openOptions = MQC.MQOO_INPUT_SHARED | MQC.MQOO_FAIL_IF_QUIESCING;
   myQMgr = new MQQueueManager ("QM1");
   myQueue = myQMgr.accessQueue("TESTQ", openOptions);
   MQMessage myMsg;
   MQGetMessageOptions gmo = new MQGetMessageOptions ();
   gmo.options =
   MQC.MQGMO_LOGICAL_ORDER + MQC.MQGMO_SYNCPOINT + MQC.MQGMO_ALL_SEGMENTS_AVAILABLE;
   String strMsg = "";
   boolean isLastSegment = false;
   while(!isLastSegment)
   {
     myMsg = new MQMessage ();
     myQueue.get(myMsg, gmo);
     if (myMsg.messageFlags == MQC.MQMF_SEGMENT + MQC.MQMF_LAST_SEGMENT)
       isLastSegment = true;
     byte[] b = new byte[myMsg.getMessageLength()];
     myMsg.readFully(b);
     strMsg += new String(b);
   }
   System.out.println("Got message:\n" + strMsg);
   myQMgr.commit();
   myQueue.close();
   myQMgr.disconnect();

程序功能介紹:

AppSegSender 程序是使用一個 for 循環,構造一個完整消息的三個消息片段,分別寫入隊列 TESTQ 中。

AppSegReceiver 程序是從隊列 TESTQ 中循環讀取消息片段,根據其邏輯順序以及是否是最後一個消息片段來組裝完整的消息。

相對於隊列管理器器自動實現消息分片的方式,應用程序實現消息分片略顯復雜,但是它能夠處理更大的消息。

消息分組

從實現手段上來講,消息分組和消息分片非常類似,但二者有著完全不同的業務意義。在消息分片中,雖然每一個消息片段都作為一個獨立的消息進行傳輸,但只有收集到所有的消息片段組成一個完整的消息之後才有業務意義,單獨的一個消息片段是沒有任何業務意義的。從這一點上講,我們是由於技術上處理大消息有困難,才想到把大消息進行切分的。而消息分組則不同,它的每一個成員消息都是一個具有業務意義的獨立消息,只是由於某些需要,比如,組內消息有明確的先後順序,等等,才把這批消息作為一組進行傳輸。

在實際實現中,組內的消息是通過 MQMD 中 GroupId 和 MsgSeqNumber 2個屬性來標識,而最後一個消息則會標記這是組內的最後一個消息(MQMF_LAST_MSG_IN_GROUP)。

與消息分片類似,一般在發送方程序中,我們是把同一組的所有消息放在一個同步點中發送,所以需要設置 MQPutMessageOptions 為 MQPMO_SYNCPOINT;同時,我們推薦使用選項 MQPMO_LOGICAL_ORDER,這意味著隊列管理器自動維護每個消息的序列號(MsgSeqNumber),否則,需要應用程序自身來設置:

MQPutMessageOptions pmo = new MQPutMessageOptions ();

pmo.options = MQC.MQPMO_LOGICAL_ORDER + MQC.MQPMO_SYNCPOINT;

對於每一個消息,我們還應標識這是一個組內的消息(MQMF_MSG_IN_GROUP):

myMsg.messageFlags = MQC.MQMF_MSG_IN_GROUP;

對於組內的最後一個消息,也需要設置特殊標識(MQMF_LAST_MSG_IN_GROUP):

myMsg.messageFlags = MQC.MQMF_LAST_MSG_IN_GROUP;

同樣的,在接收方程序中,我們也是把同一組的所有消息放在一個同步點中接收,所以需要設置 MQGetMessageOptions 為 MQGMO_SYNCPOINT;同時,我們也設置 MQGMO_LOGICAL_ORDER 來保證同一個組裡的所有消息是按邏輯順序被取出;另外,我們還需設置同一組所有的消息都到達後才處理的選項(MQGMO_ALL_MSGS_AVAILABLE),這是為了防止萬一由於異常導致某一成員消息丟失而引起程序無限等待的情形:

MQGetMessageOptions gmo = new MQGetMessageOptions ();

gmo.options = MQC.MQGMO_LOGICAL_ORDER + MQC.MQGMO_SYNCPOINT + MQC.MQGMO_ALL_MSGS_AVAILABLE;

由於我們是按邏輯順序來取組內成員消息的,所以設置循環取消息的時候,只要遇到某一個消息是組內最後一個的標識,我們就認為已經取到了該組所有的消息。如果沒有設置按照邏輯順序來取消息片段,則需要應用程序根據消息序列號、取到的消息個數、是否是組內最後一個消息等標識來判斷是否已經取到該組所有的消息。

部分代碼如清單 3,您可以下載詳細的示例代碼。

清單 3 消息分組

AppGrpSender.java 
   int openOptions = MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING;
   myQMgr = new MQQueueManager ("QM1");
   myQueue = myQMgr.accessQueue("TESTQ", openOptions);
   for(int i=0;i<3;i++)
   {
     MQMessage myMsg = new MQMessage ();
     MQPutMessageOptions pmo = new MQPutMessageOptions ();
     pmo.options = MQC.MQPMO_LOGICAL_ORDER + MQC.MQPMO_SYNCPOINT;
     if (i<2)
       myMsg.messageFlags = MQC.MQMF_MSG_IN_GROUP;
     else
       myMsg.messageFlags = MQC.MQMF_LAST_MSG_IN_GROUP;
     String strMsg = "Hello" + i;
     myMsg.write(strMsg.getBytes());
     myQueue.put(myMsg,pmo);
     System.out.println("Put message" + (i+1) + " '" + strMsg + "'! ");
   }
   myQMgr.commit();
   myQueue.close();
   myQMgr.disconnect();
AppGrpReceiver.java 
   int openOptions = MQC.MQOO_INPUT_SHARED | MQC.MQOO_FAIL_IF_QUIESCING;
   myQMgr = new MQQueueManager ("QM1");
   myQueue = myQMgr.accessQueue("TESTQ", openOptions);
   MQMessage myMsg;
   MQGetMessageOptions gmo = new MQGetMessageOptions ();
   gmo.options =
   MQC.MQGMO_LOGICAL_ORDER + MQC.MQGMO_SYNCPOINT + MQC.MQGMO_ALL_MSGS_AVAILABLE;
   String strMsg = "";
   boolean isLastMsg = false;
   int seq = 0;
   while(!isLastMsg)
   {
     seq++;
     myMsg = new MQMessage ();
     myQueue.get(myMsg, gmo);
     if (myMsg.messageFlags == MQC.MQMF_MSG_IN_GROUP + MQC.MQMF_LAST_MSG_IN_GROUP)
       isLastMsg = true;
     byte[] b = new byte[myMsg.getMessageLength()];
     myMsg.readFully(b);
     strMsg = new String(b);
     System.out.println("Got message" + seq + ":\n" + strMsg);
   }
   myQMgr.commit();
   myQueue.close();
   myQMgr.disconnect();

程序功能介紹:

AppGrpSender 程序是使用一個 for 循環,構造一個組的三個消息,分別寫入隊列 TESTQ 中。

AppGrpReceiver 程序是從隊列 TESTQ 中循環讀取消息,根據其邏輯順序以及是否是組內最後一個消息來判斷是否已取完同一組內的所有消息。

相對於消息分片,消息分組不僅僅是處理大消息的一種方法,更為重要的是,消息分組還能維護一組業務數據中的邏輯關系。

結束語

消息分片和消息分組是在 WebSphere MQ 的編程中處理大消息的常用手段,到底采用哪種方式比較合適,需要根據實際的需求而定。如果大消息需要分割成有實際業務意義的一批小消息,那麼采用消息分組比較合適;反之,如果大消息無法分割成有實際業務意義的小消息,那麼就采用消息分片。甚至在某些復雜的場合下,消息分片和消息分組可以結合起來使用,比如,某批消息傳輸時由於有先後順序的要求,被歸並到一個組內,同時由於部分消息比較大,又需要分片傳輸,有興趣的讀者可以自己來實現一下這個復雜的場景。

本文配套源碼

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved