程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> JAVA編程 >> 關於JAVA >> 如何獲取消息?使用JMS技術作為數據復制的解決方案

如何獲取消息?使用JMS技術作為數據復制的解決方案

編輯:關於JAVA

背景

在思考消息傳遞解決方案時,您可能會想到一個通過遠程消息調用機制來集成兩個不同應用程序的系統。一般來講,對於不常通信的分布式實體以及數據傳輸量不是很多這樣的情況,常常使用這種耦合。較經典的示例是,連接到異構後端和入口的同構接口,這些後端和入口指派進行用戶請求的後端處理,然後為最終用戶表示而對那些請求進行重新格式化。

消息傳遞方法中的公共線程一直有這樣的假定:雖然消息傳遞解決方案在系統之間提供健壯、高度可用的通信,但它基本上效率很低,只用來作為在無法避免與外部系統通信時的最後一種手段。在出現遠程方法調用(RMC)時關於消息傳遞的這種觀點就開始流行一直到出現了更現代的象 CORBA 和 DCOM 那樣的消息傳遞解決方案,而且,通常所應用的消息傳遞只局限於解決幾類問題。

目標

在過去的十年中,人們對分布式系統需求有了更深入的理解。新興技術(象 Java 和 .NET)已經包含了代碼分布來作為它們基本編程模型的一部分。通過這樣做,這些技術已將高度可用性和容錯性融入到消息傳遞中,同時鼓勵那些提供解決方案的供應商交付一些系統,這些系統在更廣范圍的問題上考慮性能。

近來我們公司被要求實現文件分布和復制的解決方案,在以前這樣的方案需要集成安全的 FTP、數據庫復制和其它一次性解決方案的定制系統。我們沒有一味地埋頭按照定制開發的道路前進,而是研究了將最新的消息傳遞解決方案應用到這個問題的可能性。我們發現 JMS 不僅為信息傳送提供必要的基礎結構,而且它還能處理我們客戶要求的、與服務質量、安全性、可靠性和性能有關的所有基礎結構問題。本文描述了我們團隊面臨的挑戰,以及 JMS(以 MQSeries 的形式)如何讓我們滿足並超越客戶的要求。

問題

我們的客戶面臨一個重大的分布式數據難題,在全國范圍內有許多呼叫中心,在全國各地的呼叫中心裡接線員要記錄與客戶之間的交互。必須快速可靠地在遠程數據中心為這些記錄建立索引並存檔。建立索引和存檔的存儲過程不能影響接線員的系統記錄和存儲接線員正在與客戶交互的信息的能力。該客戶已經有了一個包含組合起來的代碼、VPN 和其它技術的系統。但是,現有的解決方案遠遠達不到性能和可靠性上的目標,並且它是一種拙劣的技術,難以理解並且維護費用很高。

在開發替代客戶原有系統時,我們考慮了 JMS 和多種非 JMS 的解決方案,尤其是那些基於 FTP 和安全復制(SCP)的解決方案。然而,非 JMS 解決方案有兩個主要缺點:

它們對於安全性方面的缺陷一籌莫展。

它們提供的基礎結構只適用於實際的數據傳送,而對於處理可靠性、容錯性、安全性、平台獨立性以及性能優化等問題,需要定制開發來解決。

我們團隊最後得出結論,對於添加這些額外的特性所需的開發工作是讓人望而卻步的,因此我們決定選用 JMS 解決方案,它可以擺脫這些問題。

解決方案

我們開發了一個基於 JMS 的系統,它:

為已記錄的多媒體文件提供可靠存檔

支持可擴展性,可以使多個數據中心接收文件

支持對其它數據類型進行存檔

我們這裡正討論的文件比以前那些涉及消息傳遞解決方案的項目中傳送的數據還要大(50K - 500K)。我們第一個任務是確保數據大小不會影響 JMS 解決方案。通過測試系統傳遞各種大小的消息有效負載時的性能,我們評估了包括 IBM MQSeries 在內的許多 JMS 解決方案。結果顯示:經過適當配置,大小達到 1 兆的消息不會對整個系統性能產生顯著影響。因為常識認為消息傳遞解決方案只適用於定期的、小的有效負載,所以我們的結果是一個重大發現。我們繼續分析系統的體系結構(圖 1 中概述了此體系結構),它可以提供客戶需要的安全性、高可用性和可靠性。

圖 1. 高級系統體系結構

現有的基礎結構在每個客戶機上有一個系統,當接線員與用戶之間進行交互時,它創建多媒體文件,以此作為響應。此外,還需對這些文件進行存檔。我們的系統啟動一個進程(運行在每個機器上)並在已知目錄中查找這些文件。當檢測到新文件時,進程將它們打包成 JMS 有效負載並發送到其中一個數據中心的 JMS 服務器以便傳遞。一旦 JMS 服務器確認收到,則除去發送方中的這些文件。JMS 服務器將該數據傳送到數據中心內的一個可用處理程序上,進行存檔。

主要概念

JMS 是特定於 Java 的消息傳遞和排隊的實現。在消息傳遞和排隊中有兩個基本思想:

系統通過使用不連續的數據包進行通信,這些數據包都有一個有效負載(即要傳送的信息)和屬性(即該信息的特征以及它應如何通信)。這個數據包稱為 消息。

消息不是被發送給系統,而是被發送到一個獨立的保存區域。可以根據您的需要確定保存區域的數量,通過唯一的名稱,可以標識並定位它們。每個保存區域都可以接收消息,並且根據配置的不同,該區域將每個消息要麼傳遞給所有感興趣的系統(發布-訂閱),要麼傳遞給第一個感興趣的系統(點對點)。這個保存區域稱為 目的地。

我們構建的系統采用點對點的目的地,在 JMS 中稱為隊列。排隊是圖 1 中顯示的系統設計的一個重要方面。該圖顯示了消息正從 JMS 代理直接傳送到接收方的客戶機上,但這並不十分准確。實際上,消息被傳送到一個隊列中,接收方客戶機從隊列中檢索它們。稍後我們研究實現細節時,這個區別將變得非常重要,因為它讓系統並行地處理收到的消息。

跨平台和交叉供應商

對我們客戶機來說盡量減少對某家供應商的依賴,這意味著,我們所設計的代碼應該使由於更改了 JMS 供應商而帶來的影響降至最低,這是十分重要的。JMS 的一個主要優點是它以廣泛的業界支持和開放標准為基礎,因此有了正確設計的代碼,我們就可以讓系統使用任何 JMS 系統。(可以對現有系統進行直接改進,專門設計來使系統在某套硬件上運行並能與特定於供應商的解決方案相匹配。)

通過將所有特定於供應商的調用封裝在稱為 JMSProvider 的類中,就可以輕松實現平台獨立性。這些 Provider 類處理特定於供應商的問題,例如工廠查詢、錯誤處理、連接創建和消息特性設置等。請參閱下面清單 1 中的示例代碼。

清單 1. 在類 ar.jms.JmsProvider 中

public QueueConnection createConnection() throws JMSException {
    return getConnectionFactory().createQueueConnection(getUserName(),
      getPassword());
}

通過利用“Java 命名和目錄接口(JNDI)”,我們將特定於供應商的設置存儲在一個資源庫(例如,LDAP 庫)中,這樣實際代碼就幾乎不需要特定於供應商的引用。只需要少量特定於供應商的代碼來處理一些特性,但是可以將這樣的代碼限定於一些“適配器”類,並將它保存在應用程序代碼之外。請參閱下面清單 2 中的示例代碼。因為 JMS 被設計用來方便地使用 JNDI,所以與其它解決方案相比,這是另一個直接優點 ― 配置信息的集中存儲不僅可以保存基於文本的信息,而且還可以存儲已配置的對象。

清單 2. 在類 ar.jms.JmsProvider 中

public final static String
  CONNECTION_FACTORY_LOOKUP_NAME_KEY = "CONNECTION_FACTORY_LOOKUP_NAME";
public final static
  String FILE_TRANSFER_QUEUE_LOOKUP_NAME_KEY =
  "FILE_TRANSFER_QUEUE_LOOKUP_NAME";
public final static String
  JMS_PROVIDER_CLASS_KEY = "JMS_PROVIDER_CLASS";

public void init() throws NamingException {
   InitialContext jndi = createInitialContext();
   initConnectionFactory(jndi);
   initFileTransferQueue(jndi);
}

public QueueConnection createConnection() throws JMSException {return
    getConnectionFactory().createQueueConnection(getUserName(),
      getPassword());
}

public void initConnectionFactory(InitialContext jndi) throws
    NamingException {
      setConnectionFactory((QueueConnectionFactory)jndi.lookup
         (getProperties().getProperty(CONNECTION_FACTORY_LOOKUP_NAME_KEY)));
}

public void initFileTransferQueue(InitialContext jndi) throws
    NamingException {
      setFileTransferQueue((Queue) jndi.lookup
         (getProperties().getProperty(FILE_TRANSFER_QUEUE_LOOKUP_NAME_KEY)));
}

跳出傳統模式,JMS 解決方案允許以可靠的方式傳送消息,即一旦確認已將消息傳送到 JMS 服務器,就將它傳送至尋址到的目的地(隊列)。MQSeries 也不例外。一旦成功執行了將消息發送到服務器的代碼,客戶機就保證目的地最終會接收到消息,即使所討論的服務器在處理過程中出現故障(如果目的地暫時不可用,或者 JMS 服務器死機等等)。請參閱下面清單 3 中的示例代碼。下面代碼中的類實際上負責一旦它確定需要發送文件,就執行數據的發送。

通過將消息配置為持久消息,我們可以保證一旦目的地(隊列)接收到消息,那麼消息將保留在那裡直到它在該隊列中被檢索為止 ― 即使在系統有故障期間。因此,一旦安全地將消息傳送到本地 JMS 服務器,就可以刪除它了。不能過高估計克服系統故障的能力;對周期性系統故障的處理是開發分布式存檔解決方案最重要的問題之一。客戶現有系統上處理故障情況的代碼很復雜很脆弱,而且對這些故障的處理和維護費用很高。通過一個健壯的、經測試成功的商業解決方案,JMS 使我們能解決所有這些問題。

>清單 3. 來自類 ar.jms.file.send.ConnectionElement

public void sendMessage(byte[] payload, boolean persistent) throws
    SendFailedException {
    QueueSender sender = null;
    try {
    Message message = createMessage(payload);
    sender = createSender 
    (persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
    sender.send(message);
      getClient().getLogService().logInfo(getName() +
      " sent message " + message.getJMSMessageID() + ".");
    } catch (JMSException exception) {
      getClient().getLogService().logError 
      ("JMS exception processing " + getName(),exception);
      stop();
      throw new SendFailedException("JMS Message Send Failed");
   }
    try {
      sender.close();
    } catch (JMSException ignore) {
      getClient().getLogService().logInfo(getName() + " failed to
      close sender. Processing will continue.");
   }
}

這個解決方案的關鍵是配置 JMS 消息和服務器來同時提供令人滿意的性能和服務質量。JMS 規范定義了配置選項,並通過所有商業解決方案實現它們。但是,配置的確切方法根據不同的供應商而有所不同。

設置

我們創建的體系結構和系統具有通用性且很強大。但是,對於一些移動部件,必須使用正確的方式配置並鉤連它們。以下內容是有關將 MQSeries 成功地設置為 JMS 服務器的概述、一些潛在缺陷和實際的指示信息。

對於 MQSeries,首先設置 JNDI 服務器來檢索特定於實現的設置,在這種情況下是 JMS 連接工廠(JMS Connection Factory)。有許多不同方法來實現這個操作,但適宜的通用選項是輕量級目錄訪問協議(LDAP)服務器。我們選擇使用 Qualcomm SLAPD。一旦安裝好並運行該服務器,就可以用 MQSeries 管理工具(JMSAdmin.bat)來設置該服務器並將其作為 MQ 對象信息庫來使用。同時,在設置期間,要特別注意在 IBM MQSeries 之上設置 JMS 的 IBM 文檔,這很重要。這個過程涉及創建一些隊列和其它對象,這些隊列和對象是特定於 JMS 使用並且不屬於標准 MQSeries 安裝的。

完成 JNDI/LDAP 和 JMS 服務器的設置後,就可以准備配置客戶機了。第一步是理解 JMS 如何與 IBM 的標准 MQSeries 實現交互。MQSeries 的 Java 客戶機能以兩種模式之一進行交互:客戶機和綁定模式。只能通過 Java applet 來使用客戶機模式,而綁定模式取決於客戶機上的 DLL 或者對象庫。因為實現的特性,當使用用於 JMS 連接信息的 LDAP 服務器時,只能使用綁定模式。(不清楚為什麼有這個限制,但它確實存在。)因此,將用戶登錄和密碼存儲在一個全局位置(com.ibm.mq.MQEnvironment.class)而不是在連接時傳遞它們。要解決這些供應商問題,我們創建了標准 JmsProvider 類(稱為 MQSeriesProvider)的子類。這個類將完成的唯一操作是覆蓋如何創建連接。不象清單 1 中那樣

newQueueConnection =
getConnectionFactory().createQueueConnection(getUserName(),getPassword));

進行調用,我們必須調用

newQueueConnection = getConnectionFactory().createQueueConnection();

最後,您需要將特定於 JMS 的元素(如隊列、隊列管理器、隊列工廠等等)提供給客戶機。現在,使用 LDAP 和 JNDI 的原因就變得很明顯:我們使用 LDAP 服務器來存儲這些元素並使用外部文件保存那些 LDAP 對象的鍵。LDAP 服務器可以作為 JNDI 服務器並通過返回存儲的對象來響應名稱查詢。這就是清單 2 中代碼的工作原理。JMS 元素的名稱可從類的靜態變量(對於缺省名稱)或者外部文件(使用非缺省的其它名稱)中獲取。簡而言之,向 LDAP 服務器請求鍵(我們正討論的)中存儲的對象,並返回我們感興趣的 JMS 對象(在這種情況下)。

我們基於 JMS 的解決方案通過使用現有的組件更方便地實現統一的、跨平台和交叉供應商的配置環境。現在我們的代碼已盡可能地成為獨立於特定於平台和特定於供應商的設置。

應用程序

應用程序中有兩個關鍵組件:發送器和接收器。發送器啟動一個後台程序,它在目錄中輪詢需要存檔的文件,而接收器只是等待將要傳遞的 JMS 消息,然後將該消息中包含的文件存檔。JMS API 使我們幾乎無需關注正使用的特定 JMS 實現就可以定義這些組件。

發送器由三個主要部件組成:

JMSProvider,用於創建連接

ConnectionPool,用於獲取現有的空閒連接(我們稱之為 JMSConnection)

輪詢程序,監視需要傳送的文件。

在啟動時,使用 JMSProvider 來創建一些到 JMS 服務器的准備就緒的連接。這些連接放置在池中,然後啟動輪詢程序。當輪詢程序檢測到需要傳送文件時,它就創建一個獨立線程來處理這個文件。(可以通過派生(forking),創建一個獨立的線程來創建消息和進行傳送操作,描述該過程非常簡單。但在實際應用中,常常將合用(pooling)與循環組合起來使用,這樣可以確保很少創建新線程,而是重用線程。但是,那個過程相當復雜,過多的說明會分散本文的中心主題 ― JMS。)

在獨立線程中,輪詢程序接著從連接池中獲取 JMSConnection,用它來創建一個 BytesMessage,並將這個文件的二進制內容放入那個消息中。最後這個消息查找到接收器,並發送到 JMS 服務器,接著將 JMSConnection 返回給 ConnectionPool。這個發送過程的部分步驟顯示在下面的圖 2 中。

圖 2. 發送器過程

接收器是一個較簡單的組件;它啟動一些 FileListener 來等待將要放置在接收器隊列中的消息。下面的清單 4 中的代碼顯示了 FileListener 設置處理過程。圖 6 中的類實際上負責從隊列中檢索消息並對它們進行存檔。JMS 保證隊列發送每個消息的次數僅一次,所以我們可以安全啟動許多不同的 FileListener 線程並且知道每個消息(因此每個文件)只處理一次。這個保證是使用基於 JMS 解決方案的另一個重要優點。在自己設計的解決方案中開發這樣的功能(比如基於 FTP 的功能),花銷很大且易出錯。

清單 4:來自類 ar.jms.file.receive.FileListener

public void startOn(Queue queue) {
   setQueue(queue);
   createConnection();
   try {
     createSession();
     createReceiver();
     getConnection().start(); // this starts
     the queue listener 
   } catch (JMSException exception) {
     // Handle the exception
   }
}

public void createReceiver() throws javax.jms.JMSException {
   try {
     QueueReceiver receiver = getSession().
createReceiver(getQueue());
     receiver.setMessageListener(this);
   } catch (JMSException exception) {
     // Handle the exception
   }
}
public void createSession() throws JMSException {
   setSession(getConnection().
createQueueSession(false, Session.AUTO_ACKNOWLEDGE));
}

public void createConnection() {
   while (!hasConnection()) {
     try {
       setConnection(getClient().createConnection());
     } catch (JMSException exception) {
       // Connections drop periodically on the
       internet, log and try again.
       try {
         Thread.sleep(2000);
       } catch
       (java.lang.InterruptedException ignored) {
       }
     }
   }
}

以回調的方式編寫消息處理代碼,回調是當將消息傳遞給 FileListener 時,JMS 自動調用的方法。這個消息的代碼顯示在下面的清單 5 中。

清單 5. 來自類 ar.jms.file.receive.FileListener

public void onMessage(Message message) {
   BytesMessage byteMessage = ((BytesMessage) message);
   OutputStream stream =
new BufferedOutputStream(
new FileOutputStream(getFilenameFor(message)));
   byte[] buffer = new byte[getFileBufferSize()];
   int length = 0;
   try {
     while ((length = byteMessage.readBytes(buffer)) != -1) {
       stream.write(buffer, 0, length);
     }
     stream.close();
   } catch (JMSException exception) {
     // Handle the JMSException
   } catch (IOException exception) {
     // Handle the IOException
   }
}

在設置接收器時要記住一條訣竅:在所有 FileListener 啟動後,確保啟動這些 FileListener 的原始線程繼續運行。這是必要的,因為某些 JMS 實現在守護程序的線程中啟動 QueueListener。所以,如果正在運行的唯一線程是守護程序的線程,那麼 Java 虛擬機(JVM)可能會過早地意外退出。下面的清單 6 顯示了一些防止這種情況發生的簡單代碼。

清單 6. 至少使一個非守護程序的線程保持運行 public static void main(String[] args) {
   ReceiverClient newReceiverClient = new ReceiverClient();
   newReceiverClient.init();
   setSoleInstance(newReceiverClient);
   while(!finished) {  // This prevents the VM from exiting early
    try {
      Thread.sleep(1000);
    } catch (InterruptedException ex) {
    }
   }
}

結束語

在該項目的最初實現之後,我們添加了一些功能,象消息壓縮、當位置無法到達時的自動恢復、聯合消息代理、安全性、健壯的日志記錄、管理等等。添加這些功能很容易,因為 JMS 提供了開放模型,而且我們的體系結構也很健壯。構建整個系統花了六個星期的時間,並且還很快地替換了客戶一直使用的現有的、勞動密集型的系統。在這些天裡,系統已經超出了所有的基准測試程序的標准並且已更正了原來系統遺留下來的錯誤。這個項目不單超出了客戶的期望,還證明了 JMS 是一個可行的解決方案,不僅適用於小型、面向消息的應用程序,而且還適用於大規模的、重要任務的數據傳送操作。

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved