本人博客開始遷移,博客整個架構本身搭建及編碼 http://www.cookqq.com/listBlog.action java
《ActiveMQ發消息和收消息》詳細介紹了ActiveMQ發消息和收消息,消息保存在消息隊列(queue)中,消息隊列數據保存在計算機內存中,假如ActiveMQ服務器因爲某些緣由忽然中止,那消息隊列中內容還在嗎?用事實說話吧,把ActiveMQ服務器中止,而後再看看ActiveMQ頁面上的隊列信息queue,如圖:數據庫
activemq_queue隊列中的信息所有丟失了。爲了解決這個問題,能夠把隊列數據持久化,分爲文件持久化和數據庫持久化。服務器
如今詳細分析文件持久化。直接來代碼吧:session
消息發送者:架構
package com.activemq.queue; import javax.jms.Connection; public class Sender { private static final int SEND_NUMBER = 1; public static void main(String[] args) { // ConnectionFactory 接口(鏈接工廠) 用戶用來建立到JMS提供者的鏈接的被管對象。 // JMS客戶經過可移植的接口訪問鏈接,這樣當下層的實現改變時,代碼不須要進行修改。 // 管理員在JNDI名字空間中配置鏈接工廠,這樣,JMS客戶纔可以查找到它們。 // 根據消息類型的不一樣,用戶將使用隊列鏈接工廠,或者主題鏈接工廠。 ConnectionFactory connectionFactory; // Connection :JMS 客戶端到JMS Provider 的鏈接 // Connection 接口(鏈接) 鏈接表明了應用程序和消息服務器之間的通訊鏈路。 // 在得到了鏈接工廠後,就能夠建立一個與JMS提供者的鏈接。根據不一樣的鏈接類型, // 鏈接容許用戶建立會話,以發送和接收隊列和主題到目標。 Connection connection = null; // Session: 一個發送或接收消息的線程 // Session 接口(會話) 表示一個單線程的上下文,用於發送和接收消息。 // 因爲會話是單線程的,因此消息是連續的,就是說消息是按照發送的順序一個一個接收的。 // 會話的好處是它支持事務。若是用戶選擇了事務支持,會話上下文將保存一組消息,直到事務被提交才發送這些消息。 // 在提交事務以前,用戶能夠使用回滾操做取消這些消息。一個會話容許用戶建立消息生產者來發送消息,建立消息消費者來接收消息。 Session session; // Destination :消息的目的地;消息發送給誰. // Destination 接口(目標) 目標是一個包裝了消息目標標識符的被管對象, // 消息目標是指消息發佈和接收的地點,或者是隊列,或者是主題。 // JMS管理員建立這些對象,而後用戶經過JNDI發現它們。 // 和鏈接工廠同樣,管理員能夠建立兩種類型的目標,點對點模型的隊列,以及發佈者/訂閱者模型的主題。 Destination destination; // MessageProducer:消息發送者 // MessageProducer 接口(消息生產者) 由會話建立的對象,用於發送消息到目標。 // 用戶能夠建立某個目標的發送者,也能夠建立一個通用的發送者,在發送消息時指定目標。 MessageProducer producer; // TextMessage message; // 構造ConnectionFactory實例對象,此處採用ActiveMq的實現jar connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, //null ActiveMQConnection.DEFAULT_PASSWORD, //null "tcp://localhost:61616"); try { // 構造從工廠獲得鏈接對象 connection = connectionFactory.createConnection(); // 啓動 connection.start(); // 獲取操做鏈接 session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); //1 destination = session.createQueue(RunServer.queueName); // 獲得消息生成者【發送者】 producer = session.createProducer(destination); // 設置不持久化,能夠更改 producer.setDeliveryMode(DeliveryMode.PERSISTENT); //2 // 構造消息 sendMessage(session, producer); session.commit(); } catch (Exception e) { e.printStackTrace(); } finally { try { if (null != connection) connection.close(); } catch (Throwable ignore) { } } } public static void sendMessage(Session session, MessageProducer producer) throws Exception { for (int i = 1; i <= SEND_NUMBER; i++) { TextMessage message = session .createTextMessage("ActiveMq 發送的消息" + i); // 發送消息到目的地方 System.out.println("發送消息:" + i); producer.send(message); } } }
消息接受者:異步
package com.activemq.queue; import javax.jms.Connection; public class Receiver { public static void main(String[] args) { // ConnectionFactory :鏈接工廠,JMS 用它建立鏈接 ConnectionFactory connectionFactory; // Connection :JMS 客戶端到JMS Provider 的鏈接 Connection connection = null; // Session: 一個發送或接收消息的線程 final Session session; // Destination :消息的目的地;消息發送給誰. Destination destination; // 消費者,消息接收者 // MessageConsumer 接口(消息消費者) 由會話建立的對象,用於接收發送到目標的消息。 // 消費者能夠同步地(阻塞模式),或異步(非阻塞)接收隊列和主題類型的消息。 MessageConsumer consumer; connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616"); try { // 構造從工廠獲得鏈接對象 connection = connectionFactory.createConnection(); // 啓動 connection.start(); // 獲取操做鏈接 session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE); //queue_name跟sender的保持一致,一個建立一個來接收 destination = session.createQueue(RunServer.queueName); consumer = session.createConsumer(destination); // //第一種狀況 // int i = 0; // while (i < 3) { // i++; // TextMessage message = (TextMessage) consumer.receive(); // session.commit(); // // TODO something.... // System.out // .println("收到消息:" +message.getText()); // } // session.close(); // connection.close(); //----------------第一種狀況結束---------------------- // 第二種方式 consumer.setMessageListener(new MessageListener() { public void onMessage(Message arg0) { if(arg0 instanceof TextMessage){ try { System.out.println("arg0="+((TextMessage)arg0).getText()); session.commit(); } catch (JMSException e) { e.printStackTrace(); } } } }); //第三種狀況 // while (true) { // Message msg = consumer.receive(1000); // TextMessage message = (TextMessage) msg; // if (null != message) { // System.out.println("收到消息:" + message.getText()); // } // } } catch (Exception e) { e.printStackTrace(); } } }
程序中啓動activemq服務:tcp
package com.activemq.queue; import java.io.File; public class RunServer { public static String jmxDomain = "jms-broker"; public static int connectorPort = 2011; public static String connectorPath = "/jmxrmi"; public static String queueName = "activemq_queue"; /** 啓動activeMQ服務 */ public static void main(String[] args) throws Exception { // java代碼調用activemq相關的類來構造並啓動brokerService BrokerService broker = new BrokerService(); // 如下是持久化的配置 // 持久化文件存儲位置 File dataFilterDir = new File("activemq/amq-in-action/kahadb"); KahaDBStore kaha = new KahaDBStore(); kaha.setDirectory(dataFilterDir); // use a bigger journal file kaha.setJournalMaxFileLength(1024*100); // small batch means more frequent and smaller writes kaha.setIndexWriteBatchSize(100); // do the index write in a separate thread kaha.setEnableIndexWriteAsync(true); broker.setPersistenceAdapter(kaha); // create a transport connector broker.addConnector("tcp://localhost:61616"); broker.setUseJmx(true); // 如下是ManagementContext的配置,從這個容器中能夠取得消息隊列中未執行的消息數、消費者數、出隊數等等 // 設置ManagementContext ManagementContext context = broker.getManagementContext(); context.setConnectorPort(connectorPort); context.setJmxDomainName(jmxDomain); context.setConnectorPath(connectorPath); broker.start(); } }
測試隊列queue中各個狀態數:ide
package com.activemq.queue; import javax.management.MBeanServerConnection; public class StateTest { /** * 獲取狀態 * @throws Exception */ public static void main(String[] args) throws Exception { JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:"+ RunServer.connectorPort+RunServer.connectorPath); JMXConnector connector = JMXConnectorFactory.connect(url, null); connector.connect(); MBeanServerConnection connection = connector.getMBeanServerConnection(); // 須要注意的是,這裏的jms-broker必須和上面配置的名稱相同 ObjectName name = new ObjectName(RunServer.jmxDomain+":BrokerName=localhost,Type=Broker"); BrokerViewMBean mBean = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(connection, name, BrokerViewMBean.class, true); // System.out.println(mBean.getBrokerName()); for(ObjectName queueName : mBean.getQueues()) { QueueViewMBean queueMBean = (QueueViewMBean)MBeanServerInvocationHandler .newProxyInstance(connection, queueName, QueueViewMBean.class, true); System.out.println("\n------------------------------\n"); // 消息隊列名稱 System.out.println("States for queue --- " + queueMBean.getName()); // 隊列中剩餘的消息數 System.out.println("Size --- " + queueMBean.getQueueSize()); // 消費者數 System.out.println("Number of consumers --- " + queueMBean.getConsumerCount()); // 出隊數 System.out.println("Number of dequeue ---" + queueMBean.getDequeueCount() ); } } }