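3. ActiveMQ persistence | August Writing Challenge

This is day 3 of my participation in the August Writing Challenge. For event details, see: August Writing Challenge

MySQL is used for persistence here; other databases are configured in much the same way.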

Step 1: Copy the MySQL JDBC driver jar into ActiveMQ's lib directory (contact me if you need the jar).
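
To confirm the driver jar is visible, a small check like the one below can help. It is only a sketch: the class name `DriverCheck` is made up for illustration, and it assumes you run it with the same classpath as the broker (for example with the jars from the lib directory on `-cp`).

```java
public class DriverCheck {

    /** Returns true if the named class can be loaded from the current classpath. */
    static boolean isOnClasspath(String className) {
        try {
            // initialize=false: only locate the class, do not run its static initializers
            Class.forName(className, false, DriverCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Default driver class name is the one used in the data source config of this article
        String driver = args.length > 0 ? args[0] : "com.mysql.jdbc.Driver";
        System.out.println(driver + (isOnClasspath(driver) ? " found" : " NOT found"));
    }
}
```

If the driver is reported as not found, the jar is probably missing from the classpath you ran the check with.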

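Step 2: Configure the persistence adapter in ${activemq.base}/conf/activemq.xml

createTablesOnStartup="false": controls whether ActiveMQ creates its tables at startup

useDatabaseLock="false": controls whether the broker takes a database lock (see problem 2)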

<persistenceAdapter>

<jdbcPersistenceAdapter dataDirectory="${activemq.base}/data" dataSource="#derby-ds"/>

</persistenceAdapter>

Step 3: Configure the data source in ${activemq.base}/conf/activemq.xml

<bean id="derby-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">

<property name="driverClassName" value="com.mysql.jdbc.Driver"/>

<property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>

<property name="username" value="root"/>

<property name="password" value="root"/>

<property name="poolPreparedStatements" value="true"/>

</bean>

Problems you may run into during configuration:

1. Caused by: org.xml.sax.SAXParseException; lineNumber: 92; columnNumber: 92; cvc-complex-type.2.4.a: Invalid content was found starting with element 'bean'. One of '{"activemq.apache.org/schema/core…' is expected.

Solution:

Move the data source <bean> out of the <broker> element and place it alongside the other top-level <bean> tags.
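
The placement rule can be illustrated with a small check that reports which element directly contains the data source bean. This is a rough sketch using only the JDK's XML parser; the class name `BeanPlacementCheck` and the two trimmed-down XML strings are assumptions made for illustration, not a real activemq.xml.

```java
import java.io.StringReader;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

public class BeanPlacementCheck {

    // Misplaced: the bean is nested inside <broker>, which the schema rejects
    static final String MISPLACED =
        "<beans xmlns=\"http://www.springframework.org/schema/beans\">"
      + "<broker xmlns=\"http://activemq.apache.org/schema/core\">"
      + "<bean xmlns=\"http://www.springframework.org/schema/beans\" id=\"derby-ds\"/>"
      + "</broker>"
      + "</beans>";

    // Correct: the bean is a direct child of <beans>, next to <broker>
    static final String TOP_LEVEL =
        "<beans xmlns=\"http://www.springframework.org/schema/beans\">"
      + "<broker xmlns=\"http://activemq.apache.org/schema/core\"/>"
      + "<bean id=\"derby-ds\"/>"
      + "</beans>";

    /** Returns the local name of the element containing the bean with this id, or null if absent. */
    static String parentOfBean(String xml, String beanId) {
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            factory.setNamespaceAware(true); // required for getLocalName() to work
            Document doc = factory.newDocumentBuilder()
                                  .parse(new InputSource(new StringReader(xml)));
            NodeList beans = doc.getElementsByTagNameNS("*", "bean");
            for (int i = 0; i < beans.getLength(); i++) {
                Element bean = (Element) beans.item(i);
                if (beanId.equals(bean.getAttribute("id"))) {
                    return bean.getParentNode().getLocalName();
                }
            }
            return null;
        } catch (Exception e) {
            throw new IllegalArgumentException("Could not parse XML", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("misplaced parent: " + parentOfBean(MISPLACED, "derby-ds"));
        System.out.println("top-level parent: " + parentOfBean(TOP_LEVEL, "derby-ds"));
    }
}
```

In the real file, the data source bean should end up with <beans> as its parent, the same as in the `TOP_LEVEL` string.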

2. Failed to acquire lock. Sleeping for 10000 milli(s) before trying again...

Solution:

Set useDatabaseLock="false":

<persistenceAdapter>

<jdbcPersistenceAdapter dataDirectory="${activemq.base}/data" dataSource="#derby-ds" useDatabaseLock="false" />

</persistenceAdapter>

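3. After adding MySQL persistence to ActiveMQ, messages are sent but no records appear in the MSGS table

 1. Once persistence is enabled, three tables are created in the activemq database (ACTIVEMQ_MSGS, ACTIVEMQ_ACKS, and ACTIVEMQ_LOCK). For a topic, a message is stored only if it is sent with persistent delivery and the topic has a durable subscriber.

In the producer, set the delivery mode: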

producer.setDeliveryMode(DeliveryMode.PERSISTENT);
//            producer.setTimeToLive(10);

//            Send
            producer.send(textMessage);
//            producer.send(textMessage, DeliveryMode.PERSISTENT, 1, 60 * 60 * 24);
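
One detail about the commented-out lines above: in JMS, the time-to-live passed to `setTimeToLive` or as the last argument of `send` is in milliseconds, so `60 * 60 * 24` is roughly 86 seconds rather than a day. A minimal sketch of the conversion, assuming one day was the intended lifetime (the helper name `ttlMillis` is made up for illustration):

```java
import java.util.concurrent.TimeUnit;

public class TtlMillis {

    /** Converts a duration to the millisecond value that JMS time-to-live expects. */
    static long ttlMillis(long amount, TimeUnit unit) {
        return unit.toMillis(amount);
    }

    public static void main(String[] args) {
        long literal = 60 * 60 * 24;               // 86400 ms, about 86 seconds
        long oneDay = ttlMillis(1, TimeUnit.DAYS); // 86400000 ms
        System.out.println(literal + " ms vs " + oneDay + " ms");
    }
}
```

The resulting value could then be passed as the last argument of `producer.send(textMessage, DeliveryMode.PERSISTENT, 1, oneDay)`.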

In the consumer, add the following. Note that the client ID must be set before the connection is started.

// Set the client ID
connection.setClientID("client-1");

connection.start();

// final MessageConsumer messageConsumer = session.createConsumer(topic); // regular (non-durable) subscription
MessageConsumer messageConsumer = session.createDurableSubscriber(topic,"bb"); // durable subscription

Code:

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @author Yang
 * Description: producer
 */
public class TopicProducer {

    /**
     * User name
     */
    private static final String userName = ActiveMQConnection.DEFAULT_USER;
    /**
     * Password
     */
    private static final String passWord = ActiveMQConnection.DEFAULT_PASSWORD;
    /**
     * Broker URL
     */
    private static final String brokerUrl = ActiveMQConnection.DEFAULT_BROKER_URL;

    public void send(String message) {


        try {
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(userName, passWord, brokerUrl);

            final Connection connection = connectionFactory.createConnection();

            connection.start();

            Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);


            // Create the topic
            final Topic topic = session.createTopic("topic");
            final MessageProducer producer = session.createProducer(topic);

            TextMessage textMessage = session.createTextMessage(message);

            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
//            producer.setTimeToLive(10);

//            Send
            producer.send(textMessage);
//            producer.send(textMessage, DeliveryMode.PERSISTENT, 1, 60 * 60 * 24);

//            session.commit();

            producer.close();

            session.close();

            connection.close();

        } catch (JMSException e) {
            e.printStackTrace();
        }

    }

    public static void main(String[] args) {
        TopicProducer producer = new TopicProducer();
        producer.send("hello world");

    }


}

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @author Yang
 * Description: consumer
 */


public class TopicConsumer {

    /**
     * User name
     */
    private static final String userName = ActiveMQConnection.DEFAULT_USER;
    /**
     * Password
     */
    private static final String passWord = ActiveMQConnection.DEFAULT_PASSWORD;
    /**
     * Broker URL
     */
    private static final String brokerUrl = ActiveMQConnection.DEFAULT_BROKER_URL;

    public void receive() {


        try {
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(userName, passWord, brokerUrl);

            final Connection connection = connectionFactory.createConnection();
            // Set the client ID (must happen before connection.start())
            connection.setClientID("client-1");

            connection.start();

            Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);

            // Create the topic
            final Topic topic = session.createTopic("topic");

//            final MessageConsumer messageConsumer = session.createConsumer(topic); // regular (non-durable) subscription
            MessageConsumer messageConsumer = session.createDurableSubscriber(topic,"bb"); // durable subscription

            messageConsumer.setMessageListener(n -> {

                try {
                    TextMessage msg = (TextMessage) n;

                    final String text = msg.getText();

                    if (text.equalsIgnoreCase("hello world")) {
                        System.out.println("               Received message:         " + msg.getText());
                    } else {
                        System.out.println("     Testing redelivery count ");
                        // Deliberately throw an ArithmeticException to trigger redelivery
                        int i = 1 / 0;
                    }


                } catch (JMSException e) {
                    // Print the error instead of silently swallowing it
                    e.printStackTrace();
                }

            });

        } catch (JMSException e) {
            // Print the error instead of silently swallowing it
            e.printStackTrace();
        }

    }

    public static void main(String[] args) {
        TopicConsumer consumer = new TopicConsumer();
        consumer.receive();
    }


}