activemq的使用方法

ActiveMQ 是 Apache 的一款開源消息總線,主要用來做消息的分發。

首先需要下載 MQ 並啓動。

然後在控制檯建立隊列,初始用戶名/密碼爲 admin/admin。

然後就可以寫生產者、消費者進行測試了。因爲 ActiveMQ 支持 Spring,所以有兩種不一樣的寫法:

方法一:建立 factory、connection、session、destination、producer、consumer

方法二:通過配置文件進行建立(未嘗試)。

最初對其作用的理解有一些偏差,實際上是:在發送端引入 MQ 的 jar 包,向指定的 MQ 服務器發送信息,MQ 會自動將其添加到消息隊列中,用控制檯可以比較清晰地看到隊列情況:http://localhost:8161/admin/

在接收端循環掃描要接收的隊列,當讀取到信息時進行接收處理。

 

需要注意的是,MQ 支持持久化,可將消息持久化到本地文件、數據庫。

另外一個需要注意的地方是,建立會話 session 時,第一個參數爲 true 時,需要向服務器確認消息的接收,否則服務器認爲沒有成功接收。引用一下其他同學的話:

createSession(paramA,paramB);

paramA 取值有 : true or false 表示是否支持事務

paramB 取值有:Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE,SESSION_TRANSACTED

createSession(paramA,paramB);

paramA是設置事務的,paramB設置acknowledgment mode

paramA設置爲false時:paramB的值可爲Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE其中一個。

paramA設置爲true時:paramB的值忽略, acknowledgment mode被jms服務器設置爲SESSION_TRANSACTED 。

Session.AUTO_ACKNOWLEDGE 爲自動確認,客戶端發送和接收消息不需要做額外的工作。

Session.CLIENT_ACKNOWLEDGE爲客戶端確認。客戶端接收到消息後,必須調用javax.jms.Message的acknowledge方法。jms服務器纔會刪除消息。

DUPS_OK_ACKNOWLEDGE 爲允許副本的確認模式。一旦接收方應用程序的方法調用從處理消息處返回,會話對象就會確認消息的接收;並且允許重複確認。在需要考慮資源使用時,這種模式非常有效。

附代碼
接收端:

package com.receiver;


import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintStream;
import java.util.Properties;

import javax.jms.*;

import org.apache.activemq.*;

public class MessageReceiver implements IMessageReceiver {
    
    public ActiveMQConnectionFactory connectionFactory = null;
    public Connection connection = null;
    public Session session = null;
    public Destination destination = null;
    public MessageConsumer getConsumer() {
        return consumer;
    }


    public void setConsumer(MessageConsumer consumer) {
        this.consumer = consumer;
    }

    public MessageConsumer consumer = null;
    
    //初始化,建立factory, connection, session, destination, producer
    public MessageReceiver(){                
        try {
            InputStream inProperties=MessageReceiver.class.getResourceAsStream("../config/connection.properties");
            Properties properties = new Properties();
            properties.load(inProperties);
            //建立factory
            connectionFactory = new ActiveMQConnectionFactory(properties.getProperty("name"),
                    properties.getProperty("password"),
                    properties.getProperty("brokerURL"));
            //建立connection
            connection = connectionFactory.createConnection();
            connection.start();
            //獲取操做鏈接
            session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);
            //獲取消息目的地,需在控制檯配置
            destination = session.createQueue(properties.getProperty("queueName"));
            //獲得消息接收者
            consumer = session.createConsumer(destination);
            
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


    public void ReceiveMessage(MessageConsumer consumer) {
        int i = 0;
        while(true){
            try {
                TextMessage message = (TextMessage) consumer.receive(RECEIVE_TIME);
                if(message != null){
                    System.out.println("queue1 "+message.getText()+"   "+i);
                    FileOutputStream out;
                    out = new FileOutputStream("D:/test.txt");
                    PrintStream p=new PrintStream(out);
                    p.println("queue1 "+message.getText()+"   "+i);
                    out.close();
                    
                }
                 Thread.sleep(1000);
                 i++;
            } catch (JMSException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (FileNotFoundException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }
    
    public void CloseConnection(Connection connection) {
        if(connection != null){
            try {
                connection.close();
            } catch (JMSException e) {                
                e.printStackTrace();
            }
        }
    }
    
    public ConnectionFactory getConnectionFactory() {
        return connectionFactory;
    }

    public void setConnectionFactory(ActiveMQConnectionFactory connectionFactory) {
        this.connectionFactory = connectionFactory;
    }

    public Connection getConnection() {
        return connection;
    }

    public void setConnection(Connection connection) {
        this.connection = connection;
    }

    public Session getSession() {
        return session;
    }

    public void setSession(Session session) {
        this.session = session;
    }

    public Destination getDestination() {
        return destination;
    }

    public void setDestination(Destination destination) {
        this.destination = destination;
    }

}
package com.receiver;

import javax.jms.JMSException;
import javax.jms.TextMessage;

/** Command-line entry point that starts a {@link MessageReceiver} and polls its queue. */
public class ReceiveMain {

    /**
     * Creates a receiver, runs its receive loop on the receiver's own consumer, and closes the
     * receiver's connection afterwards.
     *
     * @param args unused
     * @throws JMSException declared for compatibility with existing callers
     */
    public static void main(String[] args) throws JMSException {
        MessageReceiver messageReceiver = new MessageReceiver();
        try {
            messageReceiver.ReceiveMessage(messageReceiver.getConsumer());
        } finally {
            // Release the broker connection even when the receive loop exits with an exception.
            messageReceiver.CloseConnection(messageReceiver.getConnection());
        }
    }

}

發送端:

package com.sender;


import java.io.InputStream;
import java.util.Properties;

import javax.jms.*;

import org.apache.activemq.*;

/**
 * Queue producer that connects to an ActiveMQ broker and sends text messages.
 *
 * <p>The constructor reads {@code ../config/connection.properties} (resolved relative to this
 * class on the classpath) for the keys {@code name}, {@code password}, {@code brokerURL} and
 * {@code queueName}, then creates the connection factory, a started connection, a
 * non-transacted {@code AUTO_ACKNOWLEDGE} session, the queue destination and a
 * non-persistent producer.
 *
 * <p>If initialisation fails the stack trace is printed and the remaining fields stay
 * {@code null}; callers should expect that state.
 */
public class MessageSender implements IMessageSender {

    /** Classpath-relative location of the connection settings. */
    private static final String CONFIG_RESOURCE = "../config/connection.properties";

    public ActiveMQConnectionFactory connectionFactory = null;
    public Connection connection = null;
    public Session session = null;
    public Destination destination = null;
    public MessageProducer producer = null;

    /**
     * Initialises the factory, connection, session, destination and producer from the
     * properties resource. Any failure is printed and leaves the unset fields {@code null}.
     */
    public MessageSender() {
        InputStream inProperties = null;
        try {
            inProperties = MessageSender.class.getResourceAsStream(CONFIG_RESOURCE);
            if (inProperties == null) {
                // Fail with a readable message instead of an NPE inside Properties.load.
                throw new IllegalStateException("Classpath resource not found: " + CONFIG_RESOURCE);
            }
            Properties properties = new Properties();
            properties.load(inProperties);
            // Create the connection factory.
            connectionFactory = new ActiveMQConnectionFactory(properties.getProperty("name"),
                    properties.getProperty("password"),
                    properties.getProperty("brokerURL"));
            // Create and start the connection.
            connection = connectionFactory.createConnection();
            connection.start();
            // Non-transacted session with automatic acknowledgement.
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // Destination queue, named by the "queueName" property.
            destination = session.createQueue(properties.getProperty("queueName"));
            // Producer bound to that queue.
            producer = session.createProducer(destination);
            // Messages are sent with non-persistent delivery.
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (inProperties != null) {
                try {
                    inProperties.close();
                } catch (java.io.IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * Builds the text message {@code "hello world!   " + i} on the given session.
     *
     * @param session the session used to create the message
     * @param i       sequence number appended to the message text
     * @return the new message, or {@code null} if the session raised a {@link JMSException}
     *         (the exception is printed)
     */
    @Override
    public TextMessage CreateMessage(Session session, int i) {
        String strMessage = "hello world!   " + i;
        TextMessage message = null;
        try {
            message = session.createTextMessage(strMessage);
        } catch (JMSException e) {
            e.printStackTrace();
        }
        return message;
    }

    /**
     * Sends {@code message} through {@code producer}. A {@link JMSException} raised by the send
     * is printed rather than propagated.
     *
     * @param message  the message to send; {@code null} (as returned by a failed
     *                 {@link #CreateMessage}) is reported on standard error and skipped
     * @param producer the producer to send with
     */
    public void SendMessage(TextMessage message, MessageProducer producer) {
        if (message == null) {
            System.err.println("SendMessage: no message to send (message is null)");
            return;
        }
        try {
            producer.send(message);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    /**
     * Closes {@code connection} if it is not {@code null}; a {@link JMSException} raised while
     * closing is printed rather than propagated.
     *
     * @param connection the connection to close, may be {@code null}
     */
    public void CloseConnection(Connection connection) {
        if (connection != null) {
            try {
                connection.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

    public ConnectionFactory getConnectionFactory() {
        return connectionFactory;
    }

    public void setConnectionFactory(ActiveMQConnectionFactory connectionFactory) {
        this.connectionFactory = connectionFactory;
    }

    public Connection getConnection() {
        return connection;
    }

    public void setConnection(Connection connection) {
        this.connection = connection;
    }

    public Session getSession() {
        return session;
    }

    public void setSession(Session session) {
        this.session = session;
    }

    public Destination getDestination() {
        return destination;
    }

    public void setDestination(Destination destination) {
        this.destination = destination;
    }

    public MessageProducer getProducer() {
        return producer;
    }

    public void setProducer(MessageProducer producer) {
        this.producer = producer;
    }
}
package com.sender;

import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintStream;

import javax.jms.JMSException;
import javax.jms.TextMessage;

/** Command-line entry point that sends ten test messages through a {@link MessageSender}. */
public class SendMain {

    /** File that receives a copy of the most recent progress line. */
    private static final String OUTPUT_FILE = "D:/test.txt";

    /**
     * Sends ten messages one second apart, printing each progress line and writing it to
     * {@code D:/test.txt} (the file is overwritten every time), then closes the sender's
     * connection.
     *
     * @param args unused
     * @throws JMSException if querying or committing the session fails
     */
    public static void main(String[] args) throws JMSException {
        MessageSender messageSender = new MessageSender();
        try {
            for (int i = 0; i < 10; i++) {
                TextMessage textMessage = messageSender.CreateMessage(messageSender.getSession(), i);
                messageSender.SendMessage(textMessage, messageSender.getProducer());
                String line = "send message sucess!  :  " + i;
                System.out.println(line);
                try {
                    writeLine(line);
                } catch (IOException e) {
                    e.printStackTrace();
                }

                // commit() is only legal on a transacted session; on a non-transacted one JMS
                // requires it to throw IllegalStateException, which used to abort the loop.
                if (messageSender.getSession().getTransacted()) {
                    messageSender.getSession().commit();
                }
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Restore the flag and stop sending so the process can shut down.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        } finally {
            // Release the broker connection even when sending fails part-way through.
            messageSender.CloseConnection(messageSender.getConnection());
        }
    }

    /** Overwrites the output file with a single line, always releasing the file handle. */
    private static void writeLine(String line) throws IOException {
        FileOutputStream out = new FileOutputStream(OUTPUT_FILE);
        try {
            PrintStream p = new PrintStream(out);
            p.println(line);
            p.flush();
        } finally {
            out.close();
        }
    }

}
相關文章
相關標籤/搜索