第十一章 企業項目開發--消息隊列activemq,項目開發activemq
注意:本章代碼基於 第十章 企業項目開發--分布式緩存Redis(2)
代碼的github地址:https://github.com/zhaojigang/ssmm0
消息隊列是分布式系統中實現RPC的一種手段。
1、消息隊列的基本使用流程
假設:
- 我們有這樣一個需求,當每注冊一個admin的之後,就寫一條日志log數據到數據庫。
分析:
- 在實際中,我們是不會把日志直接寫入數據庫的,因為日志數據通常是龐大的,而且日志的產生是頻繁的,如果我們使用數據庫存儲日志,哪怕是使用異步存儲,也是極耗性能的。在企業中,對於日志的處理方式很多,比較簡單的一種是,日志直接產生於nginx或後端服務器(eg.resin),我們寫一個定時任務,每隔一段時間,將產生的日志文件使用shell和Python進行正則過濾,取出有用信息,之後進行處理統計,最後將處理後的數據寫入數據庫。
在這裡我們作為演示,,每當注冊一個admin的之後,我們異步寫一條日志log數據到數據庫。
下邊的舉例也是對代碼的解釋。
- server1:部署ssmm0-userManagement
- server2:部署ssmm0-rpcWeb
- server3:部署消息隊列服務器
當server1執行一個"http://localhost:8080/admin/register?username=canglang25&password=1457890"操作,即向數據庫插一條admin信息時,同時將日志log信息寫入server3,之後不會等待log信息被server2消費掉就直接返回(異步);
server2循環接收server3中的消息隊列中的消息,並將這些log消息寫入數據庫。
2、消息隊列的作用
- 異步
- 解耦:server1(消息生產者服務器)和server3(消息消費者服務器)沒有直接聯系
- 削峰填谷:當大量請求湧入應用服務器時,應用服務器如果處理不過來,就將這些請求先放入隊列,之後再從隊列中取出請求慢慢處理(秒殺的一種處理方式)
3、消息隊列的兩種方式
- P2P
- 消息生產者產生的消息只能由一個消息消費者消費
- 基於隊列queue
- 執行流程
- 生產者:創建連接工廠-->創建連接-->啟動連接-->創建session-->創建隊列,創建生產者,創建消息-->發送消息
- 消費者:創建連接工廠-->創建連接-->啟動連接-->創建session-->創建隊列,創建消費者-->接收消息
- 發布-訂閱
- 消息生產者產生的消息可以由所有訂閱了(監聽了)該消息的消費者消費
- 基於主題topic
- 執行流程
- 生產者:創建連接工廠-->創建連接-->啟動連接-->創建session-->創建topic,創建消息發布者,創建消息-->發布消息
- 消費者:創建連接工廠-->創建連接-->啟動連接-->創建session-->創建topic,創建消息訂閱者-->消息訂閱者通過監聽器接收消息
4、實例(基於P2P實現)
4.1、整體代碼結構:

4.2、模塊依賴關系

注:箭頭的指向就是當前模塊所依賴的模塊。(eg.rpcWeb依賴data)
- userManagement:用戶管理模塊--war
- rpcWeb:rpc測試模塊(這裡用於模擬接收處理消息的應用)--war
- cache:緩存模塊--jar
- rpc:rpc模塊(包含mq/mina/netty)--jar
- data:數據處理模塊--jar
- common:通用工具類模塊--jar
4.3、代碼
代碼整體沒變,只列出部分新增代碼,完整代碼從文首的github進行clone即可。
4.3.1、ssmm0
pom.xml

![]()
<!-- 管理子模塊 -->
<modules>
<module>common</module><!-- 通用類模塊 -->
<module>cache</module><!-- 緩存模塊 -->
<module>rpc</module><!-- rpc模塊 -->
<module>data</module><!-- 封裝數據操作 -->
<module>userManagement</module><!-- 具體業務1-人員管理系統,這裡的userManagement部署在serverA上(配合rpcWeb測試rpc) -->
<module>rpcWeb</module><!-- 具體業務2-用於測試RPC的另一台機器,這裡的rpcWeb項目部署在serverB上 -->
</modules>
<!-- 日志:若沒有,activemq獲取連接報錯 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.5.11</version>
</dependency>
View Code
說明:只列出部分新增的代碼。
注意:
- activemq必須配置slf4j-log4j12,而該jar也會被所有的模塊用到(因為所有的模塊都需要打日志),至於該模塊的版本號的選擇我們可以根據"啟動activemq,並運行自己的程序"從eclipse的console窗口的打印信息來選擇。
- slf4j-log4j12這個jar在pom.xml中引入到依賴池中後,還需要進行實際依賴
- module部分最好按照依賴關系從底向上排列,這樣在"compile"的時候不容易出錯
4.3.2、ssmm0-common

pom.xml

![]()
1 <?xml version="1.0" encoding="UTF-8"?>
2 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
4
5 <modelVersion>4.0.0</modelVersion>
6
7 <!-- 指定父模塊 -->
8 <parent>
9 <groupId>com.xxx</groupId>
10 <artifactId>ssmm0</artifactId>
11 <version>1.0-SNAPSHOT</version>
12 </parent>
13
14 <groupId>com.xxx.ssmm0</groupId>
15 <artifactId>ssmm0-common</artifactId>
16
17 <name>ssmm0-common</name>
18 <packaging>jar</packaging>
19
20 <dependencies>
21 <!-- bc-加密 -->
22 <dependency>
23 <groupId>org.bouncycastle</groupId>
24 <artifactId>bcprov-jdk15on</artifactId>
25 </dependency>
26 <!-- cc加密 -->
27 <dependency>
28 <groupId>commons-codec</groupId>
29 <artifactId>commons-codec</artifactId>
30 </dependency>
31 </dependencies>
32 </project>
View Code
DateUtil:

![]()
1 package com.xxx.util;
2
3 import java.text.DateFormat;
4 import java.text.SimpleDateFormat;
5 import java.util.Date;
6
7 /**
8 * 線程安全的日期類工具
9 */
10 public class DateUtil {
11 private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
12 private static ThreadLocal<DateFormat> threadLocal = new ThreadLocal<DateFormat>();
13
14 /**
15 * 獲取DateFormat實例
16 */
17 public static DateFormat getDateFormat() {
18 DateFormat df = threadLocal.get();//從threadLocal中獲取當前線程的DateFormat實例副本
19 if(df==null){//如果當前線程實例為null,說明該線程第一次使用該方法
20 df = new SimpleDateFormat(DATE_FORMAT);//創建df實例
21 threadLocal.set(df);//將df實例放置到threadLocal中去
22 }
23 return df;
24 }
25
26 /**
27 * 將Date格式化為String字符串
28 */
29 public static String formatDate(Date date) {
30 return getDateFormat().format(date);
31 }
32
33 /**
34 * 獲取當前時間
35 * @return 字符串(eg.2001-11-12 12:23:34)
36 */
37 public static String getCurrentTime() {
38 //第一種方式
39 //return formatDate(new Date());
40
41 //第二種方式(也是最推薦的方式)
42 DateFormat df = getDateFormat();
43 return df.format(System.currentTimeMillis());
44
45 //第三種方式
46 /*Calendar c = Calendar.getInstance();
47 return c.get(Calendar.YEAR)+"-"+c.get(Calendar.MONTH)+"-"+c.get(Calendar.DATE)
48 +"-"+c.get(Calendar.HOUR)+"-"+c.get(Calendar.MINUTE)+"-"+c.get(Calendar.SECOND);*/
49 }
50
51 /*****************測試*****************/
52 /*public static void main(String[] args) {
53 System.out.println(getCurrentTime());
54 }*/
55 }
View Code
注意:
- jdk的SimpleDateFormat類是一個線程不安全的類,一般情況下只要不設置為static型類變量就可以了,但是更安全的做法是使用ThreadLocal類包裝一下(如代碼所示),當然也可以使用其他的日期工具。
- 獲取當前時間有三種方式(如代碼所示),最推薦的是第二種
PropUtil:即之前的FileUtil
4.3.3、ssmm0-rpc

pom.xml

![]()
1 <?xml version="1.0" encoding="UTF-8"?>
2 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
4
5 <modelVersion>4.0.0</modelVersion>
6
7 <!-- 指定父模塊 -->
8 <parent>
9 <groupId>com.xxx</groupId>
10 <artifactId>ssmm0</artifactId>
11 <version>1.0-SNAPSHOT</version>
12 </parent>
13
14 <groupId>com.xxx.ssmm0</groupId>
15 <artifactId>ssmm0-rpc</artifactId>
16
17 <name>ssmm0-rpc</name>
18 <packaging>jar</packaging>
19
20 <!-- 引入實際依賴 -->
21 <dependencies>
22 <!-- 引入自定義common模塊 -->
23 <dependency>
24 <groupId>com.xxx.ssmm0</groupId>
25 <artifactId>ssmm0-common</artifactId>
26 <version>1.0-SNAPSHOT</version>
27 </dependency>
28 <!-- activemq -->
29 <dependency>
30 <groupId>org.apache.activemq</groupId>
31 <artifactId>activemq-all</artifactId>
32 <version>5.5.0</version>
33 </dependency>
34 </dependencies>
35 </project>
View Code
rpc_config.properties

![]()
#activemq配置
activemq.queueURL=tcp://127.0.0.1:61616
activemq.queueName=adminQueue
View Code
說明:
- 這裡直接將數據配置在這裡了,實際上可以將數據配置到ssmm0的根pom.xml中去。
ActiveMQP2PUtil:基於P2P的activemq的消息收發工具類

![]()
1 package com.xxx.rpc.mq.util;
2
3 import java.io.Serializable;
4 import java.util.Properties;
5
6 import javax.jms.Connection;
7 import javax.jms.ConnectionFactory;
8 import javax.jms.DeliveryMode;
9 import javax.jms.Destination;
10 import javax.jms.JMSException;
11 import javax.jms.Message;
12 import javax.jms.MessageConsumer;
13 import javax.jms.MessageProducer;
14 import javax.jms.ObjectMessage;
15 import javax.jms.Session;
16
17 import org.apache.activemq.ActiveMQConnection;
18 import org.apache.activemq.ActiveMQConnectionFactory;
19
20 import com.xxx.rpc.mq.handler.MessageHandler;
21 import com.xxx.util.PropUtil;
22
23 /**
24 * activemq p2p 工具類
25 */
26 public class ActiveMQP2PUtil {
27 private static final String RPC_CONFIG_FILE = "rpc_config.properties";
28 private static String queueURL; //隊列所在的URL
29 private static String queueName; //隊列名稱
30 private static ConnectionFactory connectionFactory; //連接工廠
31
32 static{
33 Properties props = PropUtil.loadProps(RPC_CONFIG_FILE);
34 queueURL = props.getProperty("activemq.queueURL", "tcp://127.0.0.1:61616");
35 System.out.println(queueURL);
36 queueName = props.getProperty("activemq.queueName", "adminQueue");
37 connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
38 ActiveMQConnection.DEFAULT_PASSWORD,
39 queueURL);
40 }
41
42 /**
43 * 發送消息
44 */
45 public static void sendMessage(Serializable message){
46 Connection conn = null;
47 try {
48 conn = connectionFactory.createConnection();//創建連接
49 conn.start();//啟動連接
50 Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);//創建session
51 Destination destination = session.createQueue(queueName);//創建隊列
52 MessageProducer producer = session.createProducer(destination);
53 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);//消息設置為非持久化
54 ObjectMessage msg = session.createObjectMessage(message);//創建消息:createObjectMessage()該方法的入參是Serializable型的
55 producer.send(msg);//發送消息
56 session.commit();//提交消息
57 } catch (JMSException e) {
58 e.printStackTrace();
59 }finally{
60 if(conn!=null){
61 try {
62 conn.close();
63 } catch (JMSException e) {
64 e.printStackTrace();
65 }
66 }
67 }
68 }
69
70 /**
71 * 接收消息
72 * @param handler 自定義的消息處理器
73 */
74 public static void receiveMessage(MessageHandler handler){
75 Connection conn = null;
76 try {
77 conn = connectionFactory.createConnection();//創建連接
78 conn.start();//啟動連接
79 Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);//創建session
80 Destination destination = session.createQueue(queueName);//創建隊列
81 MessageConsumer consumer = session.createConsumer(destination);//創建消息消費者
82 while(true){//死循環接收消息
83 Message msg = consumer.receive();//接收消息
84 if(msg!=null){
85 handler.handle(msg);//處理消息
86 //System.out.println(msg);
87 }
88 }
89 } catch (JMSException e) {
90 e.printStackTrace();
91 }finally{
92 if(conn!=null){
93 try {
94 conn.close();
95 } catch (JMSException e) {
96 e.printStackTrace();
97 }
98 }
99 }
100 }
101
102 /*public static void main(String[] args) {
103 sendMessage("hello world3");
104 }*/
105 }
View Code
說明:
- 對照P2P的執行流程來看代碼
- 關於static塊的執行時機,可以去看 第四章 類加載機制
- 在我們啟動spring容器時,上述的static塊不執行,只有第一次使用到該類的時候才執行
- 假設我們為該類添加了注解@Component,那麼該類會由spring容器來管理,在spring初始化bean之後就會執行該static塊(也就是說spring容器啟動時,執行static塊)
- 若將該類不添加如上注解,直接實現接口InitializingBean,並且將static代碼塊中的信息寫到afterPropertiesSet()方法中,則spring容器啟動時,執行static塊
- 對於消息的接收,這裡采用了循環等待機制(即死循環),也可以使用事件通知機制
- 關於activemq的其他內容之後再說
MessageHandler:消息處理器接口(其實現類是對接收到的消息進行處理的真正部分)

![]()
1 package com.xxx.rpc.mq.handler;
2
3 import javax.jms.Message;
4
5 /**
6 * 消息處理器接口
7 */
8 public interface MessageHandler {
9 public void handle(Message message);
10 }
View Code
4.3.4、ssmm0-data

pom.xml

![]()
1 <?xml version="1.0" encoding="UTF-8"?>
2 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
4
5 <modelVersion>4.0.0</modelVersion>
6
7 <!-- 指定父模塊 -->
8 <parent>
9 <groupId>com.xxx</groupId>
10 <artifactId>ssmm0</artifactId>
11 <version>1.0-SNAPSHOT</version>
12 </parent>
13
14 <groupId>com.xxx.ssmm0</groupId>
15 <artifactId>ssmm0-data</artifactId>
16
17 <name>ssmm0-data</name>
18 <packaging>jar</packaging><!-- 只是作為其他模塊使用的工具 -->
19
20 <!-- 引入實際依賴 -->
21 <dependencies>
22 <!-- mysql -->
23 <dependency>
24 <groupId>mysql</groupId>
25 <artifactId>mysql-connector-java</artifactId>
26 </dependency>
27 <!-- 數據源 -->
28 <dependency>
29 <groupId>org.apache.tomcat</groupId>
30 <artifactId>tomcat-jdbc</artifactId>
31 </dependency>
32 <!-- mybatis -->
33 <dependency>
34 <groupId>org.mybatis</groupId>
35 <artifactId>mybatis</artifactId>
36 </dependency>
37 <dependency>
38 <groupId>org.mybatis</groupId>
39 <artifactId>mybatis-spring</artifactId>
40 </dependency>
41 <!-- servlet --><!-- 為了會用cookie -->
42 <dependency>
43 <groupId>javax.servlet</groupId>
44 <artifactId>javax.servlet-api</artifactId>
45 </dependency>
46 <!-- guava cache -->
47 <dependency>
48 <groupId>com.google.guava</groupId>
49 <artifactId>guava</artifactId>
50 <version>14.0.1</version>
51 </dependency>
52 <!-- 引入自定義cache模塊 -->
53 <dependency>
54 <groupId>com.xxx.ssmm0</groupId>
55 <artifactId>ssmm0-cache</artifactId>
56 <version>1.0-SNAPSHOT</version>
57 </dependency>
58 <!-- 引入自定義rpc模塊 -->
59 <dependency>
60 <groupId>com.xxx.ssmm0</groupId>
61 <artifactId>ssmm0-rpc</artifactId>
62 <version>1.0-SNAPSHOT</version>
63 </dependency>
64 </dependencies>
65 </project>
View Code
Log:日志模型類

![]()
package com.xxx.model.log;
import java.io.Serializable;
/**
* 日志
*/
public class Log implements Serializable {
private static final long serialVersionUID = -8280602625152351898L;
private String operation; // 執行的操作
private String currentTime; // 當前時間
public String getOperation() {
return operation;
}
public void setOperation(String operation) {
this.operation = operation;
}
public String getCurrentTime() {
return currentTime;
}
public void setCurrentTime(String currentTime) {
this.currentTime = currentTime;
}
}
View Code
注意:
- 需要實現序列化接口,在activemq中的消息需要序列化和反序列化
說明:對應的數據庫表

LogMapper

![]()
1 package com.xxx.mapper.log;
2
3 import org.apache.ibatis.annotations.Insert;
4
5 import com.xxx.model.log.Log;
6
7 /**
8 * 日志Mapper
9 */
10 public interface LogMapper {
11
12 /**
13 * 這裡需要注意的是,current_time是數據庫的保留參數,兩點注意:
14 * 1、最好不要用保留參數做變量名
15 * 2、如果不經意間已經用了,那麼保留參數需要用``括起來(`-->該符號是英文狀態下esc鍵下邊的那個鍵)
16 * @param log
17 * @return
18 */
19 @Insert("INSERT INTO log(operation, `current_time`) VALUES(#{operation},#{currentTime})")
20 public int insertLog(Log log);
21
22 }
View Code
注意:由於疏忽,在創建數據庫的時候,屬性"當前時間"取名為"current_time",沒注意到該詞是MySQL的關鍵字(即保留字)。
- 最好不要用關鍵字做變量名
- 如果不經意間已經用了,那麼保留參數需要用``括起來(`-->該符號是英文狀態下esc鍵下邊的那個鍵)
LogDao:

![]()
1 package com.xxx.dao.log;
2
3 import org.springframework.beans.factory.annotation.Autowired;
4 import org.springframework.stereotype.Repository;
5
6 import com.xxx.mapper.log.LogMapper;
7 import com.xxx.model.log.Log;
8
9 /**
10 * 日志DAO
11 */
12 @Repository
13 public class LogDao {
14
15 @Autowired
16 private LogMapper logMapper;
17 /***************注解*****************/
18 public boolean insertLog(Log log){
19 return logMapper.insertLog(log)==1?true:false;
20 }
21
22 }
View Code
LogMessageHandler:MessageHandler的實現類,對接收到的log消息進行具體的操作

![]()
1 package com.xxx.service.log;
2
3 import javax.jms.JMSException;
4 import javax.jms.Message;
5 import javax.jms.ObjectMessage;
6
7 import org.springframework.beans.factory.annotation.Autowired;
8 import org.springframework.stereotype.Component;
9
10 import com.xxx.dao.log.LogDao;
11 import com.xxx.model.log.Log;
12 import com.xxx.rpc.mq.handler.MessageHandler;
13
14
15 /**
16 * 日志處理器(更適合放在data層)
17 * 因為:
18 * 1、data依賴於rpc,而rpc不依賴於data,所以如果該類放在rpc層,並且該類需要用到數據庫操作(eg.將日志寫入數據庫),那麼就不好辦了
19 * 2、rpc層說白了,就是一些rpc工具類,實際上與業務無關,與業務有關的,我們可以抽取到該部分來
20 */
21 @Component
22 public class LogMessageHandler implements MessageHandler {
23
24 @Autowired
25 private LogDao logDao;
26
27 public void handle(Message message) {
28 System.out.println(logDao);
29 ObjectMessage objMsg = (ObjectMessage)message;
30 try {
31 Log log = (Log)objMsg.getObject();
32 logDao.insertLog(log);//將日志寫入數據庫
33 } catch (JMSException e) {
34 e.printStackTrace();
35 }
36
37 }
38
39 }
View Code
說明:
- 該類相當於一個service
- 該類放在data模塊而不是rpc模塊,其接口放在了rpc模塊,原因:
- data依賴於rpc,而rpc不依賴於data,所以如果該類放在rpc層,並且該類需要用到數據庫操作(eg.將日志寫入數據庫),那麼就不好辦了
- rpc層說白了,就是一些rpc工具類,實際上與業務無關,與業務有關的,我們可以抽取到該部分來
AdminService:

![]()
1 /**
2 * 測試activeMQ
3 *
4 * 消息生產者做的事:(部署在服務器A)
5 * 1)添加一個用戶
6 * 2)用戶添加成功後,
7 * 2.1)創建一個Log(日志類)實例
8 * 2.2)將該日志實例作為消息發送給消息隊列
9 *
10 * 消息消費者做的事:(部署在服務器B)
11 * 1)從隊列接收消息
12 * 2)用日志處理器對消息進行操作(將該消息寫入數據庫)
13 */
14 public boolean register(Admin admin) {
15 boolean isRegisterSuccess = adminDao.register(admin);
16 if(isRegisterSuccess) {
17 Log log = new Log();
18 log.setOperation("增加一個用戶");
19 log.setCurrentTime(DateUtil.getCurrentTime());
20
21 ActiveMQP2PUtil.sendMessage(log);//將消息發送到消息服務器(即activeMQ服務器),不需要等待消息處理結果,直接向下執行
22 }
23 return isRegisterSuccess;
24 }
View Code
說明:
- 該類只修改了以上方法
- 將消息發送到消息服務器(即activeMQ服務器),不需要等待消息處理結果,直接向下執行(體現異步)
4.3.5、ssmm0-rpcWeb

pom.xml

![]()
1 <?xml version="1.0" encoding="UTF-8"?>
2 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
4
5 <modelVersion>4.0.0</modelVersion>
6
7 <!-- 指定父模塊 -->
8 <parent>
9 <groupId>com.xxx</groupId>
10 <artifactId>ssmm0</artifactId>
11 <version>1.0-SNAPSHOT</version>
12 </parent>
13
14 <groupId>com.xxx.ssmm0</groupId>
15 <artifactId>ssmm0-rpcWeb</artifactId>
16
17 <name>ssmm0-rpcWeb</name>
18 <packaging>war</packaging><!-- 需要部署的模塊 -->
19
20 <!-- 引入實際依賴 -->
21 <dependencies>
22 <!-- 將ssmm0-data項目作為一個jar引入項目中 -->
23 <dependency>
24 <groupId>com.xxx.ssmm0</groupId>
25 <artifactId>ssmm0-data</artifactId>
26 <version>1.0-SNAPSHOT</version>
27 </dependency>
28 <!-- spring mvc(如果沒有web.xml中的CharacterEncodingFilter找不到) -->
29 <dependency>
30 <groupId>org.springframework</groupId>
31 <artifactId>spring-web</artifactId>
32 </dependency>
33 <dependency>
34 <groupId>org.springframework</groupId>
35 <artifactId>spring-webmvc</artifactId>
36 </dependency>
37 </dependencies>
38 </project>
View Code
spring.xml

![]()
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xmlns:mvc="http://www.springframework.org/schema/mvc"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.2.xsd
http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-3.2.xsd">
<!-- 注解掃描 -->
<context:component-scan base-package="com.xxx.web" /><!-- 只掃描web就可以 -->
<!-- 這裡需要引入ssmm0-data項目中配置的spring-data.xml(之前不引也可以成功,忘記怎麼配置的了) -->
<import resource="classpath:spring-data.xml"/>
</beans>
View Code
web.xml

![]()
<?xml version="1.0" encoding="utf-8"?>
<web-app version="2.5" xmlns="http://java.sun.com/xml/ns/javaee"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd">
<servlet>
<servlet-name>dispatcherServlet</servlet-name>
<servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
<init-param>
<param-name>contextConfigLocation</param-name>
<param-value>classpath:spring.xml</param-value>
</init-param>
<load-on-startup>1</load-on-startup>
</servlet>
<servlet-mapping>
<servlet-name>dispatcherServlet</servlet-name>
<url-pattern>/</url-pattern>
</servlet-mapping>
<filter>
<filter-name>encodingFilter</filter-name>
<filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
<init-param>
<param-name>encoding</param-name>
<param-value>UTF-8</param-value>
</init-param>
<init-param>
<param-name>forceEncoding</param-name>
<param-value>true</param-value>
</init-param>
</filter>
<filter-mapping>
<filter-name>encodingFilter</filter-name>
<url-pattern>/*</url-pattern>
</filter-mapping>
<welcome-file-list>
<welcome-file>/index.jsp</welcome-file>
</welcome-file-list>
</web-app>
View Code
MessageReceiver:死循環從隊列接收消息並將消息傳給消息處理器實現類(LogMessageHandler)處理

![]()
1 package com.xxx.web.mq;
2
3 import org.springframework.beans.factory.annotation.Autowired;
4 import org.springframework.stereotype.Controller;
5 import org.springframework.web.bind.annotation.RequestMapping;
6
7 import com.xxx.rpc.mq.util.ActiveMQP2PUtil;
8 import com.xxx.service.log.LogMessageHandler;
9
10 /**
11 * 用於接收消息的測試類
12 */
13 @Controller
14 @RequestMapping("/mq")
15 public class MessageReceiver {
16
17 @Autowired
18 private LogMessageHandler handler;
19
20 @RequestMapping("/receive")
21 public void receiveMessage() {
22 ActiveMQP2PUtil.receiveMessage(handler);
23 }
24
25 }
View Code
5、測試
5.1、安裝activemq
1)下載解壓"apache-activemq-5.5.0-bin.zip",之後,若是32bit機器,進入"E:\activemq-5.5.0\bin\win32"下,雙擊"activemq.bat"即可。(當然,如果雙擊無法啟動,可能有其他進程占用61616端口,查一下是哪一個進程,然後去服務中關掉即可)
2)啟動服務後,在浏覽器輸入"http://127.0.0.1:8161/admin/queues.jsp",看到隊列頁面,則安裝並啟動成功,該頁面是一個隊列消息的監控頁面,包括
- 隊列名稱:Name
- 當下有多少消息在隊列中等待消費:Number Of Pending Messages
- 有幾個消費者:Number Of Consumers
- 從啟動activemq服務到現在一共入隊了多少消息:Messages Enqueued
- 從啟動activemq服務到現在一共出隊了多少消息:Messages Dequeued
- Number Of Pending Messages + Messages Dequeued = Messages Enqueued
5.2、運行ssmm0-userManagement
浏覽器執行"http://localhost:8080/admin/register?username=canglang25&password=1457890"
注意:這裡使用了8080端口
5.3、運行ssmm0-rpcWeb
浏覽器執行"http://localhost:8081/mq/receive"
注意:
- 這裡使用了8081端口
- 執行該URL後,浏覽器會一直在轉圈(即一直在等待接收消息),直到關閉jetty服務器
說明:jetty在不同的端口下可以同時啟動,在同一端口下後邊啟動的服務會覆蓋之前啟動的服務
6、總結
- 消息隊列入門簡單,想要完全掌握很難
- 關於git的基本使用查看《progit中文版》