注意:本章代碼基於 第十章 企業項目開發--分布式緩存Redis(2)
代碼的github地址:https://github.com/zhaojigang/ssmm0
消息隊列是分布式系統中實現RPC的一種手段。
1、消息隊列的基本使用流程
假設:
分析:
在這裡我們作為演示,,每當注冊一個admin的之後,我們異步寫一條日志log數據到數據庫。
下邊的舉例也是對代碼的解釋。
當server1執行一個"http://localhost:8080/admin/register?username=canglang25&password=1457890"操作,即向數據庫插一條admin信息時,同時將日志log信息寫入server3,之後不會等待log信息被server2消費掉就直接返回(異步);
server2循環接收server3中的消息隊列中的消息,並將這些log消息寫入數據庫。
2、消息隊列的作用
3、消息隊列的兩種方式
4、實例(基於P2P實現)
4.1、整體代碼結構:
4.2、模塊依賴關系
注:箭頭的指向就是當前模塊所依賴的模塊。(eg.rpcWeb依賴data)
4.3、代碼
代碼整體沒變,只列出部分新增代碼,完整代碼從文首的github進行clone即可。
4.3.1、ssmm0
pom.xml
<!-- 管理子模塊 --> <modules> <module>common</module><!-- 通用類模塊 --> <module>cache</module><!-- 緩存模塊 --> <module>rpc</module><!-- rpc模塊 --> <module>data</module><!-- 封裝數據操作 --> <module>userManagement</module><!-- 具體業務1-人員管理系統,這裡的userManagement部署在serverA上(配合rpcWeb測試rpc) --> <module>rpcWeb</module><!-- 具體業務2-用於測試RPC的另一台機器,這裡的rpcWeb項目部署在serverB上 --> </modules> <!-- 日志:若沒有,activemq獲取連接報錯 --> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.5.11</version> </dependency> View Code說明:只列出部分新增的代碼。
注意:
4.3.2、ssmm0-common
pom.xml
1 <?xml version="1.0" encoding="UTF-8"?> 2 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 3 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> 4 5 <modelVersion>4.0.0</modelVersion> 6 7 <!-- 指定父模塊 --> 8 <parent> 9 <groupId>com.xxx</groupId> 10 <artifactId>ssmm0</artifactId> 11 <version>1.0-SNAPSHOT</version> 12 </parent> 13 14 <groupId>com.xxx.ssmm0</groupId> 15 <artifactId>ssmm0-common</artifactId> 16 17 <name>ssmm0-common</name> 18 <packaging>jar</packaging> 19 20 <dependencies> 21 <!-- bc-加密 --> 22 <dependency> 23 <groupId>org.bouncycastle</groupId> 24 <artifactId>bcprov-jdk15on</artifactId> 25 </dependency> 26 <!-- cc加密 --> 27 <dependency> 28 <groupId>commons-codec</groupId> 29 <artifactId>commons-codec</artifactId> 30 </dependency> 31 </dependencies> 32 </project> View CodeDateUtil:
1 package com.xxx.util; 2 3 import java.text.DateFormat; 4 import java.text.SimpleDateFormat; 5 import java.util.Date; 6 7 /** 8 * 線程安全的日期類工具 9 */ 10 public class DateUtil { 11 private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss"; 12 private static ThreadLocal<DateFormat> threadLocal = new ThreadLocal<DateFormat>(); 13 14 /** 15 * 獲取DateFormat實例 16 */ 17 public static DateFormat getDateFormat() { 18 DateFormat df = threadLocal.get();//從threadLocal中獲取當前線程的DateFormat實例副本 19 if(df==null){//如果當前線程實例為null,說明該線程第一次使用該方法 20 df = new SimpleDateFormat(DATE_FORMAT);//創建df實例 21 threadLocal.set(df);//將df實例放置到threadLocal中去 22 } 23 return df; 24 } 25 26 /** 27 * 將Date格式化為String字符串 28 */ 29 public static String formatDate(Date date) { 30 return getDateFormat().format(date); 31 } 32 33 /** 34 * 獲取當前時間 35 * @return 字符串(eg.2001-11-12 12:23:34) 36 */ 37 public static String getCurrentTime() { 38 //第一種方式 39 //return formatDate(new Date()); 40 41 //第二種方式(也是最推薦的方式) 42 DateFormat df = getDateFormat(); 43 return df.format(System.currentTimeMillis()); 44 45 //第三種方式 46 /*Calendar c = Calendar.getInstance(); 47 return c.get(Calendar.YEAR)+"-"+c.get(Calendar.MONTH)+"-"+c.get(Calendar.DATE) 48 +"-"+c.get(Calendar.HOUR)+"-"+c.get(Calendar.MINUTE)+"-"+c.get(Calendar.SECOND);*/ 49 } 50 51 /*****************測試*****************/ 52 /*public static void main(String[] args) { 53 System.out.println(getCurrentTime()); 54 }*/ 55 } View Code注意:
PropUtil:即之前的FileUtil
4.3.3、ssmm0-rpc
pom.xml
1 <?xml version="1.0" encoding="UTF-8"?> 2 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 3 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> 4 5 <modelVersion>4.0.0</modelVersion> 6 7 <!-- 指定父模塊 --> 8 <parent> 9 <groupId>com.xxx</groupId> 10 <artifactId>ssmm0</artifactId> 11 <version>1.0-SNAPSHOT</version> 12 </parent> 13 14 <groupId>com.xxx.ssmm0</groupId> 15 <artifactId>ssmm0-rpc</artifactId> 16 17 <name>ssmm0-rpc</name> 18 <packaging>jar</packaging> 19 20 <!-- 引入實際依賴 --> 21 <dependencies> 22 <!-- 引入自定義common模塊 --> 23 <dependency> 24 <groupId>com.xxx.ssmm0</groupId> 25 <artifactId>ssmm0-common</artifactId> 26 <version>1.0-SNAPSHOT</version> 27 </dependency> 28 <!-- activemq --> 29 <dependency> 30 <groupId>org.apache.activemq</groupId> 31 <artifactId>activemq-all</artifactId> 32 <version>5.5.0</version> 33 </dependency> 34 </dependencies> 35 </project> View Coderpc_config.properties
#activemq配置 activemq.queueURL=tcp://127.0.0.1:61616 activemq.queueName=adminQueue View Code說明:
ActiveMQP2PUtil:基於P2P的activemq的消息收發工具類
1 package com.xxx.rpc.mq.util; 2 3 import java.io.Serializable; 4 import java.util.Properties; 5 6 import javax.jms.Connection; 7 import javax.jms.ConnectionFactory; 8 import javax.jms.DeliveryMode; 9 import javax.jms.Destination; 10 import javax.jms.JMSException; 11 import javax.jms.Message; 12 import javax.jms.MessageConsumer; 13 import javax.jms.MessageProducer; 14 import javax.jms.ObjectMessage; 15 import javax.jms.Session; 16 17 import org.apache.activemq.ActiveMQConnection; 18 import org.apache.activemq.ActiveMQConnectionFactory; 19 20 import com.xxx.rpc.mq.handler.MessageHandler; 21 import com.xxx.util.PropUtil; 22 23 /** 24 * activemq p2p 工具類 25 */ 26 public class ActiveMQP2PUtil { 27 private static final String RPC_CONFIG_FILE = "rpc_config.properties"; 28 private static String queueURL; //隊列所在的URL 29 private static String queueName; //隊列名稱 30 private static ConnectionFactory connectionFactory; //連接工廠 31 32 static{ 33 Properties props = PropUtil.loadProps(RPC_CONFIG_FILE); 34 queueURL = props.getProperty("activemq.queueURL", "tcp://127.0.0.1:61616"); 35 System.out.println(queueURL); 36 queueName = props.getProperty("activemq.queueName", "adminQueue"); 37 connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, 38 ActiveMQConnection.DEFAULT_PASSWORD, 39 queueURL); 40 } 41 42 /** 43 * 發送消息 44 */ 45 public static void sendMessage(Serializable message){ 46 Connection conn = null; 47 try { 48 conn = connectionFactory.createConnection();//創建連接 49 conn.start();//啟動連接 50 Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);//創建session 51 Destination destination = session.createQueue(queueName);//創建隊列 52 MessageProducer producer = session.createProducer(destination); 53 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);//消息設置為非持久化 54 ObjectMessage msg = session.createObjectMessage(message);//創建消息:createObjectMessage()該方法的入參是Serializable型的 55 producer.send(msg);//發送消息 56 session.commit();//提交消息 57 } catch (JMSException e) { 58 e.printStackTrace(); 59 }finally{ 60 if(conn!=null){ 61 try { 62 conn.close(); 63 } catch (JMSException e) { 64 e.printStackTrace(); 65 } 66 } 67 } 68 } 69 70 /** 71 * 接收消息 72 * @param handler 自定義的消息處理器 73 */ 74 public static void receiveMessage(MessageHandler handler){ 75 Connection conn = null; 76 try { 77 conn = connectionFactory.createConnection();//創建連接 78 conn.start();//啟動連接 79 Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);//創建session 80 Destination destination = session.createQueue(queueName);//創建隊列 81 MessageConsumer consumer = session.createConsumer(destination);//創建消息消費者 82 while(true){//死循環接收消息 83 Message msg = consumer.receive();//接收消息 84 if(msg!=null){ 85 handler.handle(msg);//處理消息 86 //System.out.println(msg); 87 } 88 } 89 } catch (JMSException e) { 90 e.printStackTrace(); 91 }finally{ 92 if(conn!=null){ 93 try { 94 conn.close(); 95 } catch (JMSException e) { 96 e.printStackTrace(); 97 } 98 } 99 } 100 } 101 102 /*public static void main(String[] args) { 103 sendMessage("hello world3"); 104 }*/ 105 } View Code說明:
MessageHandler:消息處理器接口(其實現類是對接收到的消息進行處理的真正部分)
1 package com.xxx.rpc.mq.handler; 2 3 import javax.jms.Message; 4 5 /** 6 * 消息處理器接口 7 */ 8 public interface MessageHandler { 9 public void handle(Message message); 10 } View Code
4.3.4、ssmm0-data
pom.xml
1 <?xml version="1.0" encoding="UTF-8"?> 2 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 3 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> 4 5 <modelVersion>4.0.0</modelVersion> 6 7 <!-- 指定父模塊 --> 8 <parent> 9 <groupId>com.xxx</groupId> 10 <artifactId>ssmm0</artifactId> 11 <version>1.0-SNAPSHOT</version> 12 </parent> 13 14 <groupId>com.xxx.ssmm0</groupId> 15 <artifactId>ssmm0-data</artifactId> 16 17 <name>ssmm0-data</name> 18 <packaging>jar</packaging><!-- 只是作為其他模塊使用的工具 --> 19 20 <!-- 引入實際依賴 --> 21 <dependencies> 22 <!-- mysql --> 23 <dependency> 24 <groupId>mysql</groupId> 25 <artifactId>mysql-connector-java</artifactId> 26 </dependency> 27 <!-- 數據源 --> 28 <dependency> 29 <groupId>org.apache.tomcat</groupId> 30 <artifactId>tomcat-jdbc</artifactId> 31 </dependency> 32 <!-- mybatis --> 33 <dependency> 34 <groupId>org.mybatis</groupId> 35 <artifactId>mybatis</artifactId> 36 </dependency> 37 <dependency> 38 <groupId>org.mybatis</groupId> 39 <artifactId>mybatis-spring</artifactId> 40 </dependency> 41 <!-- servlet --><!-- 為了會用cookie --> 42 <dependency> 43 <groupId>javax.servlet</groupId> 44 <artifactId>javax.servlet-api</artifactId> 45 </dependency> 46 <!-- guava cache --> 47 <dependency> 48 <groupId>com.google.guava</groupId> 49 <artifactId>guava</artifactId> 50 <version>14.0.1</version> 51 </dependency> 52 <!-- 引入自定義cache模塊 --> 53 <dependency> 54 <groupId>com.xxx.ssmm0</groupId> 55 <artifactId>ssmm0-cache</artifactId> 56 <version>1.0-SNAPSHOT</version> 57 </dependency> 58 <!-- 引入自定義rpc模塊 --> 59 <dependency> 60 <groupId>com.xxx.ssmm0</groupId> 61 <artifactId>ssmm0-rpc</artifactId> 62 <version>1.0-SNAPSHOT</version> 63 </dependency> 64 </dependencies> 65 </project> View CodeLog:日志模型類
package com.xxx.model.log; import java.io.Serializable; /** * 日志 */ public class Log implements Serializable { private static final long serialVersionUID = -8280602625152351898L; private String operation; // 執行的操作 private String currentTime; // 當前時間 public String getOperation() { return operation; } public void setOperation(String operation) { this.operation = operation; } public String getCurrentTime() { return currentTime; } public void setCurrentTime(String currentTime) { this.currentTime = currentTime; } } View Code注意:
說明:對應的數據庫表
LogMapper
1 package com.xxx.mapper.log; 2 3 import org.apache.ibatis.annotations.Insert; 4 5 import com.xxx.model.log.Log; 6 7 /** 8 * 日志Mapper 9 */ 10 public interface LogMapper { 11 12 /** 13 * 這裡需要注意的是,current_time是數據庫的保留參數,兩點注意: 14 * 1、最好不要用保留參數做變量名 15 * 2、如果不經意間已經用了,那麼保留參數需要用``括起來(`-->該符號是英文狀態下esc鍵下邊的那個鍵) 16 * @param log 17 * @return 18 */ 19 @Insert("INSERT INTO log(operation, `current_time`) VALUES(#{operation},#{currentTime})") 20 public int insertLog(Log log); 21 22 } View Code注意:由於疏忽,在創建數據庫的時候,屬性"當前時間"取名為"current_time",沒注意到該詞是MySQL的關鍵字(即保留字)。
LogDao:
1 package com.xxx.dao.log; 2 3 import org.springframework.beans.factory.annotation.Autowired; 4 import org.springframework.stereotype.Repository; 5 6 import com.xxx.mapper.log.LogMapper; 7 import com.xxx.model.log.Log; 8 9 /** 10 * 日志DAO 11 */ 12 @Repository 13 public class LogDao { 14 15 @Autowired 16 private LogMapper logMapper; 17 /***************注解*****************/ 18 public boolean insertLog(Log log){ 19 return logMapper.insertLog(log)==1?true:false; 20 } 21 22 } View CodeLogMessageHandler:MessageHandler的實現類,對接收到的log消息進行具體的操作
1 package com.xxx.service.log; 2 3 import javax.jms.JMSException; 4 import javax.jms.Message; 5 import javax.jms.ObjectMessage; 6 7 import org.springframework.beans.factory.annotation.Autowired; 8 import org.springframework.stereotype.Component; 9 10 import com.xxx.dao.log.LogDao; 11 import com.xxx.model.log.Log; 12 import com.xxx.rpc.mq.handler.MessageHandler; 13 14 15 /** 16 * 日志處理器(更適合放在data層) 17 * 因為: 18 * 1、data依賴於rpc,而rpc不依賴於data,所以如果該類放在rpc層,並且該類需要用到數據庫操作(eg.將日志寫入數據庫),那麼就不好辦了 19 * 2、rpc層說白了,就是一些rpc工具類,實際上與業務無關,與業務有關的,我們可以抽取到該部分來 20 */ 21 @Component 22 public class LogMessageHandler implements MessageHandler { 23 24 @Autowired 25 private LogDao logDao; 26 27 public void handle(Message message) { 28 System.out.println(logDao); 29 ObjectMessage objMsg = (ObjectMessage)message; 30 try { 31 Log log = (Log)objMsg.getObject(); 32 logDao.insertLog(log);//將日志寫入數據庫 33 } catch (JMSException e) { 34 e.printStackTrace(); 35 } 36 37 } 38 39 } View Code說明:
AdminService:
1 /** 2 * 測試activeMQ 3 * 4 * 消息生產者做的事:(部署在服務器A) 5 * 1)添加一個用戶 6 * 2)用戶添加成功後, 7 * 2.1)創建一個Log(日志類)實例 8 * 2.2)將該日志實例作為消息發送給消息隊列 9 * 10 * 消息消費者做的事:(部署在服務器B) 11 * 1)從隊列接收消息 12 * 2)用日志處理器對消息進行操作(將該消息寫入數據庫) 13 */ 14 public boolean register(Admin admin) { 15 boolean isRegisterSuccess = adminDao.register(admin); 16 if(isRegisterSuccess) { 17 Log log = new Log(); 18 log.setOperation("增加一個用戶"); 19 log.setCurrentTime(DateUtil.getCurrentTime()); 20 21 ActiveMQP2PUtil.sendMessage(log);//將消息發送到消息服務器(即activeMQ服務器),不需要等待消息處理結果,直接向下執行 22 } 23 return isRegisterSuccess; 24 } View Code說明:
4.3.5、ssmm0-rpcWeb
pom.xml
1 <?xml version="1.0" encoding="UTF-8"?> 2 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 3 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> 4 5 <modelVersion>4.0.0</modelVersion> 6 7 <!-- 指定父模塊 --> 8 <parent> 9 <groupId>com.xxx</groupId> 10 <artifactId>ssmm0</artifactId> 11 <version>1.0-SNAPSHOT</version> 12 </parent> 13 14 <groupId>com.xxx.ssmm0</groupId> 15 <artifactId>ssmm0-rpcWeb</artifactId> 16 17 <name>ssmm0-rpcWeb</name> 18 <packaging>war</packaging><!-- 需要部署的模塊 --> 19 20 <!-- 引入實際依賴 --> 21 <dependencies> 22 <!-- 將ssmm0-data項目作為一個jar引入項目中 --> 23 <dependency> 24 <groupId>com.xxx.ssmm0</groupId> 25 <artifactId>ssmm0-data</artifactId> 26 <version>1.0-SNAPSHOT</version> 27 </dependency> 28 <!-- spring mvc(如果沒有web.xml中的CharacterEncodingFilter找不到) --> 29 <dependency> 30 <groupId>org.springframework</groupId> 31 <artifactId>spring-web</artifactId> 32 </dependency> 33 <dependency> 34 <groupId>org.springframework</groupId> 35 <artifactId>spring-webmvc</artifactId> 36 </dependency> 37 </dependencies> 38 </project> View Codespring.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:mvc="http://www.springframework.org/schema/mvc" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.2.xsd http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-3.2.xsd"> <!-- 注解掃描 --> <context:component-scan base-package="com.xxx.web" /><!-- 只掃描web就可以 --> <!-- 這裡需要引入ssmm0-data項目中配置的spring-data.xml(之前不引也可以成功,忘記怎麼配置的了) --> <import resource="classpath:spring-data.xml"/> </beans> View Codeweb.xml
<?xml version="1.0" encoding="utf-8"?> <web-app version="2.5" xmlns="http://java.sun.com/xml/ns/javaee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"> <servlet> <servlet-name>dispatcherServlet</servlet-name> <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class> <init-param> <param-name>contextConfigLocation</param-name> <param-value>classpath:spring.xml</param-value> </init-param> <load-on-startup>1</load-on-startup> </servlet> <servlet-mapping> <servlet-name>dispatcherServlet</servlet-name> <url-pattern>/</url-pattern> </servlet-mapping> <filter> <filter-name>encodingFilter</filter-name> <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class> <init-param> <param-name>encoding</param-name> <param-value>UTF-8</param-value> </init-param> <init-param> <param-name>forceEncoding</param-name> <param-value>true</param-value> </init-param> </filter> <filter-mapping> <filter-name>encodingFilter</filter-name> <url-pattern>/*</url-pattern> </filter-mapping> <welcome-file-list> <welcome-file>/index.jsp</welcome-file> </welcome-file-list> </web-app> View CodeMessageReceiver:死循環從隊列接收消息並將消息傳給消息處理器實現類(LogMessageHandler)處理
1 package com.xxx.web.mq; 2 3 import org.springframework.beans.factory.annotation.Autowired; 4 import org.springframework.stereotype.Controller; 5 import org.springframework.web.bind.annotation.RequestMapping; 6 7 import com.xxx.rpc.mq.util.ActiveMQP2PUtil; 8 import com.xxx.service.log.LogMessageHandler; 9 10 /** 11 * 用於接收消息的測試類 12 */ 13 @Controller 14 @RequestMapping("/mq") 15 public class MessageReceiver { 16 17 @Autowired 18 private LogMessageHandler handler; 19 20 @RequestMapping("/receive") 21 public void receiveMessage() { 22 ActiveMQP2PUtil.receiveMessage(handler); 23 } 24 25 } View Code
5、測試
5.1、安裝activemq
1)下載解壓"apache-activemq-5.5.0-bin.zip",之後,若是32bit機器,進入"E:\activemq-5.5.0\bin\win32"下,雙擊"activemq.bat"即可。(當然,如果雙擊無法啟動,可能有其他進程占用61616端口,查一下是哪一個進程,然後去服務中關掉即可)
2)啟動服務後,在浏覽器輸入"http://127.0.0.1:8161/admin/queues.jsp",看到隊列頁面,則安裝並啟動成功,該頁面是一個隊列消息的監控頁面,包括
5.2、運行ssmm0-userManagement
浏覽器執行"http://localhost:8080/admin/register?username=canglang25&password=1457890"
注意:這裡使用了8080端口
5.3、運行ssmm0-rpcWeb
浏覽器執行"http://localhost:8081/mq/receive"
注意:
說明:jetty在不同的端口下可以同時啟動,在同一端口下後邊啟動的服務會覆蓋之前啟動的服務
6、總結