1.在服務器(阿里雲ubuntu16.04)上安裝activemq,我是直接下載activemq:java
wget http://archive.apache.org/dist/activemq/apache-activemq/5.6.0/apache-activemq-5.6.0-bin.tar.gz
2.解壓及安裝,能夠經過activemq --help 查看一些命令及參數信息;apache
3.啓動activemq;ubuntu
4.編寫簡單的java demo:服務器
import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; import java.util.concurrent.atomic.AtomicInteger; /** * Created by IntelliJ IDEA. * * @author * Description: * Date: 2017/11/26 * Time: 12:07 */ public class Producter { /** * ActiveMq 的默認用戶名 */ private static final String USERNAME = "xx"; /** * ActiveMq 的默認登陸密碼 */ private static final String PASSWORD = "xx"; /** * ActiveMQ 的連接地址 */ private static final String BROKEN_URL = "xx"; AtomicInteger count = new AtomicInteger(0); /** * 連接工廠 */ ConnectionFactory connectionFactory; /** * 連接對象 */ Connection connection; /** * 事務管理 */ Session session; ThreadLocal<MessageProducer> threadLocal = new ThreadLocal<>(); public void init(){ try { //建立一個連接工廠 // connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL); connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://xx"); //從工廠中建立一個連接 connection = connectionFactory.createConnection(); //開啓連接 connection.start(); //建立一個事務(這裏經過參數能夠設置事務的級別) session = connection.createSession(true,Session.SESSION_TRANSACTED); } catch (JMSException e) { e.printStackTrace(); } } public void sendMessage(String disname){ try { //建立一個消息隊列 Queue queue = session.createQueue(disname); //消息生產者 MessageProducer messageProducer = null; if(threadLocal.get()!=null){ messageProducer = threadLocal.get(); }else{ messageProducer = session.createProducer(queue); threadLocal.set(messageProducer); } while(true){ Thread.sleep(1000); int num = count.getAndIncrement(); //建立一條消息 TextMessage msg = session.createTextMessage(Thread.currentThread().getName()+ "productor:生產者!,count:"+num); System.out.println(Thread.currentThread().getName()+ "productor:生產東西!,count:"+num); //發送消息 messageProducer.send(msg); //提交事務 session.commit(); } } catch (JMSException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }
import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; import java.util.concurrent.atomic.AtomicInteger; /** * Created by IntelliJ IDEA. * * @author * Description: * Date: 2017/11/26 * Time: 12:08 */ public class Comsumer { /** * ActiveMq 的默認用戶名 */ private static final String USERNAME = "xx"; /** * ActiveMq 的默認登陸密碼 */ private static final String PASSWORD = "xx"; /** * ActiveMQ 的連接地址 */ private static final String BROKEN_URL = "xx"; ConnectionFactory connectionFactory; Connection connection; Session session; ThreadLocal<MessageConsumer> threadLocal = new ThreadLocal<>(); AtomicInteger count = new AtomicInteger(); public void init(){ try { connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://xx"); connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); } catch (JMSException e) { e.printStackTrace(); } } public void getMessage(String disname){ try { Queue queue = session.createQueue(disname); MessageConsumer consumer = null; if(threadLocal.get()!=null){ consumer = threadLocal.get(); }else{ consumer = session.createConsumer(queue); threadLocal.set(consumer); } while(true){ Thread.sleep(1000); TextMessage msg = (TextMessage) consumer.receive(); if(msg!=null) { msg.acknowledge(); System.out.println(Thread.currentThread().getName()+": Consumer:我是消費者,我正在消費Msg"+msg.getText()+"--->"+count.getAndIncrement()); }else { break; } } } catch (JMSException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }
test:session
/** * Created by IntelliJ IDEA. * * @author * Description: * Date: 2017/11/26 * Time: 12:48 */ public class TestConsumer { public static void main(String[] args){ Comsumer comsumer = new Comsumer(); comsumer.init(); TestConsumer testConsumer = new TestConsumer(); new Thread(testConsumer.new ConsumerMq(comsumer)).start(); new Thread(testConsumer.new ConsumerMq(comsumer)).start(); new Thread(testConsumer.new ConsumerMq(comsumer)).start(); new Thread(testConsumer.new ConsumerMq(comsumer)).start(); } private class ConsumerMq implements Runnable{ Comsumer comsumer; public ConsumerMq(Comsumer comsumer){ this.comsumer = comsumer; } @Override public void run() { while(true){ try { comsumer.getMessage("zq-MQ"); Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } } } } }
/** * Created by IntelliJ IDEA. * * @author * Description: * Date: 2017/11/26 * Time: 12:17 */ public class TestMq { public static void main(String[] args){ Producter producter = new Producter(); producter.init(); TestMq testMq = new TestMq(); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } //Thread 1 new Thread(testMq.new ProductorMq(producter)).start(); //Thread 2 new Thread(testMq.new ProductorMq(producter)).start(); //Thread 3 new Thread(testMq.new ProductorMq(producter)).start(); //Thread 4 new Thread(testMq.new ProductorMq(producter)).start(); //Thread 5 new Thread(testMq.new ProductorMq(producter)).start(); } private class ProductorMq implements Runnable{ Producter producter; public ProductorMq(Producter producter){ this.producter = producter; } @Override public void run() { while(true){ try { producter.sendMessage("zq-MQ"); Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } } } } }
以上僅僅是一個簡單的demo,實際生產環境種使用,需要考慮使用場景,配置activemq的配置文件,以及與項目的整合問題。tcp