消息持久化和非持久化,指的是傳輸模式DeliverModelmysql
持久化和非持久化的最大區別是:持久化傳輸,消息會被保存,即存儲傳輸,而採用非持久化,消息不會被存儲sql
場景問題:服務器斷電重啓,未被消費的消息是否會在重啓以後被繼續消費?apache
非持久性模式: 服務器斷電(關閉)以後,使用非持久性模式時,沒有被消費的消息不會繼續消費所有丟失;程序會報一個鏈接關閉異常中止運行,繼續啓動服務器運行程序,不會接收任何消息。服務器
持久性模式: 服務器斷電(關閉)後,使用持久性模式時,沒有被消費的消息會繼續消費;程序也會報鏈接關閉異常,但再次啓動服務器和程序後,接收方還能繼續原來的消息再次接收。session
非持久訂閱tcp
場景:大學上課,有的人去上課了,有的人逃課了,去上課的人就知道老師講了什麼,沒有去的就不知道ide
非持久訂閱只有當客戶端處於激活狀態,也就是和JMS Provider 保持鏈接狀態才能收到發送到某個主題的消息,而當客戶端處於離線狀態,這個時間段發到主題的消息將會丟失,永遠不會收到url
持久訂閱spa
場景:相似於qq發送消息,當你不在線的時候,好友給你發送消息,再次登陸,仍然能夠收到線程
持久訂閱時,客戶端向JMS 服務器註冊一個本身身份的ID,當這個客戶端處於離線時,JMS Provider 會爲這個ID 保存全部發送到主題的消息,當客戶再次鏈接到JMS Provider時,會根據本身的ID獲得全部當本身處於離線時發送到主題的消息
activemq日誌文件地址 data/activemq.log
activemq配置文件地址 conf/activemq.xml
修改持久化方式爲jdbc
<persistenceAdapter>
<!--activemq默認持久化方式-->
<!-- <kahaDB directory="${activemq.data}/kahadb"/> -->
<!--JDBC持久化方式 注意createTablesOnStartup,第一次是true,標識啓動建立表,之後都改成false-->
<jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="false" />
</persistenceAdapter>
配置jdbc鏈接
若啓動的時候報錯,注意去日誌文件中查看錯誤,多半是沒有引入JDBC的驅動包
<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="com.mysql.jdbc.Driver"/> <property name="url" value="jdbc:mysql://localhost:3306/liuhuxiang?relaxAutoCommit=true"/> <property name="username" value="root"/> <property name="password" value="admin"/> <property name="poolPreparedStatements" value="true"/> </bean>
啓動成功後,查看db,會多出三張表
activemq_acks 應答ack表
activemq_msgs用來存儲消息,發送者發送消息,在表中會有記錄
activemq_lock 集羣環境保存發送機器
注意,一旦消息被消費了,記錄就會被刪除,即activemq_acks / activemq_msgs都不會有數據
代碼部分
監聽器
/** * 消息監聽 * 通常在消費者中,咱們不直接用recive來消費消息,都是經過監聽器的方式來消費 * 比較JmsptpConsumer 和JmsptpConsumer2的消費方式 */ public class Listener implements MessageListener { @Override public void onMessage(Message message) { try { System.out.println("Listener收到消息,ptp類型"+((TextMessage)message).getText()); } catch (JMSException e) { e.printStackTrace(); } } }
消費者
/** * 點對點消息消費者 * 經過監聽方式消費 * */ public class JmsptpConsumerByListener { private static final int SENDNUM = 10; public static void main(String[] args) { // 定義成員變量 ConnectionFactory connectionFactory; //鏈接工廠 Connection connect = null; // 鏈接 Session session; // 會話,接受或發送消息的線程 MessageConsumer messageConsumer; // 消息生產者 Destination destination; try { //1 建立鏈接工廠,並由鏈接工廠建立鏈接 connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); connect = connectionFactory.createConnection();//經過鏈接工廠,建立鏈接 connect.start(); //2 接受者,不用事物 session = connect.createSession(false, Session.AUTO_ACKNOWLEDGE); // 這裏也能夠用Destination來接收,Destination是Queue的父類 destination = session.createQueue("testQueun1"); messageConsumer = session.createConsumer(destination); //註冊消息監聽 經過監聽器的方式監聽消息 messageConsumer.setMessageListener(new Listener()); } catch (JMSException e) { e.printStackTrace(); } finally { if (connect == null) { try { connect.close(); } catch (JMSException e) { e.printStackTrace(); } } } } }
生產者
/** *JMS 點對點消息生產者 * 一對一 * */ public class JmsptpProduct { //默認的username password brokerurl private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; private static final String BROKERURL = ActiveMQConnection.DEFAULT_BROKER_URL; // 發送次數 private static final int SENDNUM = 10; public static void main(String[] args) { // 定義成員變量 ConnectionFactory connectionFactory; //鏈接工廠 Connection connect = null; // 鏈接 Session session; // 會話,接受或發送消息的線程 MessageProducer messageProducer; // 消息生產者 Destination destination; // 消息目的地, 點對點是queue 訂閱是topic 都是Destination的子類,爲了通用,最好使用Destination try { //1 建立鏈接工廠,並由鏈接工廠建立鏈接,有多種構造器 connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); connect = connectionFactory.createConnection();//經過鏈接工廠,建立鏈接 connect.start(); //2 建立鏈接,true表示使用事物 false表示不適用事物 session = connect.createSession(false, Session.AUTO_ACKNOWLEDGE); //3 建立隊列,兩邊保持一致 destination = session.createQueue("testQueun1"); //4 建立生產者 messageProducer = session.createProducer(destination); sendMessage(session, messageProducer); } catch (JMSException e) { e.printStackTrace(); } finally { if (connect == null) { try { connect.close(); } catch (JMSException e) { e.printStackTrace(); } } } } /*** * 發送消息 * * @param session * @param messageProducer */ public static void sendMessage(Session session, MessageProducer messageProducer) { for (int i = 0; i < SENDNUM; i++) { try { TextMessage textMessage = session.createTextMessage("ActivityMQ,ptp類型,發送消息 i=" + i); System.out.println("ActivityMQ,ptp類型,發送消息 i=" + i); messageProducer.send(textMessage); } catch (JMSException e) { e.printStackTrace(); } } } }