熟悉activemq的初步試用

 

1.在服務器(阿里雲ubuntu16.04)上安裝activemq,我是直接下載activemq:

wget http://archive.apache.org/dist/activemq/apache-activemq/5.6.0/apache-activemq-5.6.0-bin.tar.gz

2.解壓及安裝,可以通過 activemq --help 查看一些命令及參數信息;

3.啓動activemq;

4.編寫簡單的java demo:

 

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Demo JMS producer that publishes numbered text messages to an ActiveMQ queue.
 *
 * <p>A single instance is meant to be shared by several worker threads. The
 * {@link Connection} opened by {@link #init()} is shared, but every thread that calls
 * {@link #sendMessage(String)} works on its own transacted {@link Session} and its own
 * {@link MessageProducer}: a JMS session is a single-threaded context, and a commit on
 * a shared transacted session would also commit whatever other threads had sent.
 *
 * <p>Created 2017/11/26.
 */
public class Producter {
    /**
     * Placeholder broker user name. Currently unused: {@link #init()} connects with
     * the ActiveMQ default user.
     */
    private static final String USERNAME = "xx";
    /**
     * Placeholder broker password. Currently unused: {@link #init()} connects with
     * the ActiveMQ default password.
     */
    private static final String PASSWORD = "xx";
    /**
     * Placeholder broker URL. Currently unused: {@link #init()} uses its own literal.
     */
    private static final String BROKEN_URL = "xx";

    /** Sequence number shared by all sending threads; embedded in every message. */
    AtomicInteger count = new AtomicInteger(0);

    /** Factory used to open the broker connection. */
    ConnectionFactory connectionFactory;

    /** Broker connection shared by all sending threads. */
    Connection connection;

    /**
     * Transacted session opened by {@link #init()}. A non-null value marks a successful
     * initialisation; sending threads do not send on it but use a session of their own.
     */
    Session session;

    /** Producer owned by the current thread, created on its first send. */
    ThreadLocal<MessageProducer> threadLocal = new ThreadLocal<>();

    /** Transacted session owned by the current thread, created on its first send. */
    private final ThreadLocal<Session> threadSession = new ThreadLocal<>();

    /**
     * Opens and starts the broker connection and creates the initial transacted session.
     *
     * <p>A {@link JMSException} is printed and otherwise ignored; in that case
     * {@link #session} stays {@code null} and {@link #sendMessage(String)} will refuse
     * to run.
     */
    public void init() {
        try {
            connectionFactory = new ActiveMQConnectionFactory(
                    ActiveMQConnection.DEFAULT_USER,
                    ActiveMQConnection.DEFAULT_PASSWORD,
                    "tcp://xx");
            connection = connectionFactory.createConnection();
            connection.start();
            // First argument "true" makes the session transacted: sends only become
            // visible to consumers after commit().
            session = connection.createSession(true, Session.SESSION_TRANSACTED);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    /**
     * Sends one text message per second to the given queue, committing after each
     * message, until the calling thread is interrupted or a JMS error occurs.
     *
     * <p>On interruption the thread's interrupt status is restored before returning so
     * that the caller can observe it. A {@link JMSException} is printed and ends the
     * loop.
     *
     * @param disname name of the destination queue
     * @throws java.lang.IllegalStateException if {@link #init()} has not completed
     *         successfully
     */
    public void sendMessage(String disname) {
        if (session == null) {
            // Fully qualified on purpose: javax.jms.* also declares IllegalStateException,
            // so the simple name would be ambiguous in this file.
            throw new java.lang.IllegalStateException(
                    "init() must complete successfully before sendMessage() is called");
        }
        try {
            Session workerSession = sessionForCurrentThread();
            MessageProducer messageProducer = producerForCurrentThread(workerSession);
            Queue queue = workerSession.createQueue(disname);
            String threadName = Thread.currentThread().getName();
            while (true) {
                Thread.sleep(1000);
                int num = count.getAndIncrement();
                TextMessage msg = workerSession.createTextMessage(threadName +
                        "productor:生產者!,count:" + num);
                System.out.println(threadName +
                        "productor:生產東西!,count:" + num);
                // The producer has no fixed destination, so the queue is passed per send;
                // this keeps the cached producer correct even if disname changes.
                messageProducer.send(queue, msg);
                workerSession.commit();
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Restore the flag instead of swallowing it so callers can stop their loop.
            Thread.currentThread().interrupt();
        }
    }

    /** Returns the calling thread's transacted session, creating it on first use. */
    private Session sessionForCurrentThread() throws JMSException {
        Session workerSession = threadSession.get();
        if (workerSession == null) {
            workerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
            threadSession.set(workerSession);
        }
        return workerSession;
    }

    /** Returns the calling thread's destination-less producer, creating it on first use. */
    private MessageProducer producerForCurrentThread(Session workerSession) throws JMSException {
        MessageProducer producer = threadLocal.get();
        if (producer == null) {
            // null destination = "unidentified" producer; the destination is given to send().
            producer = workerSession.createProducer(null);
            threadLocal.set(producer);
        }
        return producer;
    }
}

  

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Demo JMS consumer that reads messages from an ActiveMQ queue and prints them.
 *
 * <p>A single instance is meant to be shared by several worker threads. The
 * {@link Connection} opened by {@link #init()} is shared, but every thread that calls
 * {@link #getMessage(String)} works on its own auto-acknowledge {@link Session} and its
 * own {@link MessageConsumer}, because a JMS session is a single-threaded context and
 * must not be used by several threads blocking in {@code receive()} at once.
 *
 * <p>Created 2017/11/26.
 */
public class Comsumer {

    /**
     * Placeholder broker user name. Currently unused: {@link #init()} connects with
     * the ActiveMQ default user.
     */
    private static final String USERNAME = "xx";
    /**
     * Placeholder broker password. Currently unused: {@link #init()} connects with
     * the ActiveMQ default password.
     */
    private static final String PASSWORD = "xx";
    /**
     * Placeholder broker URL. Currently unused: {@link #init()} uses its own literal.
     */
    private static final String BROKEN_URL = "xx";

    /** Factory used to open the broker connection. */
    ConnectionFactory connectionFactory;

    /** Broker connection shared by all consuming threads. */
    Connection connection;

    /**
     * Session opened by {@link #init()}. A non-null value marks a successful
     * initialisation; consuming threads do not receive on it but use a session of
     * their own.
     */
    Session session;

    /** Consumer owned by the current thread, created on its first receive. */
    ThreadLocal<MessageConsumer> threadLocal = new ThreadLocal<>();

    /** Number of messages printed so far, across all threads. */
    AtomicInteger count = new AtomicInteger();

    /** Session owned by the current thread, created on its first receive. */
    private final ThreadLocal<Session> threadSession = new ThreadLocal<>();

    /** Queue name the current thread's cached consumer is bound to. */
    private final ThreadLocal<String> threadQueueName = new ThreadLocal<>();

    /**
     * Opens and starts the broker connection and creates the initial session.
     *
     * <p>A {@link JMSException} is printed and otherwise ignored; in that case
     * {@link #session} stays {@code null} and {@link #getMessage(String)} will refuse
     * to run.
     */
    public void init() {
        try {
            connectionFactory = new ActiveMQConnectionFactory(
                    ActiveMQConnection.DEFAULT_USER,
                    ActiveMQConnection.DEFAULT_PASSWORD,
                    "tcp://xx");
            connection = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    /**
     * Receives messages from the given queue and prints each one, pausing one second
     * before every receive, until the consumer is closed, the calling thread is
     * interrupted, or a JMS error occurs.
     *
     * <p>On interruption the thread's interrupt status is restored before returning so
     * that the caller can observe it. A {@link JMSException} is printed and ends the
     * loop.
     *
     * @param disname name of the queue to consume from
     * @throws java.lang.IllegalStateException if {@link #init()} has not completed
     *         successfully
     */
    public void getMessage(String disname) {
        if (session == null) {
            // Fully qualified on purpose: javax.jms.* also declares IllegalStateException,
            // so the simple name would be ambiguous in this file.
            throw new java.lang.IllegalStateException(
                    "init() must complete successfully before getMessage() is called");
        }
        try {
            MessageConsumer consumer = consumerForCurrentThread(disname);
            while (true) {
                Thread.sleep(1000);
                Message received = consumer.receive();
                if (received == null) {
                    // receive() without timeout only yields null once the consumer is closed.
                    break;
                }
                // No acknowledge() call: the session is AUTO_ACKNOWLEDGE, so the message
                // is acknowledged by the session when receive() returns.
                String text;
                if (received instanceof TextMessage) {
                    text = ((TextMessage) received).getText();
                } else {
                    // Other producers may put non-text messages on the queue; print them
                    // rather than failing with a ClassCastException.
                    text = String.valueOf(received);
                }
                System.out.println(Thread.currentThread().getName()+": Consumer:我是消費者,我正在消費Msg"+text+"--->"+count.getAndIncrement());
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Restore the flag instead of swallowing it so callers can stop their loop.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Returns the calling thread's consumer for {@code disname}. The consumer is cached
     * per thread and replaced when the thread asks for a different queue name.
     */
    private MessageConsumer consumerForCurrentThread(String disname) throws JMSException {
        MessageConsumer cached = threadLocal.get();
        String boundQueue = threadQueueName.get();
        if (cached != null && boundQueue != null && boundQueue.equals(disname)) {
            return cached;
        }
        if (cached != null) {
            // Bound to another queue: drop it from the cache first so a failing close()
            // cannot leave a stale consumer behind.
            threadLocal.remove();
            threadQueueName.remove();
            cached.close();
        }

        Session workerSession = threadSession.get();
        if (workerSession == null) {
            workerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            threadSession.set(workerSession);
        }

        MessageConsumer created = workerSession.createConsumer(workerSession.createQueue(disname));
        threadLocal.set(created);
        threadQueueName.set(disname);
        return created;
    }
}

  

 

test:

/**
 * Entry point that starts several threads, all consuming from the same queue through
 * one shared {@link Comsumer}.
 *
 * <p>Created 2017/11/26.
 */
public class TestConsumer {
    /** Number of consumer threads to start. */
    private static final int CONSUMER_THREADS = 4;

    /** Queue the consumers read from. */
    private static final String QUEUE_NAME = "zq-MQ";

    /** Pause before consuming is retried after {@code getMessage} has returned. */
    private static final long RETRY_DELAY_MILLIS = 10000L;

    /**
     * Initialises one {@link Comsumer} and starts the consumer threads.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        Comsumer comsumer = new Comsumer();
        comsumer.init();
        for (int i = 0; i < CONSUMER_THREADS; i++) {
            new Thread(new ConsumerMq(comsumer)).start();
        }
    }

    /**
     * Worker that keeps consuming from {@link #QUEUE_NAME} until its thread is
     * interrupted. Static because it needs no state from an enclosing instance.
     */
    private static class ConsumerMq implements Runnable {
        private final Comsumer comsumer;

        public ConsumerMq(Comsumer comsumer) {
            this.comsumer = comsumer;
        }

        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                comsumer.getMessage(QUEUE_NAME);
                try {
                    Thread.sleep(RETRY_DELAY_MILLIS);
                } catch (InterruptedException e) {
                    // Restore the flag so the loop condition ends the worker instead of
                    // swallowing the stop request and looping forever.
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}

  

/**
 * Entry point that starts several threads, all sending to the same queue through one
 * shared {@link Producter}.
 *
 * <p>Created 2017/11/26.
 */
public class TestMq {
    /** Number of producer threads to start. */
    private static final int PRODUCER_THREADS = 5;

    /** Queue the producers send to. */
    private static final String QUEUE_NAME = "zq-MQ";

    /** Pause between initialising the producer and starting the worker threads. */
    private static final long STARTUP_DELAY_MILLIS = 1000L;

    /** Pause before sending is retried after {@code sendMessage} has returned. */
    private static final long RETRY_DELAY_MILLIS = 10000L;

    /**
     * Initialises one {@link Producter}, waits briefly, then starts the producer threads.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        Producter producter = new Producter();
        producter.init();
        try {
            Thread.sleep(STARTUP_DELAY_MILLIS);
        } catch (InterruptedException e) {
            // Keep the interrupt status visible instead of discarding it.
            Thread.currentThread().interrupt();
        }
        for (int i = 0; i < PRODUCER_THREADS; i++) {
            new Thread(new ProductorMq(producter)).start();
        }
    }

    /**
     * Worker that keeps sending to {@link #QUEUE_NAME} until its thread is interrupted.
     * Static because it needs no state from an enclosing instance.
     */
    private static class ProductorMq implements Runnable {
        private final Producter producter;

        public ProductorMq(Producter producter) {
            this.producter = producter;
        }

        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                producter.sendMessage(QUEUE_NAME);
                try {
                    Thread.sleep(RETRY_DELAY_MILLIS);
                } catch (InterruptedException e) {
                    // Restore the flag so the loop condition ends the worker instead of
                    // swallowing the stop request and looping forever.
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}

  

 

 

以上僅僅是一個簡單的demo,實際生產環境中使用,需要考慮使用場景,配置activemq的配置文件,以及與項目的整合問題。

相關文章
相關標籤/搜索