初次發博文,勿噴~~java
最近老大讓我使用ActiveMQ實現這麼個東東:1.查詢消息隊列中還有多少任務沒有執行;2.消息隊列的持久化;服務器
真是愁殺我也,之前沒見過啊,因而又看文檔,又百度又google的,最終仍是在一天半以後整出來鳥~~session
首先向你們介紹一本書籍《ActiveMQ in Action》,我大部分代碼都是參考這本書實現的。好了,廢話少說,看代碼:tcp
1.首先啓動activeMQ的服務ide
public class RunServer { /** 啓動activeMQ服務 */ public static void main(String[] args) throws Exception { RunServer rs = new RunServer(); BrokerService broker = rs.startServer(); } public BrokerService startServer() throws Exception{ // java代碼調用activemq相關的類來構造並啓動brokerService BrokerService broker = new BrokerService(); // 如下是持久化的配置 // 持久化文件存儲位置 File dataFilterDir = new File("targer/amq-in-action/kahadb"); KahaDBStore kaha = new KahaDBStore(); kaha.setDirectory(dataFilterDir); // use a bigger journal file kaha.setJournalMaxFileLength(1024*100); // small batch means more frequent and smaller writes kaha.setIndexWriteBatchSize(100); // do the index write in a separate thread kaha.setEnableIndexWriteAsync(true); broker.setPersistenceAdapter(kaha); // create a transport connector broker.addConnector("tcp://localhost:61616"); broker.setUseJmx(true); //broker.setDataDirectory("data/"); // 如下是ManagementContext的配置,從這個容器中能夠取得消息隊列中未執行的消息數、消費者數、出隊數等等 // 設置ManagementContext ManagementContext context = broker.getManagementContext(); context.setConnectorPort(2011); context.setJmxDomainName("my-broker"); context.setConnectorPath("/jmxrmi"); broker.start(); System.in.read(); return broker; }
2.發送消息google
public class Sender { private static final int SEND_NUMBER = 1; public static void main(String[] args) { // ConnectionFactory :鏈接工廠,JMS 用它建立鏈接 ConnectionFactory connectionFactory; // Connection :JMS 客戶端到JMS Provider 的鏈接 Connection connection = null; // Session: 一個發送或接收消息的線程 Session session; // Destination :消息的目的地;消息發送給誰. Destination destination; // MessageProducer:消息發送者 MessageProducer producer; // TextMessage message; // 構造ConnectionFactory實例對象,此處採用ActiveMq的實現jar connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616"); try { // 構造從工廠獲得鏈接對象 connection = connectionFactory.createConnection(); // 啓動 connection.start(); // 獲取操做鏈接 session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 獲取session注意參數值xingbo.xu-queue是一個服務器的queue,須在在ActiveMq的console配置 destination = session.createQueue("test-persistence"); // 獲得消息生成者【發送者】 producer = session.createProducer(destination); // 設置不持久化,能夠更改 producer.setDeliveryMode(DeliveryMode.PERSISTENT); // 構造消息 sendMessage(session, producer); session.commit(); } catch (Exception e) { e.printStackTrace(); } finally { try { if (null != connection) connection.close(); } catch (Throwable ignore) { } } } public static void sendMessage(Session session, MessageProducer producer) throws Exception { for (int i = 1; i <= SEND_NUMBER; i++) { TextMessage message = session .createTextMessage("ActiveMq 發送的消息" + i); // 發送消息到目的地方 System.out.println("發送消息:" + i); producer.send(message); } }
3.收消息url
public class Receiver { public static void main(String[] args) { // ConnectionFactory :鏈接工廠,JMS 用它建立鏈接 ConnectionFactory connectionFactory; // Connection :JMS 客戶端到JMS Provider 的鏈接 Connection connection = null; // Session: 一個發送或接收消息的線程 Session session; // Destination :消息的目的地;消息發送給誰. Destination destination; // 消費者,消息接收者 MessageConsumer consumer; connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616"); try { // 構造從工廠獲得鏈接對象 connection = connectionFactory.createConnection(); // 啓動 connection.start(); // 獲取操做鏈接 session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); //test-queue跟sender的保持一致,一個建立一個來接收 destination = session.createQueue("test-persistence"); consumer = session.createConsumer(destination); consumer.setMessageListener(new MessageListener() { public void onMessage(Message arg0) { try { Thread.sleep(1000); } catch (InterruptedException e1) { e1.printStackTrace(); } System.out.println("=================="); try { System.out.println("RECEIVE1第一個得到者:" + ((TextMessage) arg0).getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); } catch (Exception e) { e.printStackTrace(); } finally { try { if (null != connection) connection.close(); } catch (Throwable ignore) { } } } }
4.獲取消息的狀態,也就是上面所說的得到消息隊列中未執行的消息數、消費者數、出隊數等等線程
public class StateTest { /** * 獲取狀態 * @throws Exception */ public static void main(String[] args) throws Exception { JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:2011/jmxrmi"); JMXConnector connector = JMXConnectorFactory.connect(url, null); connector.connect(); MBeanServerConnection connection = connector.getMBeanServerConnection(); // 須要注意的是,這裏的my-broker必須和上面配置的名稱相同 ObjectName name = new ObjectName("my-broker:BrokerName=localhost,Type=Broker"); BrokerViewMBean mBean = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(connection, name, BrokerViewMBean.class, true); // System.out.println(mBean.getBrokerName()); for(ObjectName queueName : mBean.getQueues()) { QueueViewMBean queueMBean = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(connection, queueName, QueueViewMBean.class, true); System.out.println("\n------------------------------\n"); // 消息隊列名稱 System.out.println("States for queue --- " + queueMBean.getName()); // 隊列中剩餘的消息數 System.out.println("Size --- " + queueMBean.getQueueSize()); // 消費者數 System.out.println("Number of consumers --- " + queueMBean.getConsumerCount()); // 出隊數 System.out.println("Number of dequeue ---" + queueMBean.getDequeueCount() ); } } }
到此結束,但願能夠爲你們作個參考~~code