程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> JAVA編程 >> JAVA綜合教程 >> ActiveMQ簡述

ActiveMQ簡述

編輯:JAVA綜合教程

ActiveMQ簡述


概述

ActiveMQ是Apache所提供的一個開源的消息系統,完全采用Java來實現,因此,它能很好地支持J2EE提出的JMS(Java Message Service,即Java消息服務)規范。JMS是一組Java應用程序接口,它提供消息的創建、發送、讀取等一系列服務。JMS提供了一組公共應用程序接口和響應的語法,類似於Java數據庫的統一訪問接口JDBC,它是一種與廠商無關的API,使得Java程序能夠與不同廠商的消息組件很好地進行通信。

JMS支持兩種消息發送和接收模型。一種稱為P2P(Ponit to Point)模型,即采用點對點的方式發送消息。P2P模型是基於隊列的,消息生產者發送消息到隊列,消息消費者從隊列中接收消息,隊列的存在使得消息的異步傳輸稱為可能,P2P模型在點對點的情況下進行消息傳遞時采用。

這裡寫圖片描述

另一種稱為Pub/Sub(Publish/Subscribe,即發布-訂閱)模型,發布-訂閱模型定義了如何向一個內容節點發布和訂閱消息,這個內容節點稱為topic(主題)。主題可以認為是消息傳遞的中介,消息發布這將消息發布到某個主題,而消息訂閱者則從主題訂閱消息。主題使得消息的訂閱者與消息的發布者互相保持獨立,不需要進行接觸即可保證消息的傳遞,發布-訂閱模型在消息的一對多廣播時采用。

這裡寫圖片描述


ActiveMQ的安裝

下載最新的安裝包apache-activemq-5.13.2-bin.tar.gz(此包linux下的,案例也是針對linux系統進行闡述,當然ActiveMQ也有win版的,這裡就不贅述了),可以去官網下載,也可以在下方留言區留下你的郵箱,博主會發給你的~

下載之後解壓: tar -zvxf apache-activemq-5.13.2-bin.tar.gz

ActiveMQ目錄內容有:

bin目錄包含ActiveMQ的啟動腳本 conf目錄包含ActiveMQ的所有配置文件 data目錄包含日志文件和持久性消息數據 example: ActiveMQ的示例 lib: ActiveMQ運行所需要的lib webapps: ActiveMQ的web控制台和一些相關的demo

運行命令:activemq start(在activemq/bin下運行)

INFO: Loading '/users/shr/apache-activemq-5.13.2//bin/env'
INFO: Using java '/users/shr/util/JavaDir/jdk/bin/java'
INFO: Starting - inspect logfiles specified in logging.properties and log4j.properties to get details
INFO: pidfile created : '/users/shr/apache-activemq-5.13.2//data/activemq.pid' (pid '986')

查看activemq是否運行命令:ps -aux | grep activemq

shr        986  1.2  9.7 1281720 201936 pts/5  Sl   19:43   0:17 /users/shr/util/JavaDir/jdk/bin/java -Xms64M -Xmx1G -Djava.util.logging.config.file=logging.properties -Djava.security.auth.login.config=/users/shr/apache-activemq-5.13.2//conf/login.config -Dcom.sun.management.jmxremote -Djava.awt.headless=true -Djava.io.tmpdir=/users/shr/apache-activemq-5.13.2//tmp -Dactivemq.classpath=/users/shr/apache-activemq-5.13.2//conf:/users/shr/apache-activemq-5.13.2//../lib/: -Dactivemq.home=/users/shr/apache-activemq-5.13.2/ -Dactivemq.base=/users/shr/apache-activemq-5.13.2/ -Dactivemq.conf=/users/shr/apache-activemq-5.13.2//conf -Dactivemq.data=/users/shr/apache-activemq-5.13.2//data -jar /users/shr/apache-activemq-5.13.2//bin/activemq.jar start
shr       1501  0.0  0.0   5176   724 pts/5    S+   20:06   0:00 grep activemq

關閉命令: activemq stop

INFO: Loading '/users/shr/apache-activemq-5.13.2//bin/env'
INFO: Using java '/users/shr/util/JavaDir/jdk/bin/java'
INFO: Waiting at least 30 seconds for regular process termination of pid '986' :
Java Runtime: Oracle Corporation 1.7.0_79 /users/shr/util/JavaDir/jdk1.7.0_79/jre
  Heap sizes: current=63232k  free=62218k  max=932096k
    JVM args: -Xms64M -Xmx1G -Djava.util.logging.config.file=logging.properties -Djava.security.auth.login.config=/users/shr/apache-activemq-5.13.2//conf/login.config -Dactivemq.classpath=/users/shr/apache-activemq-5.13.2//conf:/users/shr/apache-activemq-5.13.2//../lib/: -Dactivemq.home=/users/shr/apache-activemq-5.13.2/ -Dactivemq.base=/users/shr/apache-activemq-5.13.2/ -Dactivemq.conf=/users/shr/apache-activemq-5.13.2//conf -Dactivemq.data=/users/shr/apache-activemq-5.13.2//data
Extensions classpath:
  [/users/shr/apache-activemq-5.13.2/lib,/users/shr/apache-activemq-5.13.2/lib/camel,/users/shr/apache-activemq-5.13.2/lib/optional,/users/shr/apache-activemq-5.13.2/lib/web,/users/shr/apache-activemq-5.13.2/lib/extra]
ACTIVEMQ_HOME: /users/shr/apache-activemq-5.13.2
ACTIVEMQ_BASE: /users/shr/apache-activemq-5.13.2
ACTIVEMQ_CONF: /users/shr/apache-activemq-5.13.2/conf
ACTIVEMQ_DATA: /users/shr/apache-activemq-5.13.2/data
Connecting to pid: 986
..Stopping broker: localhost
.. TERMINATED

ActiveMQ的默認服務端口為61616,這個可以在conf/activemq.xml配置文件中修改:


    
    

案例

在下載的apache-activemq-5.13.2-bin.tar.gz包中解壓有一個jar包:activemq-all-5.13.2.jar,引入這個jar到你的項目中即可開始編寫案例代碼。

博主的activemq服務器地址為10.10.195.187,這個在下面代碼中會有體現。

按照JMS的規范,我們首先需要獲得一個JMS connection factory.,通過這個connection factory來創建connection.在這個基礎之上我們再創建session, destination, producer和consumer。因此主要的幾個步驟如下:

獲得JMS connection factory. 通過我們提供特定環境的連接信息來構造factory。利用factory構造JMS connection 啟動connection 通過connection創建JMS session. 指定JMS destination. 創建JMS producer或者創建JMS message並提供destination. 創建JMS consumer或注冊JMS message listener. 發送和接收JMS message. 關閉所有JMS資源,包括connection, session, producer, consumer等。

下面來看代碼舉例(P2P式)。
通過Java實現的基於ActiveMQ的請求提交:

package com.zzh.activemq;

import java.io.Serializable;
import java.util.HashMap;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class RequestSubmit
{
    //消息發送者
    private MessageProducer producer;
    //一個發送或者接受消息的線程
    private Session session;

    public void init() throws Exception
    {
        //ConnectionFactory連接工廠,JMS用它創建連接
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,
                "tcp://10.10.195.187:61616");
        //Connection:JMS客戶端到JMS Provider的連接,從構造工廠中得到連接對象
        Connection connection = connectionFactory.createConnection();
        //啟動
        connection.start();
        //獲取連接操作
        session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
        Destination destinatin = session.createQueue("RequestQueue");
        //得到消息生成(發送)者
        producer = session.createProducer(destinatin);
        //設置不持久化
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    }

    public void submit(HashMap requestParam) throws Exception
    {
        ObjectMessage message = session.createObjectMessage(requestParam);
        producer.send(message);
        session.commit();
    }

    public static void main(String[] args) throws Exception{
        RequestSubmit submit = new RequestSubmit();
        submit.init();
        HashMap requestParam = new HashMap();
        requestParam.put("朱小厮", "zzh");
        submit.submit(requestParam);
    }
}

創建Session時有兩個非常重要的參數,第一個boolean類型的參數用來表示是否采用事務消息。如果是事務消息,對於的參數設置為true,此時消息的提交自動有comit處理,消息的回滾則自動由rollback處理。加入消息不是事務的,則對應的該參數設置為false,此時分為三種情況:

Session.AUTO_ACKNOWLEDGE表示Session會自動確認所接收到的消息。 Session.CLIENT_ACKNOWLEDGE表示由客戶端程序通過調用消息的確認方法來確認所接收到的消息。 Session.DUPS_OK_ACKNOWLEDGE使得Session將“懶惰”地確認消息,即不會立即確認消息,這樣有可能導致消息重復投遞。

提供Java實現的基於ActiveMQ的請求處理:

package com.zzh.activemq;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.ObjectMessage;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class RequestProcessor
{
    public void requestHandler(HashMap requestParam) throws Exception
    {
        System.out.println("requestHandler....."+requestParam.toString());
        for(Map.Entry entry : requestParam.entrySet())
        {
            System.out.println(entry.getKey()+":"+entry.getValue());
        }
    }

    public static void main(String[] args) throws Exception
    {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,
                "tcp://10.10.195.187:61616");
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createQueue("RequestQueue");
        //消息消費(接收)者
        MessageConsumer consumer = session.createConsumer(destination);

        RequestProcessor processor = new RequestProcessor();

        while(true)
        {
            ObjectMessage message = (ObjectMessage) consumer.receive(1000);
            if(null != message)
            {
                System.out.println(message);
                HashMap requestParam = (HashMap) message.getObject();
                processor.requestHandler(requestParam);
            }
            else
            {
                break;
            }
        }
    }
}

輸出結果:

ActiveMQObjectMessage {commandId = 6, responseRequired = false, messageId = ID:hidden-PC-58748-1460550507055-1:1:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:hidden-PC-58748-1460550507055-1:1:1:1, destination = queue://RequestQueue, transactionId = TX:ID:hidden-PC-58748-1460550507055-1:1:1, expiration = 0, timestamp = 1460550507333, arrival = 0, brokerInTime = 1460550505969, brokerOutTime = 1460550509143, correlationId = null, replyTo = null, persistent = false, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@74a456bb, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false}
requestHandler.....{朱小厮=zzh}
朱小厮:zzh

可以通過頁面查看隊列的使用情況,在浏覽器中輸入http://10.10.195.187:8161/admin/queues.jsp,用戶名和密碼都是:admin,看到以下頁面:
這裡寫圖片描述
這個是在jetty服務器下跑的,可以修改conf/jetty.xml來修改相關jetty配置。

上面的例子是關於P2P模式的,不過有個不妥之處,就是沒有資源的釋放。下面舉一個Pub/Sub模式的。
通過JMS創建ActiveMQ的tZ喎?http://www.Bkjia.com/kf/ware/vc/" target="_blank" class="keylink">vcGljo6yyorj4dG9waWO3osvNz/vPoqO6PC9wPg0KPHByZSBjbGFzcz0="brush:java;"> import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.ObjectMessage; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.camel.Produce; public class TopicRequest { //消息發送者 private MessageProducer producer; //一個發送或者接受消息的線程 private Session session; //Connection:JMS客戶端到JMS Provider的連接 private Connection connection; public void init() throws Exception { //ConnectionFactory連接工廠,JMS用它創建連接 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://10.10.195.187:61616"); //從構造工廠中得到連接對象 connection = connectionFactory.createConnection(); //啟動 connection.start(); //獲取連接操作 session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic("MessageTopic"); producer = session.createProducer(topic); //設置不持久化 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); } public void submit(String mess) throws Exception { TextMessage message = session.createTextMessage(); message.setText(mess); producer.send(message); } public void close() { try { if(session != null) session.close(); if(producer != null) producer.close(); if(connection !=null ) connection.close(); } catch (JMSException e) { e.printStackTrace(); } } public static void main(String[] args) throws Exception { TopicRequest topicRequest = new TopicRequest(); topicRequest.init(); topicRequest.submit("I'm first"); topicRequest.close(); } }

消息發送到對應的topic後,需要將listener注冊到需要訂閱的topic上,以便能夠接收該topic的消息:

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class TopicReceive
{
    private MessageConsumer consumer;
    private Session session;

    public void init() throws Exception
    {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,
                "tcp://10.10.195.187:61616");
        Connection connection = connectionFactory.createConnection();
        connection.start();
        session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic("MessageTopic");
        consumer = session.createConsumer(topic);

        consumer.setMessageListener(new MessageListener(){
            @Override
            public void onMessage(Message message)
            {
                TextMessage tm = (TextMessage) message;
                System.out.println(tm);
                try
                {
                    System.out.println(tm.getText());
                }
                catch (JMSException e)
                {
                    e.printStackTrace();
                }
            }
        });
    }

    public static void main(String[] args) throws Exception
    {
        TopicReceive receive = new TopicReceive();
        receive.init();
    }
}

輸出結果:

ActiveMQTextMessage {commandId = 5, responseRequired = false, messageId = ID:hidden-PC-50073-1460597487065-1:1:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:hidden-PC-50073-1460597487065-1:1:1:1, destination = topic://MessageTopic, transactionId = null, expiration = 0, timestamp = 1460597487308, arrival = 0, brokerInTime = 1460597487297, brokerOutTime = 1460597487298, correlationId = null, replyTo = null, persistent = false, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@2e4d3abf, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = I'm first}
I'm first

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved