①.消息傳遞方式介紹:html
Activemq支持兩種方式的消息傳遞:java
廣播模式:1-n的方式,是一種發佈訂閱模式,像騰訊新聞那樣,只要咱們微信關注了騰訊新聞,那麼每一個人都會收到推送的新聞數據庫
隊列模式:1-1的方式,只能有一個消費者端消費生產者生產的數據apache
②.消息類型介紹:windows
Activemq提供了兩種消息類型:持久化和非持久化:微信
消息生產者使用持久(persistent)傳遞模式發送消息的時候,Producer.send() 方法會被阻塞,直到 broker 發送一個確認消息給生產者(ProducerAck),這個確認消息暗示broker已經成功接收到消息並把消息保存到二級存儲中。這個過程一般稱爲同步發送。速度較慢,數據基本不會丟失.能夠持久化到kahaDB(aMq默認採用kahaDB存儲引擎來存儲消息)或數據庫中session
異步發送不會在受到 broker 的確認以前一直阻塞 Producer.send 方法,速度較快,不過可能會形成數據的丟失.異步
消息簽收方式:socket
AUTO_ACKNOWLEDGE 自動確認tcp
CLIENT_ACKNOWLEDGE 客戶端手動確認
DUPS_OK_ACKNOWLEDGE 自動批量確認
SESSION_TRANSACTED 事務提交併確認
③.下載能夠到apache activeMQ官網下載
④.我這裏下載的是windows 5.10版本的就以此爲例作介紹
解壓以後進入bin目錄根據操做系統找到對應的,activemq.bat文件雙擊運行
activeMQ內置有一個控制檯能夠訪問http://localhost:8161/查看,默認帳戶密碼皆爲admin
⑤.客戶端代碼(java爲例):
public class ActiveMqBo { // 建立鏈接工廠 private ConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMqConstant.URL); /** * 獲取一個鏈接 * @return * @throws JMSException */ public Connection getConnection() throws JMSException { Connection conn; try { conn = factory.createConnection(); } catch (JMSException e) { e.printStackTrace(); throw e; } return conn; } }public static void main(String[] args) { ActiveMqBo mq = new ActiveMqBo(); Connection conn = null; Session session = null; MessageProducer producer = null; try { // 獲取一個鏈接 conn = mq.getConnection(); conn.start(); // 簽收方式 session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 建立一個隊列 Destination destination = session.createQueue("queue-1"); // 獲取一個生產者 producer = session.createProducer(destination); /* * 持久化,會經過kahadb把消息存入到db.log中,直到被消費後進行清除 * 速度較慢 * DeliveryMode.NON_PERSISTENT非持久化 * DeliveryMode.PERSISTENT持久化 */ producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // producer.setTimeToLive(5000);//5秒後過時,這個對點對點模式有效 for (int i = 0; i < 1000; i++) { MessageDto mes = new MessageDto(); mes.setCode("" + (i + 1)); mes.setMessage("send mes:" + (i + 1)); producer.send(session.createObjectMessage(mes)); System.out.println("send mes : " + mes); } } catch (Exception e) { e.printStackTrace(); } finally { try { if (producer != null) { producer.close(); } if (session != null) { session.close(); } if (conn != null) { conn.close(); } } catch (JMSException e) { e.printStackTrace(); } } }日誌部分:
SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/D:/repository/ch/qos/logback/logback-classic/1.0.13/logback-classic-1.0.13.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/D:/repository/org/apache/activemq/activemq-all/5.10.0/activemq-all-5.10.0.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [ch.qos.logback.classic.util.ContextSelectorStaticBinder] 11:27:42.251 [main] DEBUG o.a.a.transport.WireFormatNegotiator - Sending: WireFormatInfo { version=10, properties={MaxFrameSize=9223372036854775807, CacheSize=1024, CacheEnabled=true, Host=127.0.0.1, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]} 11:27:42.252 [ActiveMQ Transport: tcp:///127.0.0.1:61616@54559] DEBUG o.a.a.transport.WireFormatNegotiator - Received WireFormat: WireFormatInfo { version=10, properties={CacheSize=1024, MaxFrameSize=104857600, CacheEnabled=true, SizePrefixDisabled=false, TcpNoDelayEnabled=true, MaxInactivityDurationInitalDelay=10000, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]} 11:27:42.257 [main] DEBUG o.a.a.transport.InactivityMonitor - Using min of local: WireFormatInfo { version=10, properties={MaxFrameSize=9223372036854775807, CacheSize=1024, CacheEnabled=true, Host=127.0.0.1, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]} and remote: WireFormatInfo { version=10, properties={CacheSize=1024, MaxFrameSize=104857600, CacheEnabled=true, SizePrefixDisabled=false, TcpNoDelayEnabled=true, MaxInactivityDurationInitalDelay=10000, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]} 11:27:42.260 [ActiveMQ Transport: tcp:///127.0.0.1:61616@54559] DEBUG o.a.a.transport.WireFormatNegotiator - tcp:///127.0.0.1:61616@54559 before negotiation: OpenWireFormat{version=10, cacheEnabled=false, stackTraceEnabled=false, tightEncodingEnabled=false, sizePrefixDisabled=false, maxFrameSize=9223372036854775807} 11:27:42.261 [ActiveMQ Transport: tcp:///127.0.0.1:61616@54559] DEBUG o.a.a.transport.WireFormatNegotiator - tcp:///127.0.0.1:61616@54559 after negotiation: OpenWireFormat{version=10, cacheEnabled=true, stackTraceEnabled=true, tightEncodingEnabled=true, sizePrefixDisabled=false, maxFrameSize=104857600} send mes : MessageDto [code=1, message=send mes:1] send mes : MessageDto [code=2, message=send mes:2] send mes : MessageDto [code=3, message=send mes:3] send mes : MessageDto [code=4, message=send mes:4] send mes : MessageDto [code=5, message=send mes:5] send mes : MessageDto [code=6, message=send mes:6] send mes : MessageDto [code=7, message=send mes:7] send mes : MessageDto [code=8, message=send mes:8] ........省略 send mes : MessageDto [code=996, message=send mes:996] send mes : MessageDto [code=997, message=send mes:997] send mes : MessageDto [code=998, message=send mes:998] send mes : MessageDto [code=999, message=send mes:999] send mes : MessageDto [code=1000, message=send mes:1000] 11:27:42.490 [main] DEBUG o.a.activemq.util.ThreadPoolUtils - Shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@2e0f6c25[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0] is shutdown: true and terminated: true took: 0.000 seconds. 11:27:42.490 [ActiveMQ Transport: tcp:///127.0.0.1:61616@54559] DEBUG o.a.activemq.util.ThreadPoolUtils - Shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@768508c2[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0] is shutdown: true and terminated: true took: 0.000 seconds. 11:27:42.491 [main] DEBUG o.a.a.transport.tcp.TcpTransport - Stopping transport tcp:///127.0.0.1:61616@54559 11:27:42.492 [main] DEBUG o.a.a.thread.TaskRunnerFactory - Initialized TaskRunnerFactory[ActiveMQ Task] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@b30e9f8[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0] 11:27:42.493 [ActiveMQ Task-1] DEBUG o.a.a.transport.tcp.TcpTransport - Closed socket Socket[addr=/127.0.0.1,port=61616,localport=54559] 11:27:42.493 [main] DEBUG o.a.activemq.util.ThreadPoolUtils - Forcing shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@b30e9f8[Running, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 0]這裏咱們再看控制檯,會發現queue1有數據信息,由於咱們是非持久化方式發送消息,咱們關掉activemq後在重啓會發現數據丟失
,再以持久化的方式測試:
只須要將這裏設置爲PERSISTENT便可
producer.setDeliveryMode(DeliveryMode.PERSISTENT);咱們再啓動剛纔的代碼會發現發送數據的速度很明顯的下降,可是咱們關閉activemq後再重啓,刷新控制檯數據沒有丟失.
⑥.消費端:
public class Consumer { public static void main(String[] args) { try { MessageUtil.getConsumer("queue-1", Boolean.TRUE); } catch (JMSException e) { e.printStackTrace(); } } }public class MessageUtil { private static ActiveMqBo mq = new ActiveMqBo(); private static Connection conn = null; private static Session session = null; private static void init(){ try { // 獲取一個鏈接 if(conn == null){ conn = mq.getConnection(); } conn.start(); // 自動提交事務 if(session == null){ /*Session.AUTO_ACKNOWLEDGE 消息自動簽收 Session.CLIENT_ACKNOWLEDGE 客戶端調用acknowledge方法手動簽收 Session.DUPS_OK_ACKNOWLEDGE 不是必須簽收,消息可能會重複發送。在第二次從新傳送消息的時候,消息 頭的JmsDelivered會被置爲true標示當前消息已經傳送過一次,客戶端須要進行消息的重複處理控制。*/ session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); } } catch (Exception e) { e.printStackTrace(); } } /** * * @param obj 序列化對象 * @param topic * @param isQueue * @throws JMSException */ public static void sendObjectMessage(Serializable obj,String topic,boolean isQueue) throws JMSException{ init(); MessageProducer producer = getProducer(getDestination(topic, isQueue)); producer.send(session.createObjectMessage(obj)); destroy(producer); } public static void sendTextMessage(String mes,String topic,boolean isQueue) throws JMSException{ init(); MessageProducer producer = getProducer(getDestination(topic, isQueue)); producer.send(session.createTextMessage(mes)); destroy(producer); } private static MessageProducer getProducer(Destination destination) throws JMSException{ MessageProducer producer = session.createProducer(destination); /** PERSISTENT(持久性消息): * 這是ActiveMQ的默認傳送模式,此模式保證這些消息只被傳送一次和成功使用一次。對於這些消息,可靠性是優先考慮的因素。 * 可靠性的另外一個重要方面是確保持久性消息傳送至目標後,消息服務在向消費者傳送它們以前不會丟失這些消息。這意味着在持久性消息傳送至目標時, * 消息服務將其放入持久性數據存儲。若是消息服務因爲某種緣由致使失敗, * 它能夠恢復此消息並將此消息傳送至相應的消費者。雖然這樣增長了消息傳送的開銷,但卻增長了可靠性。 * NON_PERSISTENT(非持久性消息): * 保證這些消息最多被傳送一次。對於這些消息,可靠性並不是主要的考慮因素。 * 此模式並不要求持久性的數據存儲,也不保證消息服務因爲某種緣由致使失敗後消息不會丟失。 * */ producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); return producer; } private static Destination getDestination(String topic, boolean isQueue) throws JMSException { Destination destination = null; if (isQueue) { destination = session.createQueue(topic); } else { destination = session.createTopic(topic); } return destination; } public static void getConsumer(String topic, boolean isQueue) throws JMSException{ init(); MessageConsumer consumer = session.createConsumer(getDestination(topic, isQueue)); if(Arrays.asList(ActiveMqConstant.QUEUES).contains(topic) || Arrays.asList(ActiveMqConstant.TOPICS).contains(topic) ){ MessageListener listener = ActiveMqConstant.listeners.get(topic); consumer.setMessageListener(listener); } } private static void destroy(MessageProducer producer) throws JMSException{ if(producer != null){ producer.close(); } if(session!=null){ session.close(); session = null; } if(conn!=null){ conn.close(); conn = null; } } public static void destroy(MessageConsumer consumer) throws JMSException{ if(consumer != null){ consumer.close(); consumer = null; } if(session!=null){ session.close(); session = null; } if(conn!=null){ conn.close(); conn = null; } }public class ActiveMqConstant { public static final String USERNAME = "admin", PASSWORD = "admin"; public static final String URL = "tcp://127.0.0.1:61616"; public static final String[] QUEUES = { "queue-1", "queue2", "queue3" }; public static final String[] TOPICS = { "topic1", "topic2", "topic3" }; public static Map<String, MessageListener> listeners = new LinkedHashMap<String, MessageListener>(); static{ init(); } private static void init(){ //後期能夠從xml中配置獲取 listeners.put("queue-1",new ActiveMQMessageListener()); listeners.put("queue2",new ActiveMQMessageListener2()); listeners.put("queue3",new ActiveMQMessageListener3()); listeners.put("topic1",new ActiveMQMessageListener()); listeners.put("topic2",new ActiveMQMessageListener2 ()); listeners.put("topic3",new ActiveMQMessageListener3()); } private ActiveMqConstant() { } }....省略 receiver:MessageDto [code=402, message=send mes:402] receiver:MessageDto [code=403, message=send mes:403] receiver:MessageDto [code=404, message=send mes:404] ...省略 receiver:MessageDto [code=2085, message=send mes:2085] receiver:MessageDto [code=2086, message=send mes:2086] receiver:MessageDto [code=2087, message=send mes:2087] receiver:MessageDto [code=2088, message=send mes:2088] receiver:MessageDto [code=2089, message=send mes:2089] 13:53:21.896 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG o.a.a.t.AbstractInactivityMonitor - WriteChecker: 10001ms elapsed since last write check. 13:53:21.899 [ActiveMQ InactivityMonitor Worker] DEBUG o.a.a.t.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616] 13:53:31.897 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG o.a.a.t.AbstractInactivityMonitor - WriteChecker: 10001ms elapsed since last write check. 13:53:31.897 [ActiveMQ InactivityMonitor Worker] DEBUG o.a.a.t.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616] 13:53:41.897 [ActiveMQ InactivityMonitor ReadCheckTimer] DEBUG o.a.a.t.AbstractInactivityMonitor - 30002ms elapsed since last read check. 13:53:41.898 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG o.a.a.t.AbstractInactivityMonitor - WriteChecker: 10001ms elapsed since last write check. 13:53:41.898 [ActiveMQ InactivityMonitor Worker] DEBUG o.a.a.t.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616] 13:53:51.899 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG o.a.a.t.AbstractInactivityMonitor - WriteChecker: 10001ms elapsed since last write check. 13:53:51.899 [ActiveMQ InactivityMonitor Worker] DEBUG o.a.a.t.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616] 13:54:01.900 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG o.a.a.t.AbstractInactivityMonitor - WriteChecker: 10001ms elapsed since last write check. 13:54:01.901 [ActiveMQ InactivityMonitor Worker] DEBUG o.a.a.t.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616] 13:54:11.899 [ActiveMQ InactivityMonitor ReadCheckTimer] DEBUG o.a.a.t.AbstractInactivityMonitor - 30002ms elapsed since last read check. 13:54:11.901 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG o.a.a.t.AbstractInactivityMonitor - WriteChecker: 10001ms elapsed since last write check. 13:54:11.901 [ActiveMQ InactivityMonitor Worker] DEBUG o.a.a.t.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616] 13:54:21.902 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG o.a.a.t.AbstractInactivityMonitor - WriteChecker: 10001ms elapsed since last write check. 13:54:21.902 [ActiveMQ InactivityMonitor Worker] DEBUG o.a.a.t.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616] 13:54:31.903 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG o.a.a.t.AbstractInactivityMonitor - WriteChecker: 10001ms elapsed since last write check. 13:54:31.903 [ActiveMQ InactivityMonitor Worker] DEBUG o.a.a.t.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616]再次刷新控制檯看界面變化
⑦.瞭解activemq的目錄結構 ,會發現以下幾個文件:
db.data,db.redo,db-1.log
在消息未被消費以前會將數據保存在db-*.log中, 其中activemq默認每超過32m從新生成一個新的日誌文件.
db.data:存儲btree索引 ,BTree索引,保存消息的引用,並按照message ID排序。
db.redo:用來保證MQ broker未乾淨關閉狀況下,用於BTree index的重建。注意:對於非持久化的數據若是未及時消費,當activemq宕機時,保存的db-*.log等信息在下次啓動時所有丟失.
廣播模式再也不介紹,跟隊列方式類似,能夠本身多開幾個consumer看接收到的內容是否一致