ActiveMq 持久化方案修改 kehadb--jdbc

1 消息持久化和非持久化(默認都是使用持久化的

消息持久化和非持久化,指的是傳輸模式DeliverModelmysql

持久化和非持久化的最大區別是:持久化傳輸,消息會被保存,即存儲傳輸,而採用非持久化,消息不會被存儲sql

場景問題:服務器斷電重啓,未被消費的消息是否會在重啓以後被繼續消費?apache

非持久性模式: 服務器斷電(關閉)以後,使用非持久性模式時,沒有被消費的消息不會繼續消費所有丟失;程序會報一個鏈接關閉異常中止運行,繼續啓動服務器運行程序,不會接收任何消息。服務器

持久性模式: 服務器斷電(關閉)後,使用持久性模式時,沒有被消費的消息會繼續消費;程序也會報鏈接關閉異常,但再次啓動服務器和程序後,接收方還能繼續原來的消息再次接收。session

 

2 持久訂閱和非持久訂閱(只適用於pub/sub模型

非持久訂閱tcp

場景:大學上課,有的人去上課了,有的人逃課了,去上課的人就知道老師講了什麼,沒有去的就不知道ide

非持久訂閱只有當客戶端處於激活狀態,也就是和JMS Provider 保持鏈接狀態才能收到發送到某個主題的消息,而當客戶端處於離線狀態,這個時間段發到主題的消息將會丟失,永遠不會收到url

持久訂閱spa

場景:相似於qq發送消息,當你不在線的時候,好友給你發送消息,再次登陸,仍然能夠收到線程

持久訂閱時,客戶端向JMS 服務器註冊一個本身身份的ID,當這個客戶端處於離線時,JMS Provider 會爲這個ID 保存全部發送到主題的消息,當客戶再次鏈接到JMS Provider時,會根據本身的ID獲得全部當本身處於離線時發送到主題的消息

 

3 持久化方式修改  kehaDB--JDBC

activemq日誌文件地址 data/activemq.log

activemq配置文件地址 conf/activemq.xml

修改持久化方式爲jdbc

<persistenceAdapter>
<!--activemq默認持久化方式-->
<!-- <kahaDB directory="${activemq.data}/kahadb"/> -->
<!--JDBC持久化方式  注意createTablesOnStartup,第一次是true,標識啓動建立表,之後都改成false-->
<jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="false" />
</persistenceAdapter>

配置jdbc鏈接

若啓動的時候報錯,注意去日誌文件中查看錯誤,多半是沒有引入JDBC的驅動包

<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://localhost:3306/liuhuxiang?relaxAutoCommit=true"/>
<property name="username" value="root"/>
<property name="password" value="admin"/>
<property name="poolPreparedStatements" value="true"/>
</bean>

啓動成功後,查看db,會多出三張表

activemq_acks 應答ack表

activemq_msgs用來存儲消息,發送者發送消息,在表中會有記錄

activemq_lock 集羣環境保存發送機器

注意,一旦消息被消費了,記錄就會被刪除,即activemq_acks / activemq_msgs都不會有數據

代碼部分

監聽器

/**
 * 消息監聽
 * 通常在消費者中,咱們不直接用recive來消費消息,都是經過監聽器的方式來消費
 * 比較JmsptpConsumer 和JmsptpConsumer2的消費方式
 */
public class Listener implements MessageListener {
    @Override
    public void onMessage(Message message) {
        try {
            System.out.println("Listener收到消息,ptp類型"+((TextMessage)message).getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

消費者

/**
 * 點對點消息消費者
 * 經過監聽方式消費
 *
 */
public class JmsptpConsumerByListener {
    private static final int SENDNUM = 10;
    public static void main(String[] args) {
        // 定義成員變量
        ConnectionFactory connectionFactory; //鏈接工廠
        Connection connect = null; // 鏈接
        Session session; // 會話,接受或發送消息的線程
        MessageConsumer messageConsumer; // 消息生產者
        Destination destination;
        try {
            //1 建立鏈接工廠,並由鏈接工廠建立鏈接
            connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
            connect = connectionFactory.createConnection();//經過鏈接工廠,建立鏈接
            connect.start();
            //2 接受者,不用事物
            session = connect.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 這裏也能夠用Destination來接收,Destination是Queue的父類
            destination = session.createQueue("testQueun1");
            messageConsumer = session.createConsumer(destination);
            //註冊消息監聽  經過監聽器的方式監聽消息
            messageConsumer.setMessageListener(new Listener());
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            if (connect == null) {
                try {
                    connect.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

生產者

/**
 *JMS 點對點消息生產者
 * 一對一
 *
 */
public class JmsptpProduct {

    //默認的username password brokerurl
    private static final String USERNAME  = ActiveMQConnection.DEFAULT_USER;
    private static final String PASSWORD  = ActiveMQConnection.DEFAULT_PASSWORD;
    private static final String BROKERURL = ActiveMQConnection.DEFAULT_BROKER_URL;

    // 發送次數
    private static final int SENDNUM = 10;

    public static void main(String[] args) {

        // 定義成員變量
        ConnectionFactory connectionFactory; //鏈接工廠
        Connection connect = null; // 鏈接
        Session session; // 會話,接受或發送消息的線程
        MessageProducer messageProducer; // 消息生產者
        Destination destination; // 消息目的地, 點對點是queue 訂閱是topic 都是Destination的子類,爲了通用,最好使用Destination

        try {
            //1 建立鏈接工廠,並由鏈接工廠建立鏈接,有多種構造器
            connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
            connect = connectionFactory.createConnection();//經過鏈接工廠,建立鏈接
            connect.start();
            //2 建立鏈接,true表示使用事物 false表示不適用事物
            session = connect.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //3 建立隊列,兩邊保持一致
            destination = session.createQueue("testQueun1");
            //4 建立生產者
            messageProducer = session.createProducer(destination);
            sendMessage(session, messageProducer);
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            if (connect == null) {
                try {
                    connect.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }

    }

    /***
     * 發送消息
     *
     * @param session
     * @param messageProducer
     */
    public static void sendMessage(Session session, MessageProducer messageProducer) {
        for (int i = 0; i < SENDNUM; i++) {
            try {
                TextMessage textMessage = session.createTextMessage("ActivityMQ,ptp類型,發送消息 i=" + i);
                System.out.println("ActivityMQ,ptp類型,發送消息 i=" + i);
                messageProducer.send(textMessage);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}
相關文章
相關標籤/搜索