程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> JAVA編程 >> 關於JAVA >> Java音訊隊列--ActiveMq 實戰

Java音訊隊列--ActiveMq 實戰

編輯:關於JAVA

Java音訊隊列--ActiveMq 實戰。本站提示廣大學習愛好者:(Java音訊隊列--ActiveMq 實戰)文章只能為提供參考,不一定能成為您想要的結果。以下是Java音訊隊列--ActiveMq 實戰正文


1、下載裝置ActiveMQ

 

  ActiveMQ官網下載地址:http://activemq.apache.org/download.html

  ActiveMQ 提供了Windows 和Linux、Unix 等幾個版本,樓主這裡選擇了Linux 版本下停止開發。

 

 

  下載完裝置包,解壓之後的目錄:

 

   從它的目錄來說,還是很復雜的: 

    • bin寄存的是腳本文件
    • conf寄存的是根本配置文件
    • data寄存的是日志文件
    • docs寄存的是闡明文檔
    • examples寄存的是復雜的實例
    • lib寄存的是activemq所需jar包
    • webapps用於寄存項目的目錄

 

2、啟動ActiveMQ 

   進入到ActiveMQ 裝置目錄的Bin 目錄,linux 下輸出 ./activemq start 啟動activeMQ 服務。

   輸出命令之後,會提示我們創立了一個進程IP 號,這時分闡明服務曾經成功啟動了。

  

  ActiveMQ默許啟動時,啟動了內置的jetty服務器,提供一個用於監控ActiveMQ的admin使用。 
  admin:http://127.0.0.1:8161/admin/

 

  我們在閱讀器翻開鏈接之後輸出賬號密碼(這裡和tomcat 服務器相似)

  默許賬號:admin

  密碼:admin

  

   到這裡為止,ActiveMQ 服務端就啟動終了了。

   ActiveMQ 在linux 下的終止命令是 ./activemq stop

 

3、創立一個ActiveMQ工程

 

   項目目錄構造:

  

  上述在官網下載ActiveMq 的時分,我們可以在目錄下看到一個jar包:

  

  這個jar 包就是我們需求在項目中停止開發中運用到的相關依賴。

 

  3.1 創立消費者
public class Producter {

    //ActiveMq 的默許用戶名
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    //ActiveMq 的默許登錄密碼
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    //ActiveMQ 的鏈接地址
    private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL;

    AtomicInteger count = new AtomicInteger(0);
    //鏈接工廠
    ConnectionFactory connectionFactory;
    //鏈接對象
    Connection connection;
    //事務管理
    Session session;
    ThreadLocal<MessageProducer> threadLocal = new ThreadLocal<>();

    public void init(){
        try {
            //創立一個鏈接工廠
            connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL);
            //從工廠中創立一個鏈接
            connection  = connectionFactory.createConnection();
            //開啟鏈接
            connection.start();
            //創立一個事務(這裡經過參數可以設置事務的級別)
            session = connection.createSession(true,Session.SESSION_TRANSACTED);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    public void sendMessage(String disname){
        try {
            //創立一個音訊隊列
            Queue queue = session.createQueue(disname);
            //音訊消費者
            MessageProducer messageProducer = null;
            if(threadLocal.get()!=null){
                messageProducer = threadLocal.get();
            }else{
                messageProducer = session.createProducer(queue);
                threadLocal.set(messageProducer);
            }
           while(true){
                Thread.sleep(1000);
                int num = count.getAndIncrement();
                //創立一條音訊
                TextMessage msg = session.createTextMessage(Thread.currentThread().getName()+
                        "productor:我是大帥哥,我如今正在消費東西!,count:"+num);
                System.out.println(Thread.currentThread().getName()+
                        "productor:我是大帥哥,我如今正在消費東西!,count:"+num);
                //發送音訊
                messageProducer.send(msg);
                //提交事務
                session.commit();
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

     

  3.2 創立消費者
public class Comsumer {

    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;

    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;

    private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL;

    ConnectionFactory connectionFactory;

    Connection connection;

    Session session;

    ThreadLocal<MessageConsumer> threadLocal = new ThreadLocal<>();
    AtomicInteger count = new AtomicInteger();

    public void init(){
        try {
            connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL);
            connection  = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }


    public void getMessage(String disname){
        try {
            Queue queue = session.createQueue(disname);
            MessageConsumer consumer = null;

            if(threadLocal.get()!=null){
                consumer = threadLocal.get();
            }else{
                consumer = session.createConsumer(queue);
                threadLocal.set(consumer);
            }
            while(true){
                Thread.sleep(1000);
                TextMessage msg = (TextMessage) consumer.receive();
                if(msg!=null) {
                    msg.acknowledge();
                    System.out.println(Thread.currentThread().getName()+": Consumer:我是消費者,我正在消費Msg"+msg.getText()+"--->"+count.getAndIncrement());
                }else {
                    break;
                }
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

 

 

4、運轉ActiveMQ項目

 

  4.1 消費者開端消費音訊
public class TestMq {
    public static void main(String[] args){
        Producter producter = new Producter();
        producter.init();
        TestMq testMq = new TestMq();
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //Thread 1
        new Thread(testMq.new ProductorMq(producter)).start();
        //Thread 2
        new Thread(testMq.new ProductorMq(producter)).start();
        //Thread 3
        new Thread(testMq.new ProductorMq(producter)).start();
        //Thread 4
        new Thread(testMq.new ProductorMq(producter)).start();
        //Thread 5
        new Thread(testMq.new ProductorMq(producter)).start();
    }

    private class ProductorMq implements Runnable{
        Producter producter;
        public ProductorMq(Producter producter){
            this.producter = producter;
        }

        @Override
        public void run() {
            while(true){
                try {
                    producter.sendMessage("Jaycekon-MQ");
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

   運轉後果:

 INFO | Successfully connected to tcp://localhost:61616
Thread-6productor:我是大帥哥,我如今正在消費東西!,count:0
Thread-4productor:我是大帥哥,我如今正在消費東西!,count:1
Thread-2productor:我是大帥哥,我如今正在消費東西!,count:3
Thread-5productor:我是大帥哥,我如今正在消費東西!,count:2
Thread-3productor:我是大帥哥,我如今正在消費東西!,count:4
Thread-6productor:我是大帥哥,我如今正在消費東西!,count:5
Thread-3productor:我是大帥哥,我如今正在消費東西!,count:6
Thread-5productor:我是大帥哥,我如今正在消費東西!,count:7
Thread-2productor:我是大帥哥,我如今正在消費東西!,count:8
Thread-4productor:我是大帥哥,我如今正在消費東西!,count:9
Thread-6productor:我是大帥哥,我如今正在消費東西!,count:10
Thread-3productor:我是大帥哥,我如今正在消費東西!,count:11
Thread-5productor:我是大帥哥,我如今正在消費東西!,count:12
Thread-2productor:我是大帥哥,我如今正在消費東西!,count:13
Thread-4productor:我是大帥哥,我如今正在消費東西!,count:14
Thread-6productor:我是大帥哥,我如今正在消費東西!,count:15
Thread-3productor:我是大帥哥,我如今正在消費東西!,count:16
Thread-5productor:我是大帥哥,我如今正在消費東西!,count:17
Thread-2productor:我是大帥哥,我如今正在消費東西!,count:18
Thread-4productor:我是大帥哥,我如今正在消費東西!,count:19

  

 

  4.2 消費者開端消費音訊
public class TestConsumer {
    public static void main(String[] args){
        Comsumer comsumer = new Comsumer();
        comsumer.init();
        TestConsumer testConsumer = new TestConsumer();
        new Thread(testConsumer.new ConsumerMq(comsumer)).start();
        new Thread(testConsumer.new ConsumerMq(comsumer)).start();
        new Thread(testConsumer.new ConsumerMq(comsumer)).start();
        new Thread(testConsumer.new ConsumerMq(comsumer)).start();
    }

    private class ConsumerMq implements Runnable{
        Comsumer comsumer;
        public ConsumerMq(Comsumer comsumer){
            this.comsumer = comsumer;
        }

        @Override
        public void run() {
            while(true){
                try {
                    comsumer.getMessage("Jaycekon-MQ");
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

  運轉後果:

 INFO | Successfully connected to tcp://localhost:61616
Thread-2: Consumer:我是消費者,我正在消費MsgThread-5productor:我是大帥哥,我如今正在消費東西!,count:4--->0
Thread-3: Consumer:我是消費者,我正在消費MsgThread-4productor:我是大帥哥,我如今正在消費東西!,count:36--->1
Thread-4: Consumer:我是消費者,我正在消費MsgThread-3productor:我是大帥哥,我如今正在消費東西!,count:38--->2
Thread-5: Consumer:我是消費者,我正在消費MsgThread-6productor:我是大帥哥,我如今正在消費東西!,count:37--->3
Thread-2: Consumer:我是消費者,我正在消費MsgThread-6productor:我是大帥哥,我如今正在消費東西!,count:2--->4
Thread-3: Consumer:我是消費者,我正在消費MsgThread-5productor:我是大帥哥,我如今正在消費東西!,count:40--->5
Thread-4: Consumer:我是消費者,我正在消費MsgThread-6productor:我是大帥哥,我如今正在消費東西!,count:42--->6
Thread-5: Consumer:我是消費者,我正在消費MsgThread-4productor:我是大帥哥,我如今正在消費東西!,count:41--->7
Thread-2: Consumer:我是消費者,我正在消費MsgThread-3productor:我是大帥哥,我如今正在消費東西!,count:1--->8
Thread-3: Consumer:我是消費者,我正在消費MsgThread-2productor:我是大帥哥,我如今正在消費東西!,count:44--->9
Thread-4: Consumer:我是消費者,我正在消費MsgThread-4productor:我是大帥哥,我如今正在消費東西!,count:46--->10
Thread-5: Consumer:我是消費者,我正在消費MsgThread-5productor:我是大帥哥,我如今正在消費東西!,count:45--->11
Thread-2: Consumer:我是消費者,我正在消費MsgThread-2productor:我是大帥哥,我如今正在消費東西!,count:3--->12
Thread-3: Consumer:我是消費者,我正在消費MsgThread-3productor:我是大帥哥,我如今正在消費東西!,count:48--->13
Thread-4: Consumer:我是消費者,我正在消費MsgThread-5productor:我是大帥哥,我如今正在消費東西!,count:50--->14
Thread-5: Consumer:我是消費者,我正在消費MsgThread-2productor:我是大帥哥,我如今正在消費東西!,count:49--->15
Thread-4: Consumer:我是消費者,我正在消費MsgThread-2productor:我是大帥哥,我如今正在消費東西!,count:54--->16
Thread-2: Consumer:我是消費者,我正在消費MsgThread-5productor:我是大帥哥,我如今正在消費東西!,count:6--->17
Thread-3: Consumer:我是消費者,我正在消費MsgThread-6productor:我是大帥哥,我如今正在消費東西!,count:52--->18
Thread-5: Consumer:我是消費者,我正在消費MsgThread-3productor:我是大帥哥,我如今正在消費東西!,count:53--->19
Thread-4: Consumer:我是消費者,我正在消費MsgThread-3productor:我是大帥哥,我如今正在消費東西!,count:58--->20

  

  檢查運轉後果,我們可以做ActiveMQ 服務端:http://127.0.0.1:8161/admin/ 外面的Queues 中檢查我們消費的音訊。

 

 

5、ActiveMQ的特性
 5.1 ActiveMq 的特性 
  1. 多種言語和協議編寫客戶端。言語: Java, C, C++, C#, Ruby, Perl, Python, PHP。使用協議: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
  2. 完全支持JMS1.1和J2EE 1.4標准 (耐久化,XA音訊,事務)
  3. 對Spring的支持,ActiveMQ可以很容易內嵌到運用Spring的零碎外面去,而且也支持Spring2.0的特性
  4. 經過了罕見J2EE服務器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的測試,其中經過JCA 1.5 resource adaptors的配置,可以讓ActiveMQ可以自動的部署就任何兼容J2EE 1.4 商業服務器上
  5. 支持多種傳送協議:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
  6. 支持經過JDBC和journal提供高速的音訊耐久化
  7. 從設計上保證了高功能的集群,客戶端-服務器,點對點
  8. 支持Ajax
  9. 支持與Axis的整合
  10. 可以很容易得調用內嵌JMS provider,停止測試
   5.2 什麼狀況下運用ActiveMQ?
  1. 多個項目之間集成 
    (1) 跨平台 
    (2) 多言語 
    (3) 多項目
  2. 降低零碎間模塊的耦合度,解耦 
    (1) 軟件擴展性
  3. 零碎前後端隔離 
    (1) 前後端隔離,屏蔽高平安區

 


 

關於JMS(Java 音訊服務) 的一些概述可以參考我的上一篇博客:http://www.cnblogs.com/jaycekon/p/6220200.html

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved