ActiveMq整理之java應用

1、JMS

更多介紹參考java

http://baike.baidu.com/link?url=LNCEOGgqEX-uSKuRJooyG1RSfS7CTWDKYT8OOouhxLk_yWNN-0wNSWq7KjNQ259a9pfL95janJi8v8-drvdHqamysql

 

11背景

         當前,CORBADCOMRMIRPC中間件技術已普遍應用於各個領域。可是面對規模和複雜度愈來愈高的分佈式系統,這些技術也顯示出其侷限性:sql

         1、同步通訊:客戶發出調用後,必須等待服務對象完成處理並返回結果才能繼續執行;數據庫

         2、客戶和服務對象的生命週期緊密耦合:客戶進程和服務對象進行都必須正常運行,若是因爲服務對象崩潰或者網絡故障致使客戶的請求不可達,客戶會接收到異常。apache

         3、點對點通訊:客戶的一次調用只發送給某個單獨的目標對象。安全

 

         面向消息的中間件(MessageOriented Middleward,MOM)較好的解決了以上問題。發送者將消息發送給消息服務器,消息服務器將消息存放在若干隊列中,在合適的時候再把消息轉發給接收者。這種模式下,發送和接收是異步的,發送者無需等待,兩者的生命週期也未必相同,發送消息的時候接收者不必定運行,接收消息的時候發送者也不必定運行,一對多通訊,對於一個消息能夠有多個接收者。服務器

 

12簡介

         JMSjava消息服務(javamessage service)應用程序接口,是一個java平臺中關於面向消息中間件(MOM)API,用在兩個應用程序之間,或分佈式系統中發送消息,進行異步通訊。Java消息服務是一個與具體平臺無關的API,絕大多數MOM提供商都對JMS提供支持。網絡

         JMS是一種與廠商無關的API,用來訪問消息收發系統消息,它相似於JDBC(java database connectitity)。這裏,JDBC是能夠用來訪問許多不一樣關係數據庫的API,而JMS則提供一樣與廠商無關的訪問方法,以訪問消息收發服務。許多廠商都提供JMS,包括IBMMQSeriesBEA Weblogic JMS service Progress SonicMQ、微軟的MSMQJMS使使用者能夠經過消息收發服務(有時稱爲消息中介服務或路由器)從一個JMS客戶機向另外一個JMS客戶機發送消息。消息是JMS中的一種類型對象,由兩部分組成:報頭和消息主體。報頭由路由信息以及有關該消息的元數據組成。消息主體則攜帶着應用程序的數據或有效負載。根據有效負載的類型來劃分,能夠將消息分爲幾種類型,它們分別攜帶:簡單文本、可序列化的對象、屬性集合、字節流、原始值流,還有無有效負載的消息。

session

 

13JMS體系架構

JMS由如下元素組成:架構

JMS提供者

鏈接面向消息中間件的,JMS接口的一個實現。提供者能夠是Java平臺的JMS實現,也能夠是非Java平臺的面向消息中間件的適配器。

JMS客戶

生產或消費基於消息的Java的應用程序或對象。

JMS生產者

建立併發送消息的JMS客戶。

JMS消費者

接收消息的JMS客戶。

JMS消息

包括能夠在JMS客戶之間傳遞的數據的對象

JMS隊列

一個容納那些被髮送的等待閱讀的消息的區域。與隊列名字所暗示的意思不一樣,消息的接受順序並不必定要與消息的發送順序相同。一旦一個消息被閱讀,該消息將被從隊列中移走。

JMS主題

一種支持發送消息給多個訂閱者的機制。

 

14JMS對象模型

1)鏈接工廠。鏈接工廠(ConnectionFactory)是由管理員建立,並綁定到JNDI樹中。客戶端使用JNDI查找鏈接工廠,而後利用鏈接工廠建立一個JMS鏈接。

2JMS鏈接。JMS鏈接(Connection)表示JMS客戶端和服務器端之間的一個活動的鏈接,是由客戶端經過調用鏈接工廠的方法創建的。

3JMS會話。JMS會話(Session)表示JMS客戶與JMS服務器之間的會話狀態。JMS會話創建在JMS鏈接上,表示客戶與服務器之間的一個會話線程。

4JMS目的。JMS目的(Destination),又稱爲消息隊列,是實際的消息源。

5JMS生產者和消費者。生產者(MessageProducer)和消費者(Message Consumer)對象由Session對象建立,用於發送和接收消息。

6JMS消息一般有兩種類型:

點對點(Point-to-Point)。在點對點的消息系統中,消息分發給一個單獨的使用者。點對點消息每每與隊列(javax.jms.Queue)相關聯。

發佈/訂閱(Publish/Subscribe)。發佈/訂閱消息系統支持一個事件驅動模型,消息生產者和消費者都參與消息的傳遞。生產者發佈事件,而使用者訂閱感興趣的事件,並使用事件。該類型消息通常與特定的主題(javax.jms.Topic)關聯。

 

 

15模型

Java消息服務應用程序結構支持兩種模型:

點對點或隊列模型

發佈者/訂閱者模型

 

在點對點或隊列模型下,一個生產者向一個特定的隊列發佈消息,一個消費者從該隊列中讀取消息。這裏,生產者知道消費者的隊列,並直接將消息發送到消費者的隊列。這種模式被歸納爲:

只有一個消費者將得到消息。

生產者不須要在接收者消費該消息期間處於運行狀態,接收者也一樣不須要在消息發送時處於運行狀態。

每個成功處理的消息都由接收者簽收。

 

發佈者/訂閱者模型支持向一個特定的消息主題發佈消息。0或多個訂閱者可能對接收來自特定消息主題的消息感興趣。在這種模型下,發佈者和訂閱者彼此不知道對方。這種模式比如是匿名公告板。這種模式被歸納爲:

多個消費者能夠得到消息

在發佈者和訂閱者之間存在時間依賴性。發佈者須要創建一個訂閱(subscription),以便客戶可以訂閱。訂閱者必須保持持續的活動狀態以接收消息,除非訂閱者創建了持久的訂閱。在那種狀況下,在訂閱者未鏈接時發佈的消息將在訂閱者從新鏈接時從新發布。

 

使用Java語言,JMS提供了將應用與提供數據的傳輸層相分離的方式。同一組Java類能夠經過JNDI中關於提供者的信息,鏈接不一樣的JMS提供者。這一組類首先使用一個鏈接工廠以鏈接到隊列或主題,而後發送或發佈消息。在接收端,客戶接收或訂閱這些消息。

 

16傳遞方式

JMS有兩種傳遞消息的方式。標記爲NON_PERSISTENT的消息最多投遞一次,而標記爲PERSISTENT的消息將使用暫存後再轉送的機理投遞。若是一個JMS服務離線,那麼持久性消息不會丟失可是得等到這個服務恢復聯機時纔會被傳遞。因此默認的消息傳遞方式是非持久性的。即便使用非持久性消息可能下降內務和須要的存儲器,而且這種傳遞方式只有當你不須要接收全部的消息時才使用。

雖然JMS規範並不須要JMS供應商實現消息的優先級路線,可是它須要遞送加快的消息優先於普通級別的消息。JMS定義了從09的優先級路線級別,0是最低的優先級而9則是最高的。更特殊的是04是正常優先級的變化幅度,而59是加快的優先級的變化幅度。舉例來講: topicPublisher.publish (message, DeliveryMode.PERSISTENT, 8,10000); //Pub-Sub queueSender.send(message,DeliveryMode.PERSISTENT, 8, 10000);//P2P  這個代碼片段,有兩種消息模型,映射遞送方式是持久的,優先級爲加快型,生存週期是10000 (以毫秒度量)若是生存週期設置爲零,這則消息將永遠不會過時。當消息須要時間限制不然將使其無效時,設置生存週期是有用的。

 

JMS定義了五種不一樣的消息正文格式,以及調用的消息類型,容許你發送並接收以一些不一樣形式的數據,提供現有消息格式的一些級別的兼容性。

· StreamMessage -- Java原始值的數據流

· MapMessage--一套名稱-值對

· TextMessage--一個字符串對象

· ObjectMessage--一個序列化的 Java對象

· BytesMessage--一個未解釋字節的數據流

 

17應用程序

ConnectionFactory接口(鏈接工廠)

用戶用來建立到JMS提供者的鏈接的被管對象。JMS客戶經過可移植的接口訪問鏈接,這樣當下層的實現改變時,代碼不須要進行修改。管理員在JNDI名字空間中配置鏈接工廠,這樣,JMS客戶纔可以查找到它們。根據消息類型的不一樣,用戶將使用隊列鏈接工廠,或者主題鏈接工廠。

 

Connection 接口(鏈接)

鏈接表明了應用程序和消息服務器之間的通訊鏈路。在得到了鏈接工廠後,就能夠建立一個與JMS提供者的鏈接。根據不一樣的鏈接類型,鏈接容許用戶建立會話,以發送和接收隊列和主題到目標。

 

Destination 接口(目標)

目標是一個包裝了消息目標標識符的被管對象,消息目標是指消息發佈和接收的地點,或者是隊列,或者是主題。JMS管理員建立這些對象,而後用戶經過JNDI發現它們。和鏈接工廠同樣,管理員能夠建立兩種類型的目標,點對點模型的隊列,以及發佈者/訂閱者模型的主題。

 

Session 接口(會話)

表示一個單線程的上下文,用於發送和接收消息。因爲會話是單線程的,因此消息是連續的,就是說消息是按照發送的順序一個一個接收的。會話的好處是它支持事務。若是用戶選擇了事務支持,會話上下文將保存一組消息,直到事務被提交才發送這些消息。在提交事務以前,用戶可使用回滾操做取消這些消息。一個會話容許用戶建立消息,生產者來發送消息,消費者來接收消息。

 

MessageConsumer 接口(消息消費者)

由會話建立的對象,用於接收發送到目標的消息。消費者能夠同步地(阻塞模式),或(非阻塞)接收隊列和主題類型的消息。

 

MessageProducer 接口(消息生產者)

由會話建立的對象,用於發送消息到目標。用戶能夠建立某個目標的發送者,也能夠建立一個通用的發送者,在發送消息時指定目標。

 

Message 接口(消息)

是在消費者和生產者之間傳送的對象,也就是說從一個應用程序傳送到另外一個應用程序。一個消息有三個主要部分:

消息頭(必須):包含用於識別和爲消息尋找路由的操做設置。

一組消息屬性(可選):包含額外的屬性,支持其餘提供者和用戶的兼容。能夠建立定製的字段和過濾器(消息選擇器)。

一個消息體(可選):容許用戶建立五種類型的消息(文本消息,映射消息,字節消息,流消息和對象消息)。

 

消息接口很是靈活,並提供了許多方式來定製消息的內容。

 

 

2、ActiveMq

21簡介 

22操做


wKioL1iyvaOC0eBOAAClOd6EGg0296.jpg

wKioL1iyvaTwy8dbAACAHu9Zs5U584.jpg


23java代碼操做

wKioL1iyvc3y8viKAAB-J6UAqNs285.jpg


 

231代碼建立順序

 

第一步,創建ConnectionFactory工廠對象,須要填入用戶名、密碼、以及要鏈接的地址,均使用默認便可,默認端口爲」tcp://localhost:61616」。

第二步,經過ConnectionFactory工廠對象建立一個Connection鏈接,而且調用Connection的start方法開啓鏈接,Connection默認是關閉的。

第三步,經過Connection對象建立Session會話(上下文環境對象),用於接收消息,參數配置1爲是否啓用事物,參數配置2爲簽收模式,通常設置爲自動簽收。

第四步,經過Session建立Destination對象,指的是一個客戶端用來指定生產消息目標和消費消息來源的對象,在PTP模式中,Destination被稱做Queue,即隊列;在PUB/SUB模式,Destination被稱做Topic,即主題,在程序中可以使用多個Queue和Topic。

第五步,須要經過session對象建立消息的發送和接收對象(生產者和消費者)MessageProducer/MessageConsumer。

第六步,在生產端,可使用MessageProducer的setDeliveryMode方法爲其設置持久化特性和非持久化特性(DeliveryMode)。

第七步,使用JMS規範的TextMessage形式建立數據(經過session對象),並用MessageProducer的send方法發送數據。同理客戶端使用receive方法進行接收數據。最後不要忘記關閉connection鏈接。

 

 

232send

package org.mbox.test01;
 
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
 
import org.apache.activemq.ActiveMQConnectionFactory;
 
public class Send {
 
    public staticvoid main(String[] args) throws JMSException {
        ConnectionFactoryconnectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnectionFactory.DEFAULT_USER,
                ActiveMQConnectionFactory.DEFAULT_PASSWORD,
                "tcp://localhost:61616");
        Connectionconnection = connectionFactory.createConnection();
        connection.start();
        Sessionsession = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
        Destinationdesination = session.createQueue("queue1");
        MessageProducermessageProducer = session.createProducer(desination);
        messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        for (int i =0; i < 5; i++) {
            TextMessagetextMessage = session.createTextMessage();
            textMessage.setText("發送消息內容,第"+i+"條。");
            messageProducer.send(textMessage);
        }
        if(connection!= null){
            connection.close();
        }
    }
}


 

wKiom1iyvfvCzpClAACk63ToqNg904.jpg


 

233Receive

package org.mbox.test01;
 
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
 
import org.apache.activemq.ActiveMQConnectionFactory;
 
public class Receive {
 
    public staticvoid main(String[] args) throws JMSException {
        
        ConnectionFactoryconnectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnectionFactory.DEFAULT_USER,
                ActiveMQConnectionFactory.DEFAULT_PASSWORD,
                "tcp://localhost:61616");
        Connectionconnection = connectionFactory.createConnection();
        connection.start();
        Sessionsession = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
        Destinationdestination = session.createQueue("queue1");
        MessageConsumermessageConsumer = session.createConsumer(destination);
        while(true){
            //此種receive是阻塞的,直到接收到消息才往下執行
            TextMessagemsg = (TextMessage) messageConsumer.receive();
            if(msg== null){
                break;
            }
            System.out.println("接收到的內容:"+msg.getText());
        }
        if(connection!= null){
            connection.close();
        }
        
    }
}



wKiom1iyvi7wvJtYAAAdqSnwVOc863.jpg


再點queue1就是空的了。

再次運行消費端,就等因而兩個消費者了。


 

24activemq的安全機制

 

設置安全機制,只有符合認證的用戶才能進行發送和獲取消息,需在activemq.xml裏添加安全驗證配置!


wKiom1iyvl-jeYm1AAAvabvh-Vs254.jpg


改造新加內容爲

<plugins>
            <simpleAuthenticationPlugin>
                <users>
                    <authenticationUserusername="lly" password="lly"groups="users,admins"/>
                    <authenticationUserusername="test" password="test"groups="users,admins"/>
                </users>
            </simpleAuthenticationPlugin>
        </plugins>
 
重啓activemq服務,而且修改發送端和接收端的代碼分別爲,用戶名能夠不一致:
ConnectionFactory connectionFactory = newActiveMQConnectionFactory(
                "lly",
                "lly",
                "tcp://localhost:61616");
ConnectionFactory connectionFactory = newActiveMQConnectionFactory(
                "test",
                "test",
                "tcp://localhost:61616");




25connection方法使用

         在成功建立正確的connectionFactory以後,下一步是建立一個鏈接,它是JMS定義的一個接口。ConnectionFactory負責返回能夠與底層消息傳遞系統進行通訊的Connection實現。一般客戶端只使用單一鏈接。根據JMS文檔,connection的目的是「利用JMS提供者封裝開放的鏈接」,以及表示「客戶端與提供者服務例程之間的開放TCP/IP套接字」。該文檔還指出connection應該是進行客戶端身份驗證的地方等等。

         當一個connection被建立時,它的傳輸默認是關閉的,必須使用start方法開啓。一個connection能夠創建一個或多個session

         當一個程序執行完成後,必須關閉以前建立的connection,不然activemq不能釋放資源,關閉一個connection一樣也關閉了sessionMessageProducerMessageCustomer

 

 

26session方法的使用

         一旦從ConnectionFactory中得到一個connection,必須從connection中建立一個或多個sessionSession是一個發送或接收消息的線程,可使用session建立MessageProducerMessageCustomerMessage

         Session能夠被事物化,也能夠不被事物化,一般,能夠經過向connection上的適當方法傳遞一個布爾參數對此進行設置。

SessioncreateSession(boolean transacted, int acknowledgeMode);

         其中transacted是使用事物標識,acknowledgeMode爲簽收模式。

         結束事物有兩種方法:提交或者回滾。當一個事物提交,消息被處理。若是事物中有一個步驟失敗,事物就回滾,這個事物中的已經執行的動做將被撤銷。在發送消息最後也必需要使用session.commit()方法表示提交事物。

         簽收模式有三種形式:

         Session.AUTO_ACKNOWLEDGE,當客戶端從receiveonmessage成功返回時,session自動簽收客戶端的這條信息的收條。

Session.CLIENT_ACKNOWLEDGE,客戶端經過調用消息(Message)的acknowledgeMode方法接收消息。在這種狀況下,簽收發生在session層面:簽收一個已消費的消息會自動的簽收這個session全部已消費消息的收條。

Session.DUPS_OK_ACKNOWLEDGE,指示session沒必要確保對傳送消息的簽收。它可能引發消息的重複,可是下降了session的開銷,因此只有客戶端能容忍重複的消息,纔可以使用。


對上述三種形式的理解:

         Session.AUTO_ACKNOWLEDGEprocedure產生了一個消息發送給MQ制定隊列queue1中,customerMQ取消息,鏈接上之後,若是MQ有數據,經過TCP把數據返回給customer,一旦customer接收到之後,會自動再次向MQ發送確認消息(自動簽收機制,不用寫代碼給其確認收到消息),可是這條確認收到的信息是看不到的。

         Session.CLIENT_ACKNOWLEDGEcustomer接收到MQ消息之後,手工調用knowledge方法給MQ發送確認消息,customer已經接收到消息了。

         Session.DUPS_OK_ACKNOWLEDGE:沒有簽收機制,可能會有消息重複,可是下降了session的開銷。

 

261測試開啓sessionAUTO_ACKNOWLEDGE

wKioL1iyvvXzsvzcAACSJREBG3Q020.jpg


wKiom1iyvyGBxpmLAADEVLOjd2I720.jpg


wKioL1iyv0biQ9vlAADGd6Sqh0I322.jpg



 

262測試開啓sessionCLIENT_ACKNOWLEDGE(推薦)

wKiom1iyv4TT9vilAACSuxbFrPk904.jpg

wKioL1iyv6LAP2eRAAE6iTQDKAg301.jpg


wKiom1iyv8PCCUEgAADTwQvC2W0665.jpg


wKiom1iyv-DgT5OuAAEdWL8-j5k125.jpg


 

263測試開啓sessionDUPS_OK_ACKNOWLEDGE

wKioL1iywECh5MCcAACwMYzfrBA018.jpg

wKiom1iywEHykxHzAACIXsRCCkk878.jpg



27MessageProducer


wKiom1iywJaAbo_0AAFQOsHddIA512.jpg

wKioL1iywJbQAq9dAADRr30lQJw816.jpg


 

271測試優先級第一部分(不開啓事物、不持久化)

wKioL1iywL-w4Zz0AAD8t9fMXeE565.jpg




 

272測試優先級第一部分(開啓事物、持久化)

wKioL1iywOuw9MdaAADuEmDZsGY647.jpg


 

273測試優先級第一部分:上述兩者測試結果

並無按照優先級獲取到消息。


wKiom1iywRSwKrYyAABBkEM10bk369.jpg


 

274測試優先級第二部分

wKiom1iywU_SZKFrAACqCQsA29s654.jpg

wKiom1iywVCwJwAjAAExHgzhN70264.jpg



 

28MessageCustomerMessage


wKioL1iywaOiHcD0AAE_Lf7TOFk669.jpg


wKiom1iywcGT4GTHAAC9bTWQLI8763.jpg



 

281不設置過濾器

2811未處理前的控制器

wKioL1iyweOCp286AAAhO19X-_Q850.jpg



 

2812生產者

 

package org.mbox.test03;
 
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
 
public class Producer {
 
    privateConnectionFactory connectionFactory;
    privateConnection connection;
    private Sessionsession;
    privateMessageProducer messageProceducer;
    
    publicProducer(){
        
        try {
            this.connectionFactory= new ActiveMQConnectionFactory("lly", "lly","tcp://localhost:61616");
            this.connection= this.connectionFactory.createConnection();
            this.connection.start();
            this.session= this.connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
            this.messageProceducer= this.session.createProducer(null);
        } catch(JMSException e) {
            e.printStackTrace();
        }
        
    }
    
    public voidsend1(){
        try {
            Destinationdestination = this.session.createQueue("first");
            MapMessagemsg1 = this.session.createMapMessage();
            //下述的Property屬性是針對消費端的過濾器的
            msg1.setString("name","張三");
            msg1.setString("age","21");
//          msg1.setStringProperty("color","blue");
//          msg1.setIntProperty("sal",1000);
//          int id =1;
//          msg1.setInt("id",id);
//          Stringreceiver = id%2 == 0 ? "A":"B";
//          msg1.setStringProperty("receiver",receiver);
            
            MapMessagemsg2 = this.session.createMapMessage();
            msg2.setString("name","李四");
            msg2.setString("age","22");
//          msg2.setStringProperty("color","blue");
//          msg2.setIntProperty("sal",1200);
//          id = 2;
//          msg2.setInt("id",id);
//          receiver= id%2 == 0 ? "A":"B";
//          msg2.setStringProperty("receiver",receiver);
            
            MapMessagemsg3 = this.session.createMapMessage();
            msg3.setString("name","王五");
            msg3.setString("age","23");
//          msg3.setStringProperty("color","blue");
//          msg3.setIntProperty("sal",1300);
//          id = 3;
//          msg3.setInt("id",id);
//          receiver= id%2 == 0 ? "A":"B";
//          msg3.setStringProperty("receiver",receiver);
            
            MapMessagemsg4 = this.session.createMapMessage();
            msg4.setString("name","趙六");
            msg4.setString("age","24");
//          msg4.setStringProperty("color","oragne");
//          msg4.setIntProperty("sal",1400);
//          id = 4;
//          msg4.setInt("id",id);
//          receiver= id%2 == 0 ? "A":"B";
//          msg4.setStringProperty("receiver",receiver);
            
            this.messageProceducer.send(destination,msg1, DeliveryMode.NON_PERSISTENT, 2, 1000*60*10L);
            this.messageProceducer.send(destination,msg2, DeliveryMode.NON_PERSISTENT, 4, 1000*60*10L);
            this.messageProceducer.send(destination,msg3, DeliveryMode.NON_PERSISTENT, 6, 1000*60*10L);
            this.messageProceducer.send(destination,msg4, DeliveryMode.NON_PERSISTENT, 8, 1000*60*10L);
            
        } catch (JMSExceptione) {
            e.printStackTrace();
        }
        
    }
    
    public voidsend2(){
        try {
            Destinationdestination = this.session.createQueue("first");
            TextMessagemsg = this.session.createTextMessage("我是一個字符串內容");
            this.messageProceducer.send(destination,msg, DeliveryMode.NON_PERSISTENT, 9, 1000*60*10L);
            
        } catch(JMSException e) {
            e.printStackTrace();
        }
        
    }
    
    public staticvoid main(String[] args) {
        
        Producerproducer = new Producer();
        producer.send1();
        
    }
    
    
}


 

wKioL1iywgOx8tSVAAAkug4YbuU935.jpg



 

2813消費者

 

package org.mbox.test03;
 
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
 
import org.apache.activemq.ActiveMQConnectionFactory;
 
public class Customer {
 
    public finalString SELECTOR_1 = "color = 'blue'";
    public finalString SELECTOR_2 = "color = 'blue' and sal > 1000";
    public finalString SELECTOR_3 = "receiver = 'A' ";
    
    privateConnectionFactory connectionFactory;
    privateConnection connection;
    private Sessionsession;
    privateMessageConsumer messageConsumer;
    privateDestination destination;
    
    publicCustomer(){
        
        try {
            this.connectionFactory= new ActiveMQConnectionFactory("lly", "lly","tcp://localhost:61616");
            this.connection= this.connectionFactory.createConnection();
            this.connection.start();
            this.session= this.connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
            this.destination= this.session.createQueue("first");
            //不設置過濾器
            this.messageConsumer= this.session.createConsumer(destination);
            //設置過濾器
//          this.messageConsumer= this.session.createConsumer(destination,SELECTOR_2);
            this.receiver();
        } catch(JMSException e) {
            e.printStackTrace();
        }
        
    }
 
    public voidreceiver(){
        try {
            this.messageConsumer.setMessageListener(newListener());
        } catch(JMSException e) {
            e.printStackTrace();
        }
    }
    
    class Listenerimplements MessageListener{
 
        //自動調用onMessage方法
        @Override
        public voidonMessage(Message message) {
            try {
                //判斷數據類型
                if(messageinstanceof MapMessage){
                    MapMessagemsg = (MapMessage)message;
                    System.out.println(msg.toString());
                    System.out.println(msg.getString("name"));
                    System.out.println(msg.getString("age"));
                }
                if(messageinstanceof TextMessage){
                    
                }
            
            } catch(JMSException e) {
                e.printStackTrace();
            }
        }
        
    }
    
    public staticvoid main(String[] args) {
        newCustomer();
    }
    
    
}


 

#執行結果

ActiveMQMapMessage {commandId = 5, responseRequired =false, messageId = ID:XT-201605212138-56907-1483464991686-1:1:1:1:1,originalDestination = null, originalTransactionId = null, producerId =ID:XT-201605212138-56907-1483464991686-1:1:1:1, destination = queue://first,transactionId = null, expiration = 1483465591920, timestamp = 1483464991920,arrival = 0, brokerInTime = 1483464991924, brokerOutTime = 1483465021382, correlationId= null, replyTo = null, persistent = false, type = null, priority = 2, groupID= null, groupSequence = 0, targetConsumerId = null, compressed = false, userID= null, content = org.apache.activemq.util.ByteSequence@2a4640,marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size= 0, properties = null, readOnlyProperties = true, readOnlyBody = true,droppable = false, jmsXGroupFirstForConsumer = false} ActiveMQMapMessage{theTable = {} }

張三

21

ActiveMQMapMessage {commandId = 6, responseRequired =false, messageId = ID:XT-201605212138-56907-1483464991686-1:1:1:1:2,originalDestination = null, originalTransactionId = null, producerId =ID:XT-201605212138-56907-1483464991686-1:1:1:1, destination = queue://first,transactionId = null, expiration = 1483465591922, timestamp = 1483464991922,arrival = 0, brokerInTime = 1483464991924, brokerOutTime = 1483465021383,correlationId = null, replyTo = null, persistent = false, type = null, priority= 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed =false, userID = null, content = org.apache.activemq.util.ByteSequence@19b35853,marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size= 0, properties = null, readOnlyProperties = true, readOnlyBody = true,droppable = false, jmsXGroupFirstForConsumer = false} ActiveMQMapMessage{theTable = {} }

李四

22

ActiveMQMapMessage {commandId = 7, responseRequired =false, messageId = ID:XT-201605212138-56907-1483464991686-1:1:1:1:3,originalDestination = null, originalTransactionId = null, producerId =ID:XT-201605212138-56907-1483464991686-1:1:1:1, destination = queue://first,transactionId = null, expiration = 1483465591922, timestamp = 1483464991922,arrival = 0, brokerInTime = 1483464991925, brokerOutTime = 1483465021384,correlationId = null, replyTo = null, persistent = false, type = null, priority= 6, groupID = null, groupSequence = 0, targetConsumerId = null, compressed =false, userID = null, content = org.apache.activemq.util.ByteSequence@4d6bbe53,marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size= 0, properties = null, readOnlyProperties = true, readOnlyBody = true,droppable = false, jmsXGroupFirstForConsumer = false} ActiveMQMapMessage{theTable = {} }

王五

23

ActiveMQMapMessage {commandId = 8, responseRequired =false, messageId = ID:XT-201605212138-56907-1483464991686-1:1:1:1:4,originalDestination = null, originalTransactionId = null, producerId =ID:XT-201605212138-56907-1483464991686-1:1:1:1, destination = queue://first,transactionId = null, expiration = 1483465591922, timestamp = 1483464991922,arrival = 0, brokerInTime = 1483464991925, brokerOutTime = 1483465021384,correlationId = null, replyTo = null, persistent = false, type = null, priority= 8, groupID = null, groupSequence = 0, targetConsumerId = null, compressed =false, userID = null, content = org.apache.activemq.util.ByteSequence@5c9c62da,marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size= 0, properties = null, readOnlyProperties = true, readOnlyBody = true,droppable = false, jmsXGroupFirstForConsumer = false} ActiveMQMapMessage{theTable = {} }

趙六

24

 

wKiom1iywinipZLHAAAr__K-tZk928.jpg


 

282設置過濾器

2821未處理前的控制器


wKiom1iywljic4W3AAAhQfLM86A842.jpg

 

2822生產者

 

package org.mbox.test03;
 
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
 
public class Producer {
 
    privateConnectionFactory connectionFactory;
    privateConnection connection;
    private Sessionsession;
    privateMessageProducer messageProceducer;
    
    publicProducer(){
        
        try {
            this.connectionFactory= new ActiveMQConnectionFactory("lly", "lly","tcp://localhost:61616");
            this.connection= this.connectionFactory.createConnection();
            this.connection.start();
            this.session= this.connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
            this.messageProceducer= this.session.createProducer(null);
        } catch(JMSException e) {
            e.printStackTrace();
        }
        
    }
    
    public voidsend1(){
        try {
            Destinationdestination = this.session.createQueue("first");
            MapMessagemsg1 = this.session.createMapMessage();
            //下述的Property屬性是針對消費端的過濾器的
            msg1.setString("name","張三");
            msg1.setString("age","21");
            msg1.setStringProperty("color","blue");
            msg1.setIntProperty("sal",1000);
            int id =1;
            msg1.setInt("id",id);
            Stringreceiver = id%2 == 0 ? "A":"B";
            msg1.setStringProperty("receiver",receiver);
            
            MapMessagemsg2 = this.session.createMapMessage();
            msg2.setString("name","李四");
            msg2.setString("age","22");
            msg2.setStringProperty("color","blue");
            msg2.setIntProperty("sal",1200);
            id = 2;
            msg2.setInt("id",id);
            receiver= id%2 == 0 ? "A":"B";
            msg2.setStringProperty("receiver",receiver);
            
            MapMessagemsg3 = this.session.createMapMessage();
            msg3.setString("name","王五");
            msg3.setString("age","23");
            msg3.setStringProperty("color","blue");
            msg3.setIntProperty("sal",1300);
            id = 3;
            msg3.setInt("id",id);
            receiver= id%2 == 0 ? "A":"B";
            msg3.setStringProperty("receiver",receiver);
            
            MapMessagemsg4 = this.session.createMapMessage();
            msg4.setString("name","趙六");
            msg4.setString("age","24");
            msg4.setStringProperty("color","oragne");
            msg4.setIntProperty("sal",1400);
            id = 4;
            msg4.setInt("id",id);
            receiver= id%2 == 0 ? "A":"B";
            msg4.setStringProperty("receiver",receiver);
            
            this.messageProceducer.send(destination,msg1, DeliveryMode.NON_PERSISTENT, 2, 1000*60*10L);
            this.messageProceducer.send(destination,msg2, DeliveryMode.NON_PERSISTENT, 4, 1000*60*10L);
            this.messageProceducer.send(destination,msg3, DeliveryMode.NON_PERSISTENT, 6, 1000*60*10L);
            this.messageProceducer.send(destination,msg4, DeliveryMode.NON_PERSISTENT, 8, 1000*60*10L);
            
        } catch(JMSException e) {
            e.printStackTrace();
        }
        
    }
    
    public voidsend2(){
        try {
            Destinationdestination = this.session.createQueue("first");
            TextMessagemsg = this.session.createTextMessage("我是一個字符串內容");
            this.messageProceducer.send(destination,msg, DeliveryMode.NON_PERSISTENT, 9, 1000*60*10L);
            
        } catch(JMSException e) {
            e.printStackTrace();
        }
        
    }
    
    public staticvoid main(String[] args) {
        
        Producerproducer = new Producer();
        producer.send1();
        
    }
    
    
}


 

wKiom1iywn2zwsn_AAAkMOHanY4713.jpg

 

2823消費者

 

package org.mbox.test03;
 
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
 
import org.apache.activemq.ActiveMQConnectionFactory;
 
public class Customer {
 
    public finalString SELECTOR_1 = "color = 'blue'";
    public finalString SELECTOR_2 = "color = 'blue' and sal > 1000";
    public finalString SELECTOR_3 = "receiver = 'A' ";
    
    privateConnectionFactory connectionFactory;
    privateConnection connection;
    private Sessionsession;
    privateMessageConsumer messageConsumer;
    privateDestination destination;
    
    publicCustomer(){
        
        try {
            this.connectionFactory= new ActiveMQConnectionFactory("lly", "lly","tcp://localhost:61616");
            this.connection= this.connectionFactory.createConnection();
            this.connection.start();
            this.session= this.connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
            this.destination= this.session.createQueue("first");
//          //不設置過濾器
//          this.messageConsumer= this.session.createConsumer(destination);
            //設置過濾器,過濾性能不肯定,因此推薦用ROCKETMQ
            this.messageConsumer= this.session.createConsumer(destination,SELECTOR_2);
            this.receiver();
        } catch (JMSExceptione) {
            e.printStackTrace();
        }
        
    }
 
    public voidreceiver(){
        try {
            this.messageConsumer.setMessageListener(newListener());
        } catch(JMSException e) {
            e.printStackTrace();
        }
    }
    
    class Listenerimplements MessageListener{
 
        //自動調用onMessage方法
        @Override
        public voidonMessage(Message message) {
            try {
                //判斷數據類型
                if(messageinstanceof MapMessage){
                    MapMessagemsg = (MapMessage)message;
                    System.out.println(msg.toString());
                    System.out.println(msg.getString("name"));
                    System.out.println(msg.getString("age"));
                }
                if(messageinstanceof TextMessage){
                    
                }
            
            } catch(JMSException e) {
                e.printStackTrace();
            }
        }
        
    }
    
    public staticvoid main(String[] args) {
        newCustomer();
    }
    
    
}


 

#控制檯輸出結果

ActiveMQMapMessage {commandId = 6, responseRequired =false, messageId = ID:XT-201605212138-56953-1483465211306-1:1:1:1:2,originalDestination = null, originalTransactionId = null, producerId =ID:XT-201605212138-56953-1483465211306-1:1:1:1, destination = queue://first,transactionId = null, expiration = 1483465811545, timestamp = 1483465211545,arrival = 0, brokerInTime = 1483465211547, brokerOutTime = 1483465242258,correlationId = null, replyTo = null, persistent = false, type = null, priority= 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed =false, userID = null, content = org.apache.activemq.util.ByteSequence@dbca6b5,marshalledProperties = org.apache.activemq.util.ByteSequence@5c64bc1e,dataStructure = null, redeliveryCounter = 0, size = 0, properties ={receiver=A, color=blue, sal=1200}, readOnlyProperties = true, readOnlyBody =true, droppable = false, jmsXGroupFirstForConsumer = false} ActiveMQMapMessage{theTable = {} }

李四

22

ActiveMQMapMessage {commandId = 7, responseRequired =false, messageId = ID:XT-201605212138-56953-1483465211306-1:1:1:1:3,originalDestination = null, originalTransactionId = null, producerId =ID:XT-201605212138-56953-1483465211306-1:1:1:1, destination = queue://first,transactionId = null, expiration = 1483465811545, timestamp = 1483465211545,arrival = 0, brokerInTime = 1483465211547, brokerOutTime = 1483465242261,correlationId = null, replyTo = null, persistent = false, type = null, priority= 6, groupID = null, groupSequence = 0, targetConsumerId = null, compressed =false, userID = null, content = org.apache.activemq.util.ByteSequence@3f38dd0,marshalledProperties = org.apache.activemq.util.ByteSequence@6ae354b6,dataStructure = null, redeliveryCounter = 0, size = 0, properties = {receiver=B,color=blue, sal=1300}, readOnlyProperties = true, readOnlyBody = true,droppable = false, jmsXGroupFirstForConsumer = false} ActiveMQMapMessage{theTable = {} }

王五

23

 

wKiom1iywqaBrFIUAAAsxo8YKss779.jpg


 

29持久化mysql

wKiom1iywu7gHOe3AACujklgsW4717.jpg


wKioL1iywwyjpfIYAACIV3wrd8s025.jpg


 

#運行代碼
package org.mbox.test01;
 
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
 
import org.apache.activemq.ActiveMQConnectionFactory;
 
public class Send {
 
    public staticvoid main(String[] args) throws JMSException {
//      ConnectionFactoryconnectionFactory = new ActiveMQConnectionFactory(
//              ActiveMQConnectionFactory.DEFAULT_USER,
//              ActiveMQConnectionFactory.DEFAULT_PASSWORD,
//              "tcp://localhost:61616");
        ConnectionFactoryconnectionFactory = new ActiveMQConnectionFactory(
                "lly",
                "lly",
                "tcp://localhost:61616");
        Connectionconnection = connectionFactory.createConnection();
        connection.start();
        Sessionsession = connection.createSession(Boolean.TRUE, Session.CLIENT_ACKNOWLEDGE);
        Destinationdesination = session.createQueue("queue1");
        MessageProducermessageProducer = session.createProducer(desination);
//      messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
//      for (int i =0; i < 5; i++) {
            TextMessagetextMessage = session.createTextMessage();
            textMessage.setText("發送消息內容,第1條。優先級設置1");
            //第一個參數目的地
            //第二個參數消息
            //第三個參數是否持久化
            //第四個參數優先級
            //第五個參數消息在MQ上的存放有效期
            messageProducer.send(desination,textMessage, DeliveryMode.PERSISTENT, 1 , 1000*60*2);
//      }
        
        TextMessagetextMessage2 = session.createTextMessage();
        textMessage2.setText("發送消息內容,第2條。優先級設置9");
        messageProducer.send(desination,textMessage2, DeliveryMode.PERSISTENT, 9 , 1000*60*2);
        
        TextMessagetextMessage3 = session.createTextMessage();
        textMessage3.setText("發送消息內容,第3條。優先級設置6");
        messageProducer.send(desination,textMessage3, DeliveryMode.PERSISTENT, 6 , 1000*60*2);
        
        TextMessagetextMessage4 = session.createTextMessage();
        textMessage4.setText("發送消息內容,第4條。優先級設置8");
        messageProducer.send(desination,textMessage4, DeliveryMode.PERSISTENT, 8 , 1000*60*2);
        
        TextMessagetextMessage5 = session.createTextMessage();
        textMessage5.setText("發送消息內容,第5條。優先級設置3");
        messageProducer.send(desination,textMessage5, DeliveryMode.PERSISTENT, 3 , 1000*60*2);
        session.commit();
        
        if(connection!= null){
            connection.close();
        }
    }
}


 

wKioL1iywzbDszoNAACcwVcTZZg880.jpg


wKiom1iyw27hyUdQAABWqpJHMEU298.jpg

wKioL1iyw27SWcycAABleru0iY8974.jpg


wKiom1iyw5GB8obEAACHb5s803Q095.jpg



 

2113例子
21131發佈端

 

package org.mbox.test04;
 
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
 
public class Publish {
 
    privateConnectionFactory connectionFactory;
    privateConnection connection;
    private Sessionsession;
    privateMessageProducer messageProducer;
    
    publicPublish(){
        try {
            this.connectionFactory= new ActiveMQConnectionFactory("lly", "lly","tcp://localhost:61616");
            this.connection= this.connectionFactory.createConnection();
            this.connection.start();
            this.session= this.connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
            this.messageProducer= this.session.createProducer(null);
        } catch(JMSException e) {
            e.printStackTrace();
        }
    }
    
    public voidsendMessage(){
        try {
            Destinationdestination = this.session.createTopic("topic1");
            TextMessagetextMessage = this.session.createTextMessage("我是內容");
            this.messageProducer.send(destination,textMessage);
        } catch(JMSException e) {
            e.printStackTrace();
        }
    }
    
    public staticvoid main(String[] args) {
        Publishpublish = new Publish();
        publish.sendMessage();
    }
    
}


 

 

21132消費端

 

package org.mbox.test03;
 
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
 
import org.apache.activemq.ActiveMQConnectionFactory;
 
public class Customer {
 
    public finalString SELECTOR_1 = "color = 'blue'";
    public finalString SELECTOR_2 = "color = 'blue' and sal > 1000";
    public finalString SELECTOR_3 = "receiver = 'A' ";
    
    privateConnectionFactory connectionFactory;
    privateConnection connection;
    private Sessionsession;
    privateMessageConsumer messageConsumer;
    privateDestination destination;
    
    publicCustomer(){
        
        try {
            this.connectionFactory= new ActiveMQConnectionFactory("lly", "lly","tcp://localhost:61616");
            this.connection= this.connectionFactory.createConnection();
            this.connection.start();
            this.session= this.connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
            this.destination= this.session.createQueue("first");
//          //不設置過濾器
//          this.messageConsumer= this.session.createConsumer(destination);
            //設置過濾器,過濾性能不肯定,因此推薦用ROCKETMQ
            this.messageConsumer= this.session.createConsumer(destination,SELECTOR_2);
            this.receiver();
        } catch(JMSException e) {
            e.printStackTrace();
        }
        
    }
 
    public voidreceiver(){
        try {
            this.messageConsumer.setMessageListener(newListener());
        } catch(JMSException e) {
            e.printStackTrace();
        }
    }
    
    class Listenerimplements MessageListener{
 
        //自動調用onMessage方法
        @Override
        public void onMessage(Messagemessage) {
            try {
                //判斷數據類型
                if(messageinstanceof MapMessage){
                    MapMessagemsg = (MapMessage)message;
                    System.out.println(msg.toString());
                    System.out.println(msg.getString("name"));
                    System.out.println(msg.getString("age"));
                }
                if(messageinstanceof TextMessage){
                    
                }
            
            } catch(JMSException e) {
                e.printStackTrace();
            }
        }
        
    }
    
    public staticvoid main(String[] args) {
        newCustomer();
    }
    
}


 

wKioL1iyw7OAdAQbAAA90Y56DHM825.jpg

相關文章
相關標籤/搜索