ActiveMQ有點對點和發佈訂閱兩種方式,這兩種的消息存儲仍是有稍微一點區別。html
隊列的存儲比較簡單,就是先進先出(FIFO),只有當該消息已被消費和確承認以刪除消息存儲。若是沒有被確認,其餘消費者是不能獲取消息的。
看看下面的例子:
生產者發送了10條消息:java
public static void main(String[] args) { ConnectionFactory connectionFactory; Connection connection = null; Session session = null; Destination destination; MessageProducer producer = null; boolean useTransaction = false; try { // 建立一個ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); // 建立一個Connection connection = connectionFactory.createConnection(); // 啓動消息傳遞的鏈接 connection.start(); // 建立一個session session = connection.createSession(useTransaction, Session.AUTO_ACKNOWLEDGE); // 建立一個destination,把消息發送到test.queue destination = session.createQueue("test.persistence"); // 建立一個生產者 producer = session.createProducer(destination); // 建立消息 for (int i = 0; i < 10; i++) { Message message = session.createTextMessage("this is test.persistence" + i); // 發送消息 producer.send(message); } } catch (JMSException e) { e.printStackTrace(); } finally { try { if (producer != null) { producer.close(); } if (session != null) { session.close(); } if (connection != null) { connection.close(); } } catch (JMSException e) { e.printStackTrace(); } } }
消費者1消費5條,可是暫時沒確認mysql
public static void main(String[] args) { ConnectionFactory connectionFactory; Connection connection = null; Session session = null; Destination destination; MessageConsumer consumer = null; boolean useTransaction = false; try { // 建立一個ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); // 建立一個Connection connection = connectionFactory.createConnection(); // 啓動消息傳遞的鏈接 connection.start(); // 建立一個session session = connection.createSession(useTransaction, Session.CLIENT_ACKNOWLEDGE); // 建立一個destination,把消息發送到test.queue destination = session.createQueue("test.persistence"); // 建立一個消費者 consumer = session.createConsumer(destination); // 接收消息 for (int i = 0; i < 5; i++) { Message message = consumer.receive(); TimeUnit.SECONDS.sleep(1); System.out.println("consumer1 receive:" + ((TextMessage) message).getText()); if (i == 4) { TimeUnit.SECONDS.sleep(10); message.acknowledge(); } } } catch (JMSException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } finally { try { if (consumer != null) { consumer.close(); } if (session != null) { session.close(); } if (connection != null) { connection.close(); } } catch (JMSException e) { e.printStackTrace(); } } }
消費者2此時就阻塞在獲取消息上面,直到消費者1確認後才接收到消息sql
public static void main(String[] args) { ConnectionFactory connectionFactory; Connection connection = null; Session session = null; Destination destination; MessageConsumer consumer = null; boolean useTransaction = false; try { // 建立一個ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); // 建立一個Connection connection = connectionFactory.createConnection(); // 啓動消息傳遞的鏈接 connection.start(); // 建立一個session session = connection.createSession(useTransaction, Session.CLIENT_ACKNOWLEDGE); // 建立一個destination,把消息發送到test.queue destination = session.createQueue("test.persistence"); // 建立一個消費者 consumer = session.createConsumer(destination); // 接收消息 for (int i = 0; i < 5; i++) { Message message = consumer.receive(); System.out.println("consumer1 receive:" + ((TextMessage) message).getText()); if (i == 4) { message.acknowledge(); } } } catch (JMSException e) { e.printStackTrace(); } finally { try { if (consumer != null) { consumer.close(); } if (session != null) { session.close(); } if (connection != null) { connection.close(); } } catch (JMSException e) { e.printStackTrace(); } } }
在發佈訂閱中,每一個消費者都會獲取到消息的拷貝,爲了節約空間,broker只存儲了一份消息,並存儲了每一個消費者所消費的信息,這樣每一個消費者雖然有不一樣的消費進度,最終仍是能一次獲取到消息。若是消息被全部訂閱者消費完了,broker就能夠刪除這個消息。
數據庫
ActiveMQ提供了多種存儲方式,好比AMQ、KahaDB、JDBC、內存。apache
參考官網
AMQ是早期版本的默認持久化存儲方式,基於文件的事務存儲,對於消息的存儲進行了調優,速度仍是很是快的。默認大小32M。當消息被成功使用時,就會被標記爲清理或者存檔,這個操做將在下個清理時發送。基本配置以下(其餘參數詳見官網):segmentfault
<broker persistent="true" xmlns="http://activemq.apache.org/schema/core"> ... <persistenceAdapter> <amqPersistenceAdapter/> </persistenceAdapter> ... </broker>
5.4之後默認的持久化存儲方式,也是基於文件的,與AMQ不一樣的是,KahaDB採用了B-Tree存儲的佈局。擁有高性能和可擴展性等特色。基本配置以下:緩存
<broker brokerName="broker" persistent="true" useShutdownHook="false"> ... <persistenceAdapter> <kahaDB directory="activemq-data" journalMaxFileLength="32mb"/> </persistenceAdapter> ... </broker>
JDBC有三個表,兩個表存儲消息,還有一個表當作鎖用,保證只能一個代理訪問broker。配置以下:
先把默認的kahaDB註釋掉,再用下面的替換,注意這個配置是在broker下面。session
<persistenceAdapter> <jdbcPersistenceAdapter dataSource="#mysql-ds"/> </persistenceAdapter>
而後再增長數據庫配置,注意,這個配置是broker外面,跟其餘bea同級。tcp
<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="com.mysql.jdbc.Driver"/> <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/> <property name="username" value="root"/> <property name="password" value="123456"/> <property name="maxActive" value="200"/> <property name="poolPreparedStatements" value="true"/> </bean>
在啓動activemq.bat,以前,先建數據庫,而且編碼設置爲latin1,否則ACTIVEMQ_ACKS表會建立失敗致使啓動不了。而後根據報錯信息一次在lib下加入commons-dbcp-1.4.jar、commons-pool-1.5.4.jar、mysql-connector-java-5.1.36.jar包。以下所示,幫咱們建了三個表:
activemq_acks用於保存持久化訂閱信息。
activemq_lock,用於broker集羣時的Master選舉。
activemq_msgs,用於存儲消息信息。
代碼參考以前的點對點
演示步驟以下:
一、 啓動生產者,發送消息
二、 查看錶數據以下:
三、 啓動消費者,消息消費後表數據被刪除
代碼參考以前的發佈訂閱
演示步驟以下:
一、 啓動生產者,發送消息。
二、 查看錶數據,並無持久化,因此發佈訂閱默認不持久化的。
三、 啓動消費者,沒有消息被消費。
爲了讓消息能夠被消費者消費,咱們能夠這樣作:
消費者代碼以下
public static void main(String[] args) { ConnectionFactory connectionFactory; Connection connection = null; Session session = null; // 這邊的Topic Topic destination; TopicSubscriber consumer = null; Message message; boolean useTransaction = false; try { // 建立一個ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); // 建立一個Connection connection = connectionFactory.createConnection(); // 設置ClientId connection.setClientID("ZhangSan"); // 啓動消息傳遞的鏈接 connection.start(); // 建立一個session session = connection.createSession(useTransaction, Session.AUTO_ACKNOWLEDGE); // 建立一個destination,把消息發送到test.queue destination = session.createTopic("test.topic.mysql"); // 建立一個消費者,調用createDurableSubscriber方法 consumer = session.createDurableSubscriber(destination,"my"); // 接收一個消息 while (null != (message = consumer.receive())) { System.out.println("consumer receive:" + ((TextMessage) message).getText()); } } catch (JMSException e) { e.printStackTrace(); } finally { } }
啓動兩個消費者者,activemq_acks表數據以下,已經有了兩個數據
activemq的subscribers界面以下,也有兩個數據,若是咱們消費者下線了,就會到Offline Durable Topic Subscribers列表
修改消費者的代碼後,生產者發送消息的時候,若是消費者在線,就直接消費,若是不在線,上線後還能夠繼續消費,下圖是消費者消費了幾回後,LAST_ACKED_ID變成了4。
發佈訂閱默認不持久化,因此生產者代碼能夠這樣修改,加下面一句話
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
生產者發送消息後,activemq_msgs表數據以下,與點對點不同的是,這個消息被消費後,不會被刪除。
因爲JDBC的性能相對比較差,因此activemq還提供了日誌類型的jdbc,確保了JMS事務的一致性。由於它整合了很是快的信息寫入與緩存技術,它能夠顯着提升性能。配置以下:
<?xml version="1.0" encoding="UTF-8"?> <beans> <broker brokerName="test-broker" xmlns="http://activemq.apache.org/schema/core"> <persistenceAdapter> <journaledJDBC dataDirectory="${activemq.base}/data" dataSource="#derby-ds"/> </persistenceAdapter> </broker> <bean id="derby-ds" class="org.apache.derby.jdbc.EmbeddedDataSource"> <property name="databaseName" value="derbydb"/> <property name="createDatabase" value="create"/> </bean> </beans>
雖然性能比jdbc快,可是他不支持master/slave。
把消息存儲在內存中,因此沒有持久化的功能,所以要保證內存足夠大,來緩存消息。
<?xml version="1.0" encoding="UTF-8"?> <beans> <broker brokerName="test-broker" persistent="false" xmlns="http://activemq.apache.org/schema/core"> <transportConnectors> <transportConnector uri="tcp://localhost:61635"/> </transportConnectors> </broker> </beans>