在不同系統之間交換信息的一大障礙是如何在精確交換和格式化數據方面取得一致。Java Message Service( Java消息服務,簡稱JMS)通過提供一種與J2EE應用程序或傳統系統交互的方法部分的解決了這個問題。
JMS的通用接口集合以異步方式發送或接收消息。異步方式接收消息顯然是使用間斷網絡連接的客戶機,諸如移動電話和PDA的最好的選擇。另外, JMS采用一種寬松結合方式整合企業系統的方法,其主要的目的就是創建能夠使用跨平台數據信息的、可移植的企業級應用程序,而把開發人力解放出來。
Java消息服務支持兩種消息模型:Point-to-Point消息(P2P)和發布訂閱消息(Publish Subscribe messaging,簡稱Pub/Sub)。JMS規范並不要求供應商同時支持這兩種消息模型,但開發者應該熟悉這兩種消息模型的優勢與缺點。
P2P消息模型是在點對點之間傳遞消息時使用。如果應用程序開發者希望每一條消息都能夠被處理,那麼應該使用P2P消息模型。與Pub/Sub消息模型不同,P2P消息總是能夠被傳送到指定的位置。
Pub/Sub模型在一到多的消息廣播時使用。如果一定程度的消息傳遞的不可靠性可以被接受的話,那麼應用程序開發者也可以使用Pub/Sub消息模型。換句話說,它適用於所有的消息消費程序並不要求能夠收到所有的信息或者消息消費程序並不想接收到任何消息的情況。
JMS通過允許創建持久訂閱來簡化時間相關性,即使消息預訂者未激活也可以接收到消息。此外,使用持久訂閱還可通過隊列提供靈活性和可靠性,而仍然允許消息被發給許多的接收者。
Topic Subscriber topic Subscriber =
topicSession.createDurableSubscriber(topic, subscriptionName);
Connection對象表示了到兩種消息模型中的任一種的消息系統的連接。服務器端和客戶機端對象要求管理創建的JMS連接的狀態。連接是由Connection Factory創建的並且通過JNDI查尋定位。
//取得用於 P2P的 QueueConnectionFactory
QueueConnectionFactory = queueConnectionFactory( );
Context messaging = new InitialContext( );
QueueConnectionFactory = (QueueConnectionFactory)
Messaging.lookup(“QueueConnectionFactory”);
//取得用於 pub/sub的 TopicConnectionFactory
TopicConnectonFactory topicConnectionFactory;
Context messaging = new InitialContext();
topicConnectionFactory = (TopicConnectionFactory)
messaging.lookup(“TopicConnectionFactory”);
注意:用於P2P的代碼和用於PublishSubscribe的代碼非常相似。
如果session被標記為transactional的話,確認消息就通過確認和校正來自動地處理。如果session沒有標記為 transactional,你有三個用於消息確認的選項。
· AUTO_ACKNOWLEDGE session將自動地確認收到一則消息。
· CLIENT_ACKNOWLEDGE 客戶端程序將確認收到一則消息,調用這則消息的確認方法。
· DUPS_OK_ACKNOWLEDGE 這個選項命令session“懶散的”確認消息傳遞,可以想到,這將導致消息提供者傳遞的一些復制消息可能會出錯。這種確認的方式只應當用於消息消費程序可以容忍潛在的副本消息存在的情況。
queueSession = queueConnection.createQueueSession(false, session.AUTO_ACKNOWLEDGE);//P2P
topicSession = topicConnection.createTopicSession(false, session.AUTO_ACKNOWLEDGE); //Pub-Sub
注意:在本例中,一個session目的從連結中創建,非值指出session是non-transactional的,並且 session將自動地確認收到一則消息。
JMS現在有兩種傳遞消息的方式。標記為NON_PERSISTENT的消息最多投遞一次,而標記為PERSISTENT的消息將使用暫存後再轉送的機理投遞。如果一個JMS服務離線,那麼持久性消息不會丟失但是得等到這個服務恢復聯機時才會被傳遞。所以默認的消息傳遞方式是非持久性的。即使使用非持久性消息可能降低內務和需要的存儲器,並且這種傳遞方式只有當你不需要接收所有的消息時才使用。
雖然 JMS規范並不需要JMS供應商實現消息的優先級路線,但是它需要遞送加快的消息優先於普通級別的消息。JMS定義了從0到9的優先級路線級別,0是最低的優先級而9則是最高的。更特殊的是0到4是正常優先級的變化幅度,而5到9是加快的優先級的變化幅度。舉例來說:
topicPublisher.publish (message, DeliveryMode.PERSISTENT, 8, 10000); //Pub-Sub
或
queueSender.send(message, DeliveryMode.PERSISTENT, 8, 10000);//P2P
這個代碼片斷,有兩種消息模型,映射遞送方式是持久的,優先級為加快型,生存周期是10000 (以毫秒度量 )。如果生存周期設置為零,這則消息將永遠不會過期。當消息需要時間限制否則將使其無效時,設置生存周期是有用的。
JMS定義了五種不同的消息正文格式,以及調用的消息類型,允許你發送並接收以一些不同形式的數據,提供現有消息格式的一些級別的兼容性。
· StreamMessage -- Java原始值的數據流
· MapMessage--一套名稱-值對
· TextMessage--一個字符串對象
· ObjectMessage--一個序列化的 Java對象
· BytesMessage--一個未解釋字節的數據流
JMS應用程序接口提供用於創建每種類型消息和設置荷載的方法例如,為了在一個隊列創建並發送一個TextMessage實例,你可以使用下列語句:
TextMessage message = queueSession.createTextMessage(); message.setText(textMsg);
以異步方式接收消息,需要創建一個消息監聽器然後注冊一個或多個使用MessageConsumer的JMS MessageListener接口。會話(主題或隊列)負責產生某些消息,這些消息被傳送到使用onMessage方法的監聽者那裡。
import javax.jms.*;
public class ExampleListener implements MessageListener {
//把消息強制轉化為TextMessage格式
public void onMessage(Message message) {
TextMessage textMsg = null;
// 打開並處理這段消息
}
}
當我們創建QueueReceiver和TopicSubscriber時,我們傳遞消息選擇器字符串:
//P2P QueueReceiver
QueueReceiver receiver;
receiver = session.createReceiver(queue, selector);
//Pub-Sub TopicSubscriber
TopicSubscriber subscriber;
subscriber = session.createSubscriber(topic, selector);
為了啟動消息的交付,不論是Pub/Sub還是P2P,都需要調用start方法。
TopicConnection.start( ); //pub-sub
QueueConnection.start( ); //P2P
TopicConnection.start ( );// pub-sub
QueueConnection.start ( );// P2P
當一條消息被捕捉時,這條消息做為一條必須被強制轉化為適當消息類型的普通Message對象到達。這是一個被用來提取或打開消息內容的getter方法。下列代碼片段使用StreamMessage類型。
private void unPackMessage (Message message) {
String eName;
String position;
double rate;
StreamMessage message;
Message = session.createStreamMessage( );
//注意下面的代碼必須按照我給出的順序書寫
message.writeString(eName);
message.writeString(position);
message.writeDouble(rate);
//實現處理消息的必要的程序邏輯
}
停止消息的傳遞,無論是Pub/Sub還是P2P,都調用stop方法。
TopicConnection.start( ); //pub-sub
QueueConnection.start( ); //P2P
TopicConnection.start ( );// pub-sub
QueueConnection.start ( );// P2P
其他的J2EE組件--servlet或EJB--可以當作消息生產者;然而,它們可能只能同步操作,這可能是因為它們的請求-應答的性質決定的。雖然XML目前還不是被支持的消息類型,發送一個XML文件和創建一條文本類型消息以及把XML文件添加到消息的有效負載都一樣簡單,都是以非專有的方式傳送數據。值得注意的是,一些JMS供應廠商已經提供了可用的XML消息類型。但是使用非標准的消息類型可能會出現可移植性問題。
String reportData; //reportData內容為XML 文檔
TextMessage message;
message = session.createTextMessage();
message.setText (reportData);
消息驅動組件(MDB)是一個當消息到達時被容器調用的異步消息消費程序。和entity和session EJB不同,MDB沒有本地和遠程接口並且是匿名的;它們對於客戶是不可見的。MDB是JMS系統的一部分,作為消費者實現服務器上的商業邏輯程序。 一個客戶程序可能通過使用JNDI定位一個與MDB相關聯的JMS。 例如:
Context initialContext = new InitialContext();
Queue reportInfoQueue = (javax.jms.Queue)initialContext.lookup
(“java:comp/env/jms/reportInfoQueue”);
MDB是由Bean類和相應的XML部署描述符組成。 Bean 類實現MessageDriveBean 接口:
import javax.ejb.*;
import jms.Message.*;
public interface MessageDriveBean {
public void ejbCreate();
public void ejbRemove();
public void setMessageDrivenContext(MessageDrivenContext ctx);
}
消息監聽器接口:
import javax.jms.*;
public interface MessageListener {
public void onMessage( );
}
部署描述符
<!DOCTYPE ejb-jar PUBLIC "-//Sun Microsystems, Inc.//DTD Enterprise JavaBeans 2.0//EN" "http://java.sun.com/j2ee/dtds/ejb-jar_2_0.dtd">
<ejb-jar>
<enterprise-beans>
<message-driven>
<ejb-name>MDB</ejb-name>
<ejb-class>MDB</ejb-class>
<transaction-type>Container</transaction-type>
<message-driven-destination>
<jms-destination-type>javax.jms.Queue</jms-destination-type>
</message-driven-destination>
<security-identity>
<run-as-specified-identity>
<role-name>everyone</role-name>
</run-as-specified-identity>
</security-identity>
</message-driven>
</enterprise-beans>
</ejb-jar>
既然我們現在已經有了一些基本的JMS知識,那麼我們可以使用JMS做什麼呢?任何事情都可以。
例如,分別用於銷售、庫存、客戶服務和賬目處理的系統。這些部門之間的系統很可能已經存在了很長時間,這些處理要求把事務移動到系統中去,這並不是一個小的工作。這就是消息服務適用的地點。
當售貨員完成銷售的時候,一條消息被發給庫存系統;一旦訂單消息發送給收發貨人員,就可以按照訂單出貨了。當訂單成功地發貨,系統將通知顧客服務和會計系統這個訂單已經成功的交易了。所有對應的每個子系統都自動地根據收到的消息進行更新。
JMS一般都不是用來整合一個系統,而是整合許多可能參與消息驅動環境的系統。JMS是一個用於開發和集成企業應用程序的重要的工具。因為許多公司都有以前遺留下來的系統和新近開發的系統綜合起來的系統,消息的使用是整合整個企業的重要的步驟。