《ActiveMQ 持久化(文件),查詢隊列剩餘消息數、出隊數的實現》分析了消息隊列持久化保存,假如activemq服務器忽然中止,服務器啓動後,還能夠繼續查找隊列中的消息。如今分析隊列中的消息使用數據庫持久化。java
本人博客開始遷移,博客整個架構本身搭建及編碼http://www.cookqq.com/mysql
消息生產者:sql
package com.activemq.mysql; import java.io.File; import java.util.Properties; import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.sql.DataSource; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; import org.apache.activemq.store.jdbc.adapter.MySqlJDBCAdapter; import org.apache.commons.dbcp.BasicDataSourceFactory; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; /** * 消息持久化到數據庫 * */ public class MessageProductor { private static Logger logger=LogManager.getLogger(MessageProductor.class); private String username=ActiveMQConnectionFactory.DEFAULT_USER; private String password=ActiveMQConnectionFactory.DEFAULT_PASSWORD; private String url=ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL; public static String queueName="acticemq_queue"; private BrokerService brokerService; protected static final int messagesExpected = 3; protected ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( username,password, "tcp://localhost:61616?jms.prefetchPolicy.all=0&jms.redeliveryPolicy.maximumRedeliveries="+messagesExpected); /*** * 建立Broker服務對象 * @return * @throws Exception */ public BrokerService createBroker()throws Exception{ BrokerService broker=new BrokerService(); JDBCPersistenceAdapter jdbc=createJDBCPersistenceAdapter(); broker.setPersistenceAdapter(jdbc); jdbc.setDataDirectory(System.getProperty("user")+ File.separator+"data"+File.separator); jdbc.setAdapter(new MySqlJDBCAdapter()); broker.setPersistent(true); broker.addConnector("tcp://localhost:61616"); //broker.addConnector(ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL); return broker; } /** * 建立Broken的持久化適配器 * @return * @throws Exception */ public JDBCPersistenceAdapter createJDBCPersistenceAdapter() throws Exception{ JDBCPersistenceAdapter jdbc=new JDBCPersistenceAdapter(); DataSource datasource=createDataSource(); jdbc.setDataSource(datasource); jdbc.setUseDatabaseLock(false); //jdbc.deleteAllMessages(); return jdbc; } /** * 建立數據源 * @return * @throws Exception */ public DataSource createDataSource() throws Exception{ Properties props=new Properties(); props.put("driverClassName", "com.mysql.jdbc.Driver"); props.put("url", "jdbc:mysql://localhost:3306/activemq"); props.put("username", "root"); props.put("password", "16ds"); DataSource datasource=BasicDataSourceFactory.createDataSource(props); return datasource; } /** * 啓動BrokerService進程 * @throws Exception */ public void init() throws Exception{ createBrokerService(); start(); } public void start() throws Exception{ if(brokerService!=null){ brokerService.start(); } } public BrokerService createBrokerService() throws Exception{ if(brokerService==null){ brokerService=createBroker(); } return brokerService; } public void sendMessage() throws JMSException{ Connection connection=connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue(queueName); MessageProducer producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.PERSISTENT); for(int i=0;i<messagesExpected;i++){ logger.debug("Sending message " + (i+1) + " of " + messagesExpected); producer.send(session.createTextMessage("test message " + (i+1))); } connection.close(); } public String getUrl() { return url; } public void setUrl(String url) { this.url = url; } public String getUsername() { return username; } public void setUsername(String username) { this.username = username; } }
消息消費者:數據庫
package com.activemq.mysql; import javax.jms.Connection; import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; /*** * 消息持久化到數據庫 */ public class MessageCustomer { private static Logger logger=LogManager.getLogger(MessageProductor.class); protected static final int messagesExpected = 5; /*** * 建立Broker服務對象 * @return * @throws Exception */ public BrokerService createBroker()throws Exception{ BrokerService broker=new BrokerService(); broker.addConnector(ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL); return broker; } /** * 啓動BrokerService進程 * @throws Exception */ public void init() throws Exception{ BrokerService brokerService=createBroker(); brokerService.start(); } /** * 接收的信息 * @return * @throws Exception */ public int receiveMessage() throws Exception{ ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( "tcp://localhost:61616?jms.prefetchPolicy.all=0&jms.redeliveryPolicy.maximumRedeliveries="+messagesExpected); Connection connection=connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(true, Session.SESSION_TRANSACTED); return receiveMessages(messagesExpected,session); } /** * 接受信息的方法 * @param messagesExpected * @param session * @return * @throws Exception */ protected int receiveMessages(int messagesExpected, Session session) throws Exception { int messagesReceived = 0; for (int i=0; i<messagesExpected; i++) { Destination destination = session.createQueue(MessageProductor.queueName); MessageConsumer consumer = session.createConsumer(destination); Message message = null; try { logger.debug("Receiving message " + (messagesReceived+1) + " of " + messagesExpected); message = consumer.receive(2000); logger.info("Received : " + message); System.out.println("Received : " + message); if (message != null) { session.commit(); messagesReceived++; } } catch (Exception e) { logger.debug("Caught exception " + e); session.rollback(); } finally { if (consumer != null) { consumer.close(); } } } return messagesReceived; } }
生產者測試類:apache
package com.activemq.mysql; public class MessageProductorTest { public static void main(String[] args) throws Exception { MessageProductor productor =new MessageProductor(); productor.init(); productor.sendMessage(); //productor.createBrokerService().stop(); } }
消費者測試類:服務器
package com.activemq.mysql; public class MessageCustomerTest { public static void main(String[] args) throws Exception { MessageCustomer customer=new MessageCustomer(); //customer.init(); //當兩臺機器在不一樣的服務器上啓動客戶端的broker進程 customer.receiveMessage(); } }
數據庫形式:session
activemq_acks:ActiveMQ的簽收信息。架構
activemq_lock:ActiveMQ的鎖信息。tcp
activemq_msgs:ActiveMQ的消息的信息測試
參照博客: http://topmanopensource.iteye.com/blog/1066383