JMS客戶端編程模型,jms客戶端模型
概述
客戶端編程模型,是講如何使用JMS API實現Java應用程序和JMS Provider的通信。
在ActiveMQ相關背景的MOM一節,四個基本元素中的客戶端,和這裡的客戶端是同一個概念。
本文已准備好pdf版本供下載:JMS客戶端編程模型.pdf(有目錄)。
消息傳送模式
消息傳送模式又稱為
消息傳送域,JMS API定義了兩種模式:PTP和Pub/Sub。
PTP
全稱:Point-to-Point 中文:點對點
上圖描述了這樣的內容:
Sender發送
Message到
代理維護的
Queue,然後
Receiver可以從這個
Queue中獲取這個
Message。
這個模式的特點是:
Pub/Sub
全稱:Publish/Subscribe 中文:發布/訂閱
上圖描述了這樣的內容:
Publisher發布
Message到
代理維護的
Topic,
Subscribe從
代理那裡訂閱的
Topic,從而可以獲取對應的
Message。
這個模式的特點是:
角色定位
MOM包含四個基本元素:消息傳遞提供者、目的地、客戶端(發送方或接收方)、消息。上述兩種模式在MOM中的角色定位,可以表現為:
JMS API
JavaEE提供的javax.jms包中,涵蓋了JMS的API。
統一域和特定於域的API
早期的規范中,PTP和Pub/Sub各有不同的接口體系,後來的JMS1.1在兩個接口體系的上層,又定義了一層統一的接口體系,稱為統一域的API;而之前的兩套接口體系,稱為特定於域的API。對比情況如下:
注意:JMS當然不是只提供了上述的接口。上述接口,只是在統一之後,有對比意義的接口。
編程模型
說明:連接工廠、目的地通常作為受管理對象來創建、配置、管理,駐留在對象存儲庫中;客戶端程序通過JNDI查找獲取對象,而不建議顯示的創建。所以連接工廠、目的地在上圖的表示有所不同。
對象簡介
- 連接工廠(ConnectionFactory)
客戶端使用連接工廠對象(ConnectionFactory)創建連接。
- 連接(Connection)
連接對象 (Connection) 表示客戶端與代理之間的活動連接。創建連接時會分配通信資源並對客戶端進行驗證。這是一個相當重要的對象,大多數客戶端均使用一個連接來完成所有的消息傳送。連接支持並發使用:一個連接可由任意數量的生成方和使用方共享。
- 會話(Session)
如果連接代表客戶端與代理之間的通信渠道,則會話標記客戶端與代理之間的單次對話。會話對象主要用於創建消息、消息生成方和消息使用方。
- 消息(Message)
消息封裝一次通信的內容。消息由三部分組成:消息頭、屬性和主體。
- 消息生成方(MessageProducer)
由Session創建,負責發送Message到目的地。
- 消息使用方(MessageConsumer)
由Session創建,負責從目的地中消費Message。
- 目的地(Destination)
JMS Provider負責維護,用於對Message進行管理的對象。MessageProducer需要指定Destination才能發送消息,MessageReceiver需要指定Destination才能接收消息。
demo
將上文的客戶端編程模型應用起來。兩個客戶端:producer-client負責發送消息到ActiveMQ;consumer-client負責從ActiveMQ中獲取消息。
在此之前,你需要下載ActiveMQ,並確保成功啟動。
下載:不同的版本對JDK版本的要求不同,當前最新穩定版本是5.13.2,要求JDK1.7以上。
安裝:直接解壓縮
啟動:(1) 在cmd窗口,切換到解壓縮的路徑;(2) bin\activemq start
確認:ActiveMQ提供了監控器,訪問http://localhost:8161/admin,默認賬號:admin/admin。可以訪問,表示啟動成功。
1.producer-client
基於Maven的simple project。你可以在jms-producer拿到源代碼。
文件目錄結構
pom.xml
src/main/resources/
|---- jndi.properties
src/main/java/
|---- cn.sinobest.asj.producer.jms.clientmode
|---- SimpleProducer.java # 基於客戶端編程模型,發送消息給ActiveMQ
文件內容
1.pom.xml
1 <project xmlns="http://maven.apache.org/POM/4.0.0"
2 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
4 <modelVersion>4.0.0</modelVersion>
5 <groupId>cn.sinobest.asj</groupId>
6 <artifactId>jms-producer</artifactId>
7 <version>0.0.1-SNAPSHOT</version>
8 <name>jms-producer</name>
9 <description>基於ActiveMQ的Producer Client。</description>
10 <dependencies>
11 <!-- import activemq-client to send message to ActiveMQ -->
12 <dependency>
13 <groupId>org.apache.activemq</groupId>
14 <artifactId>activemq-client</artifactId>
15 <version>5.13.2</version>
16 </dependency>
17 <!-- not necessary, import to remove the warn message from activemq-client -->
18 <dependency>
19 <groupId>org.slf4j</groupId>
20 <artifactId>slf4j-simple</artifactId>
21 <version>1.7.19</version>
22 </dependency>
23 </dependencies>
24 </project>
2.jndi.properties
1 java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
2
3 # use the following property to configure the default connector
4 java.naming.provider.url=tcp://localhost:61616
5
6 # register some queues in JNDI using the form
7 # queue.[jndiName] = [physicalName]
8 queue.exampleQueue=example.queue
9
10 # register some topics in JNDI using the form
11 # topic.[jndiName] = [physicalName]
12 topic.exampleTopic = example.topic
說明:
- java.naming.factory.initial
定義ConnectionFactory的實例化類。
- java.naming.provider.url
定義broker的url,這個是ActiveMQ默認的url。
- queue.exampleQueue
定義了一個Queue Destination。name為example.queue,JNDI的name為exampleQueue。
- topic.exampleTopic
定義了一個Topic Destination。name為example.topic,JNDI的name為exampleTopic。
3.SimpleProcedure.java
1 package cn.sinobest.asj.producer.jms.clientmode;
2 import javax.jms.Connection;
3 import javax.jms.ConnectionFactory;
4 import javax.jms.Destination;
5 import javax.jms.JMSException;
6 import javax.jms.MessageProducer;
7 import javax.jms.Session;
8 import javax.jms.TextMessage;
9 import javax.naming.Context;
10 import javax.naming.InitialContext;
11 import javax.naming.NamingException;
12 /**
13 * A simple demo for producer client to send message to ActiveMQ.<br>
14 * refer to <a href="http://activemq.apache.org/jndi-support.html">JNDI
15 * Support</a>
16 *
17 * @author lijinlong
18 */
19 public class SimpleProducer {
20 /** JNDI name for ConnectionFactory */
21 static final String CONNECTION_FACTORY_JNDI_NAME = "ConnectionFactory";
22 /** JNDI name for Queue Destination (use for PTP Mode) */
23 static final String QUEUE_JNDI_NAME = "exampleQueue";
24 /** JNDI name for Topic Destination (use for Pub/Sub Mode) */
25 static final String TOPIC_JNDI_NAME = "exampleTopic";
26 /**
27 * @param args
28 */
29 public static void main(String[] args) {
30 Context jndiContext = null;
31 ConnectionFactory connectionFactory = null;
32 Connection connection = null;
33 Session session = null;
34 Destination destination = null;
35 MessageProducer producer = null;
36 // create a JNDI API IntialContext object
37 try {
38 jndiContext = new InitialContext();
39 } catch (NamingException e) {
40 System.out.println("Could not create JNDI Context:"
41 + e.getMessage());
42 System.exit(1);
43 }
44
45 // look up ConnectionFactory and Destination
46 try {
47 connectionFactory = (ConnectionFactory) jndiContext
48 .lookup(CONNECTION_FACTORY_JNDI_NAME);
49 // look up QUEUE_JNDI_NAME for PTP Mode
50 // look up TOPIC_JNDI_NAME for Pub/Sub Mode
51 destination = (Destination) jndiContext.lookup(QUEUE_JNDI_NAME);
52 } catch (NamingException e) {
53 System.out.println("JNDI look up failed:" + e.getMessage());
54 System.exit(1);
55 }
56
57 // send Messages and finally release the resources.
58 try {
59 connection = connectionFactory.createConnection();
60 session = connection.createSession(Boolean.FALSE,
61 Session.AUTO_ACKNOWLEDGE);
62 producer = session.createProducer(destination);
63 TextMessage message = session.createTextMessage();
64 for (int i = 0; i < 3; i++) {
65 message.setText(String.format("This is the %dth message.",
66 i + 1));
67 producer.send(message);
68 }
69 } catch (JMSException e) {
70 e.printStackTrace();
71 } finally {
72 try {
73 if (session != null)
74 session.close();
75 if (connection != null)
76 connection.close();
77 } catch (JMSException e) {
78 e.printStackTrace();
79 }
80 }
81 }
82 }
SimpleProcedure.java
說明:
2.consumer-client
基於Maven的simple project。你可以在jms-consumer拿到源代碼。
文件目錄結構
pom.xml
src/main/resources/
|---- jndi.properties
src/main/java/
|---- cn.sinobest.asj.consumer.jms.clientmode
|---- SimpleConsumer.java # 基於客戶端編程模型,從ActiveMQ接收消息
文件內容
1.pom.xml
1 <project xmlns="http://maven.apache.org/POM/4.0.0"
2 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
4 <modelVersion>4.0.0</modelVersion>
5 <groupId>cn.sinoebst.asj</groupId>
6 <artifactId>jms-consumer</artifactId>
7 <version>0.0.1-SNAPSHOT</version>
8 <name>jms-consumer</name>
9 <description>基於ActiveMQ的Consumer Client。</description>
10 <dependencies>
11 <!-- import activemq-client to receive message from ActiveMQ -->
12 <dependency>
13 <groupId>org.apache.activemq</groupId>
14 <artifactId>activemq-client</artifactId>
15 <version>5.13.2</version>
16 </dependency>
17 <!-- not necessary, import to remove the warn message from activemq-client -->
18 <dependency>
19 <groupId>org.slf4j</groupId>
20 <artifactId>slf4j-simple</artifactId>
21 <version>1.7.19</version>
22 </dependency>
23 </dependencies>
24 </project>
2.jndi.properties
1 java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
2
3 # use the following property to configure the default connector
4 java.naming.provider.url=tcp://localhost:61616
5
6 # register some queues in JNDI using the form
7 # queue.[jndiName] = [physicalName]
8 queue.exampleQueue=example.queue
9
10 # register some topics in JNDI using the form
11 # topic.[jndiName] = [physicalName]
12 topic.exampleTopic = example.topic
3.SimpleConsumer.java
1 package cn.sinobest.asj.consumer.jms.clientmode;
2 import javax.jms.Connection;
3 import javax.jms.ConnectionFactory;
4 import javax.jms.Destination;
5 import javax.jms.JMSException;
6 import javax.jms.Message;
7 import javax.jms.MessageConsumer;
8 import javax.jms.Session;
9 import javax.jms.TextMessage;
10 import javax.naming.Context;
11 import javax.naming.InitialContext;
12 import javax.naming.NamingException;
13 /**
14 * A simple demo for consumer to receive message from ActiveMQ.<br>
15 *
16 * @author lijinlong
17 *
18 */
19 public class SimpleConsumer {
20 /** JNDI name for ConnectionFactory */
21 static final String CONNECTION_FACTORY_JNDI_NAME = "ConnectionFactory";
22 /** JNDI name for Queue Destination (use for PTP Mode) */
23 static final String QUEUE_JNDI_NAME = "exampleQueue";
24 /** JNDI name for Topic Destination (use for Pub/Sub Mode) */
25 static final String TOPIC_JNDI_NAME = "exampleTopic";
26 /**
27 * @param args
28 */
29 public static void main(String[] args) {
30 Context jndiContext = null;
31 ConnectionFactory connectionFactory = null;
32 Connection connection = null;
33 Session session = null;
34 Destination destination = null;
35 MessageConsumer consumer = null;
36 // create a JNDI API IntialContext object
37 try {
38 jndiContext = new InitialContext();
39 } catch (NamingException e) {
40 System.out.println("Could not create JNDI Context:"
41 + e.getMessage());
42 System.exit(1);
43 }
44 // look up ConnectionFactory and Destination
45 try {
46 connectionFactory = (ConnectionFactory) jndiContext
47 .lookup(CONNECTION_FACTORY_JNDI_NAME);
48 // look up QUEUE_JNDI_NAME for PTP Mode
49 // look up TOPIC_JNDI_NAME for Pub/Sub Mode
50 destination = (Destination) jndiContext.lookup(QUEUE_JNDI_NAME);
51 } catch (NamingException e) {
52 System.out.println("JNDI look up failed:" + e.getMessage());
53 System.exit(1);
54 }
55 // receive Messages and finally release the resources.
56 try {
57 connection = connectionFactory.createConnection();
58 connection.start(); // connection should be called in
59 // receiver-client
60 session = connection.createSession(Boolean.FALSE,
61 Session.AUTO_ACKNOWLEDGE);
62 consumer = session.createConsumer(destination);
63 long timeout = 10 * 1000;
64 for (Message message = consumer.receive(timeout); message != null; message = consumer
65 .receive(timeout)) {
66 String text = ((TextMessage) message).getText();
67 System.out.println(String.format("receive a message:%s", text));
68 }
69 } catch (JMSException e) {
70 e.printStackTrace();
71 } finally {
72 try {
73 if (session != null)
74 session.close();
75 if (connection != null)
76 connection.close();
77 } catch (JMSException e) {
78 e.printStackTrace();
79 }
80 }
81 }
82 }
SimpleConsumer.java
說明:在第65行調用了Connection#start方法,否則無法收到消息(目前不明白原因)。
3.測試
3.1.基於PTP Mode測試
3.2.基於Pub/Sub測試
4.編程獲取ConnectionnFactory和Destination
雖然不建議使用編程的方式獲取ConnectionFactory和Destination,但是還是記錄一下:
1 // 創建工廠實例
2 // javax.jms.ConnectionFactory
3 // org.apache.activemq.ActiveMQConnectionFactory
4 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
5 ActiveMQConnectionFactory.DEFAULT_USER,
6 ActiveMQConnectionFactory.DEFAULT_PASSWORD,
7 ActiveMQConnectionFactory.DEFAULT_BROKER_URL);
8 // javax.jms.Connection
9 Connection connection = connectionFactory.createConnection();
10 // javax.jms.Session
11 Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
12 // 創建目的地
13 // javax.jms.Destination
14 Destination destination = session.createQueue("example.queue");
15 Destination destination = session.createTopic("example.topic");
附錄
參考
來自為知筆記(Wiz)