一. 開篇語
繼上一篇weblogic中使用jms發送和接受消息的文章後, 本文使用apache的一個開源組件ActiveMQ接著探討JMS的話題, 本篇只是ActiveMQ的一個入門的例子, 希望對您有所幫助.
二. ActiveMQ
1. ActiveMQ簡介:
ActiveMQ是Apache的一個能力強勁的開源消息總線, 它完全支持JMS1.1和JavaEE1.4規范的JMS Provider實現.
2. ActiveMQ特性:
1.) 多種語言和協議編寫客戶端。語言: Java, C, C++, C#, Ruby, Perl, Python, PHP。應用協議: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
2.) 完全支持JMS1.1和J2EE 1.4規范 (持久化,XA消息,事務)
3.) 對Spring的支持,ActiveMQ可以很容易內嵌到使用Spring的系統裡面去,而且也支持Spring2.0的特性
4.) 通過了常見J2EE服務器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的測試,其中通過JCA 1.5 resourceadaptors的配置,
可以讓ActiveMQ可以自動的部署到任何兼容J2EE1.4商業服務器上
5.) 支持多種傳送協議:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
6.) 支持通過JDBC和journal提供高速的消息持久化
7.) 從設計上保證了高性能的集群,客戶端-服務器,點對點
8.) 支持Ajax
9.) 支持與Axis的整合
10.) 可以很容易得調用內嵌JMS provider,進行測試
3. 環境准備:
1.) 下載ActiveMQ:
http://activemq.apache.org/download.html, 我下載的是apache-activemq-5.2.0
2.) 運行ActiveMQ server
解壓縮下載好的文件, 雙擊bin/activemq.bat 啟動server, ActiveMQ內置了jetty服務器, 默認使用TCP連接端口為61616.
ActiveMQ提供一個用於監控ActiveMQ的admin應用: http://127.0.0.1:8161/admin
3.) 在Eclipse中建立Java工程, 並導入activemq-all-5.2.0.jar包
4.) 新建兩個Java類: 消息生產者MsgSender和消息消費者MsgReceiver
4. 代碼測試(P2P):
1.) 消息生產者: MsgSender
/** * Message Provider */ public class MsgSender { // ConnectionFactory: use to create JMS connection private static ConnectionFactory connectionFactory; // Connection: connect message provider and JMS server private static Connection connection; // Session: a message send or receive thread private static Session session; // Destination: use to sign the message type private static Destination destination; // MessageProducer:sender private static MessageProducer messageProducer; /** * init the JMS object */ public static void init() throws Exception { // use ActiveMQ to to create connection factory. connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616"); // get the connection from connection factory connection = connectionFactory.createConnection(); session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); destination = session.createQueue("myQueue"); messageProducer = session.createProducer(destination); messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); connection.start(); } /** * send activeMq message */ public static void sendMessage() throws Exception { for (int i = 1; i <= 5; i++) { TextMessage message = session.createTextMessage("ActiveMq message " + i); System.out.println("send:" + "ActiveMq message " + i); messageProducer.send(message); } session.commit(); } /** * release resource */ public static void release() throws Exception { messageProducer.close(); session.close(); connection.close(); } /** * main method */ public static void main(String[] args) throws Exception { init(); sendMessage(); release(); } }
2.) 消息消費者: MsgReceiver
/** * Message Consumer */ public class MsgReceiver { // ConnectionFactory: use to create JMS connection private static ConnectionFactory connectionFactory; // Connection: connect message provider and JMS server private static Connection connection; // Session: a message send or receive thread private static Session session; // use to sign the message type private static Destination destination; // MessageConsumer: receiver private static MessageConsumer messageConsumer; /** * init the JMS object */ public static void init() throws Exception { // use ActiveMQ to to create connection factory. connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616"); // get the connection from connection factory connection = connectionFactory.createConnection(); session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); destination = session.createQueue("myQueue"); messageConsumer = session.createConsumer(destination); connection.start(); } /** * receive activeMq message */ public static void receiveMessage() throws Exception { while (true) { TextMessage message = (TextMessage) messageConsumer.receive(); if (message != null) { System.out.println("receive: " + message.getText()); } else { break; } } } /** * release resource */ public static void release() throws Exception { messageConsumer.close(); session.close(); connection.close(); } /** * main method */ public static void main(String[] args) throws Exception { init(); receiveMessage(); release(); } }