ActiveMQ簡述,使用

官網地址:http://activemq.apache.org/java

參考文章:http://my.oschina.net/nk2011/blog/366395linux

JMS支持兩種消息發送和接收模型。一種稱爲P2P(Ponit to Point)模型,即採用點對點的方式發送消息。P2P模型是基於隊列的,消息生產者發送消息到隊列,消息消費者從隊列中接收消息,隊列的存在使得消息的異步傳輸稱爲可能,P2P模型在點對點的狀況下進行消息傳遞時採用。web

另外一種稱爲Pub/Sub(Publish/Subscribe,即發佈-訂閱)模型,發佈-訂閱模型定義瞭如何向一個內容節點發布和訂閱消息,這個內容節點稱爲topic(主題)。主題能夠認爲是消息傳遞的中介,消息發佈這將消息發佈到某個主題,而消息訂閱者則從主題訂閱消息。主題使得消息的訂閱者與消息的發佈者互相保持獨立,不須要進行接觸便可保證消息的傳遞,發佈-訂閱模型在消息的一對多廣播時採用。apache

ActiveMQ的安裝瀏覽器

下載最新的安裝包apache-activemq-5.13.2-bin.tar.gz(此包linux下的,案例也是針對linux系統進行闡述,固然ActiveMQ也有win版的,這裏就不贅述了),能夠去官網下載,也能夠在下方留言區留下你的郵箱,博主會發給你的~服務器

下載以後解壓: tar -zvxf apache-activemq-5.13.2-bin.tar.gzsession

ActiveMQ目錄內容有:app

  • bin目錄包含ActiveMQ的啓動腳本less

  • conf目錄包含ActiveMQ的全部配置文件webapp

  • data目錄包含日誌文件和持久性消息數據

  • example: ActiveMQ的示例

  • lib: ActiveMQ運行所須要的lib

  • webapps: ActiveMQ的web控制檯和一些相關的demo

運行命令:activemq start(在activemq/bin下運行)

INFO: Loading '/users/shr/apache-activemq-5.13.2//bin/env'
INFO: Using java '/users/shr/util/JavaDir/jdk/bin/java'
INFO: Starting - inspect logfiles specified in logging.properties and log4j.properties to get details
INFO: pidfile created : '/users/shr/apache-activemq-5.13.2//data/activemq.pid' (pid '986')

查看activemq是否運行命令:ps -aux | grep activemq

shr        986  1.2  9.7 1281720 201936 pts/5  Sl   19:43   0:17 /users/shr/util/JavaDir/jdk/bin/java -Xms64M -Xmx1G -Djava.util.logging.config.file=logging.properties -Djava.security.auth.login.config=/users/shr/apache-activemq-5.13.2//conf/login.config -Dcom.sun.management.jmxremote -Djava.awt.headless=true -Djava.io.tmpdir=/users/shr/apache-activemq-5.13.2//tmp -Dactivemq.classpath=/users/shr/apache-activemq-5.13.2//conf:/users/shr/apache-activemq-5.13.2//../lib/: -Dactivemq.home=/users/shr/apache-activemq-5.13.2/ -Dactivemq.base=/users/shr/apache-activemq-5.13.2/ -Dactivemq.conf=/users/shr/apache-activemq-5.13.2//conf -Dactivemq.data=/users/shr/apache-activemq-5.13.2//data -jar /users/shr/apache-activemq-5.13.2//bin/activemq.jar start
shr       1501  0.0  0.0   5176   724 pts/5    S+   20:06   0:00 grep activemq

關閉命令: activemq stop

INFO: Loading '/users/shr/apache-activemq-5.13.2//bin/env'
INFO: Using java '/users/shr/util/JavaDir/jdk/bin/java'
INFO: Waiting at least 30 seconds for regular process termination of pid '986' :
Java Runtime: Oracle Corporation 1.7.0_79 /users/shr/util/JavaDir/jdk1.7.0_79/jre
  Heap sizes: current=63232k  free=62218k  max=932096k
    JVM args: -Xms64M -Xmx1G -Djava.util.logging.config.file=logging.properties -Djava.security.auth.login.config=/users/shr/apache-activemq-5.13.2//conf/login.config -Dactivemq.classpath=/users/shr/apache-activemq-5.13.2//conf:/users/shr/apache-activemq-5.13.2//../lib/: -Dactivemq.home=/users/shr/apache-activemq-5.13.2/ -Dactivemq.base=/users/shr/apache-activemq-5.13.2/ -Dactivemq.conf=/users/shr/apache-activemq-5.13.2//conf -Dactivemq.data=/users/shr/apache-activemq-5.13.2//data
Extensions classpath:
  [/users/shr/apache-activemq-5.13.2/lib,/users/shr/apache-activemq-5.13.2/lib/camel,/users/shr/apache-activemq-5.13.2/lib/optional,/users/shr/apache-activemq-5.13.2/lib/web,/users/shr/apache-activemq-5.13.2/lib/extra]
ACTIVEMQ_HOME: /users/shr/apache-activemq-5.13.2
ACTIVEMQ_BASE: /users/shr/apache-activemq-5.13.2
ACTIVEMQ_CONF: /users/shr/apache-activemq-5.13.2/conf
ACTIVEMQ_DATA: /users/shr/apache-activemq-5.13.2/data
Connecting to pid: 986
..Stopping broker: localhost
.. TERMINATED

ActiveMQ的默認服務端口爲61616,這個能夠在conf/activemq.xml配置文件中修改:

<transportConnectors>
    <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
    <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
</transportConnectors>

案例

在下載的apache-activemq-5.13.2-bin.tar.gz包中解壓有一個jar包:activemq-all-5.13.2.jar,引入這個jar到你的項目中便可開始編寫案例代碼。

博主的activemq服務器地址爲10.10.195.187,這個在下面代碼中會有體現。

按照JMS的規範,咱們首先須要得到一個JMS connection factory.,經過這個connection factory來建立connection.在這個基礎之上咱們再建立session, destination, producer和consumer。所以主要的幾個步驟以下:

  1. 得到JMS connection factory. 經過咱們提供特定環境的鏈接信息來構造factory。

  2. 利用factory構造JMS connection

  3. 啓動connection

  4. 經過connection建立JMS session.

  5. 指定JMS destination.

  6. 建立JMS producer或者建立JMS message並提供destination.

  7. 建立JMS consumer或註冊JMS message listener.

  8. 發送和接收JMS message.

  9. 關閉全部JMS資源,包括connection, session, producer, consumer等。

下面來看代碼舉例(P2P式)。

經過Java實現的基於ActiveMQ的請求提交:

package com.zzh.activemq;
 
import java.io.Serializable;
import java.util.HashMap;
 
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
 
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
 
public class RequestSubmit
{
    //消息發送者
    private MessageProducer producer;
    //一個發送或者接受消息的線程
    private Session session;
 
    public void init() throws Exception
    {
        //ConnectionFactory鏈接工廠,JMS用它建立鏈接
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,
                "tcp://10.10.195.187:61616");
        //Connection:JMS客戶端到JMS Provider的鏈接,從構造工廠中獲得鏈接對象
        Connection connection = connectionFactory.createConnection();
        //啓動
        connection.start();
        //獲取鏈接操做
        session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
        Destination destinatin = session.createQueue("RequestQueue");
        //獲得消息生成(發送)者
        producer = session.createProducer(destinatin);
        //設置不持久化
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    }
 
    public void submit(HashMap<Serializable,Serializable> requestParam) throws Exception
    {
        ObjectMessage message = session.createObjectMessage(requestParam);
        producer.send(message);
        session.commit();
    }
 
    public static void main(String[] args) throws Exception{
        RequestSubmit submit = new RequestSubmit();
        submit.init();
        HashMap<Serializable,Serializable> requestParam = new HashMap<Serializable,Serializable>();
        requestParam.put("郟高陽", "zzh");
        submit.submit(requestParam);
    }
}

建立Session時有兩個很是重要的參數,第一個boolean類型的參數用來表示是否採用事務消息。若是是事務消息,對於的參數設置爲true,此時消息的提交自動有comit處理,消息的回滾則自動由rollback處理。加入消息不是事務的,則對應的該參數設置爲false,此時分爲三種狀況:

  • Session.AUTO_ACKNOWLEDGE表示Session會自動確認所接收到的消息。

  • Session.CLIENT_ACKNOWLEDGE表示由客戶端程序經過調用消息的確認方法來確認所接收到的消息。

  • Session.DUPS_OK_ACKNOWLEDGE使得Session將「懶惰」地確認消息,即不會當即確認消息,這樣有可能致使消息重複投遞。

提供Java實現的基於ActiveMQ的請求處理:

package com.zzh.activemq;
 
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
 
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
 
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
 
public class RequestProcessor
{
    public void requestHandler(HashMap<Serializable,Serializable> requestParam) throws Exception
    {
        System.out.println("requestHandler....."+requestParam.toString());
        for(Map.Entry<Serializable, Serializable> entry : requestParam.entrySet())
        {
            System.out.println(entry.getKey()+":"+entry.getValue());
        }
    }
 
    public static void main(String[] args) throws Exception
    {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,
                "tcp://10.10.195.187:61616");
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createQueue("RequestQueue");
        //消息消費(接收)者
        MessageConsumer consumer = session.createConsumer(destination);
 
        RequestProcessor processor = new RequestProcessor();
 
        while(true)
        {
            ObjectMessage message = (ObjectMessage) consumer.receive(1000);
            if(null != message)
            {
                System.out.println(message);
                HashMap<Serializable,Serializable> requestParam = (HashMap<Serializable,Serializable>) message.getObject();
                processor.requestHandler(requestParam);
            }
            else
            {
                break;
            }
        }
    }
}

輸出結果:

ActiveMQObjectMessage {commandId = 6, responseRequired = false, messageId = ID:hidden-PC-58748-1460550507055-1:1:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:hidden-PC-58748-1460550507055-1:1:1:1, destination = queue://RequestQueue, transactionId = TX:ID:hidden-PC-58748-1460550507055-1:1:1, expiration = 0, timestamp = 1460550507333, arrival = 0, brokerInTime = 1460550505969, brokerOutTime = 1460550509143, correlationId = null, replyTo = null, persistent = false, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@74a456bb, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false}
requestHandler.....{郟高陽=zzh}
郟高陽:zzh

能夠經過頁面查看隊列的使用狀況,在瀏覽器中輸入http://10.10.195.187:8161/admin/queues.jsp,用戶名和密碼都是:admin,看到如下頁面:

這個是在jetty服務器下跑的,能夠修改conf/jetty.xml來修改相關jetty配置。

上面的例子是關於P2P模式的,不過有個不妥之處,就是沒有資源的釋放。下面舉一個Pub/Sub模式的。

經過JMS建立ActiveMQ的topic,並給topic發送消息:

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
 
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.camel.Produce;
 
public class TopicRequest
{
    //消息發送者
    private MessageProducer producer;
    //一個發送或者接受消息的線程
    private Session session;
    //Connection:JMS客戶端到JMS Provider的鏈接
    private Connection connection;
 
    public void init() throws Exception
    {
        //ConnectionFactory鏈接工廠,JMS用它建立鏈接
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,
                "tcp://10.10.195.187:61616");
        //從構造工廠中獲得鏈接對象
        connection = connectionFactory.createConnection();
        //啓動
        connection.start();
        //獲取鏈接操做
        session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic("MessageTopic");
        producer = session.createProducer(topic);
        //設置不持久化
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    }
 
    public void submit(String mess) throws Exception
    {
        TextMessage message = session.createTextMessage();
        message.setText(mess);
        producer.send(message);
    }
 
    public void close()
    {
        try
        {
            if(session != null)
                session.close();
            if(producer != null)
                producer.close();
            if(connection !=null )
                connection.close();
        }
        catch (JMSException e)
        {
            e.printStackTrace();
        }
    }
 
    public static void main(String[] args) throws Exception
    {
        TopicRequest topicRequest = new TopicRequest();
        topicRequest.init();
        topicRequest.submit("I'm first");
        topicRequest.close();
    }
}

消息發送到對應的topic後,須要將listener註冊到須要訂閱的topic上,以便可以接收該topic的消息:

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
 
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
 
public class TopicReceive
{
    private MessageConsumer consumer;
    private Session session;
 
    public void init() throws Exception
    {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,
                "tcp://10.10.195.187:61616");
        Connection connection = connectionFactory.createConnection();
        connection.start();
        session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic("MessageTopic");
        consumer = session.createConsumer(topic);
 
        consumer.setMessageListener(new MessageListener(){
            @Override
            public void onMessage(Message message)
            {
                TextMessage tm = (TextMessage) message;
                System.out.println(tm);
                try
                {
                    System.out.println(tm.getText());
                }
                catch (JMSException e)
                {
                    e.printStackTrace();
                }
            }
        });
    }
 
    public static void main(String[] args) throws Exception
    {
        TopicReceive receive = new TopicReceive();
        receive.init();
    }
}

輸出結果:

ActiveMQTextMessage {commandId = 5, responseRequired = false, messageId = ID:hidden-PC-50073-1460597487065-1:1:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:hidden-PC-50073-1460597487065-1:1:1:1, destination = topic://MessageTopic, transactionId = null, expiration = 0, timestamp = 1460597487308, arrival = 0, brokerInTime = 1460597487297, brokerOutTime = 1460597487298, correlationId = null, replyTo = null, persistent = false, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@2e4d3abf, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = I'm first}
I'm first
相關文章
相關標籤/搜索