介紹基本的JMS概念與開源的JMS框架ActiveMQ應用,內容涵蓋一下幾點:html
一:JMS基本概念java
1. JMS的目標spring
爲企業級的應用提供一種智能的消息系統,JMS定義了一整套的企業級的消息概念與工具,儘量最小化的Java語言概念去構建最大化企業消息應用。統一已經存在的企業級消息系統功能。apache
2. 提供者session
JMS提供者是指那些徹底完成JMS功能與管理功能的JMS消息廠商,理論上JMS提供者完成。架構
JMS消息產品必須是100%的純Java語言實現,能夠運行在跨平臺的架構與操做系統上,當前一些JMS廠商包括IBM,Oracle, JBoss社區 (JBoss Community), Apache 社區(ApacheCommunity)。框架
3. JMS應用程序, 一個完整的JMS應用應該實現如下功能:tcp
二:JMS的消息模式ide
1.點對點的消息模式(Point to Point Messaging)工具
下面的JMS對象在點對點消息模式中是必須的:
a.隊列(Queue) – 一個提供者命名的隊列對象,客戶端將會使用這個命名的隊列對象
b.隊列連接工廠(QueueConnectionFactory) – 客戶端使用隊列連接工廠建立連接隊列
ConnectionQueue來取得與JMS點對點消息提供者的連接。
c. 連接隊列(ConnectionQueue) – 一個活動的連接隊列存在在客戶端與點對點消息提供者之間,客戶用它建立一個或者多個JMS隊列會話(QueueSession)
d. 隊列會話(QueueSession) – 用來建立隊列消息的發送者與接受者(QueueSenderand QueueReceiver)
e.消息發送者(QueueSender 或者MessageProducer)– 發送消息到已經聲明的隊列
f.消息接受者(QueueReceiver或者MessageConsumer) – 接受已經被髮送到指定隊列的消息
2.發佈訂閱模式(publish – subscribe Mode)
a.主題Topic(Destination) – 一個提供者命名的主題對象,客戶端將會使用這個命名的主題對象
b.主題連接工廠(TopciConnectionFactory) – 客戶端使用主題連接工廠建立連接主題
ConnectionTopic來取得與JMS消息Pub/Sub提供者的連接。
c.連接主題(ConnectionTopic) – 一個活動的連接主題存在發佈者與訂閱者之間
d.會話(TopicSession) – 用來建立主題消息的發佈者與訂閱者 (TopicPublisher and TopicSubscribers)
e.消息發送者MessageProducer) – 發送消息到已經聲明的主題
f.消息接受者(MessageConsumer) – 接受已經被髮送到指定主題的消息
三:介紹ActiveMQ
ActiveMQ是apache社區完成的JMS開源消息組件,客戶端支持多種語言調用,包括Java,C++, C#,
Perl, Python等。支持Spring配置集成等。更多信息訪問這裏:
http://activemq.apache.org/index.html
四:基於ActiveMQ的Publish/subscribe模式Demo程序
消息Broker,JMSprovider
import java.net.URI; import java.net.URISyntaxException; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import javax.naming.Context; import javax.naming.InitialContext; import javax.naming.NamingException; import org.apache.activemq.broker.BrokerFactory; import org.apache.activemq.broker.BrokerService; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; /** * refer to http://activemq.apache.org/jndi-support.html * http://activemq.apache.org/how-do-i-embed-a-broker-inside-a-connection.html * @author gloomyfish * */ public class PureJMSProducer { private static final Log LOG = LogFactory.getLog(PureJMSProducer.class); private PureJMSProducer() { } /** * @param args the destination name to send to and optionally, the number of * messages to send */ public static void main(String[] args) { Context jndiContext = null; ConnectionFactory connectionFactory = null; Connection connection = null; Session session = null; Destination destination = null; MessageProducer producer = null; BrokerService broker = null; final int numMsgs = 10; /* * Create a JNDI API InitialContext object */ try { jndiContext = new InitialContext(); } catch (NamingException e) { LOG.info("Could not create JNDI API context: " + e.toString()); System.exit(1); } // create external TCP broker try { broker = BrokerFactory.createBroker(new URI("broker:tcp://localhost:61616")); broker.start(); } catch (URISyntaxException e) { LOG.info("Could not create broker: " + e.toString()); } catch (Exception e) { LOG.info("Could not create broker: " + e.toString()); } // try { // // } /* * Look up connection factory and destination. */ try { connectionFactory = (ConnectionFactory)jndiContext.lookup("ConnectionFactory"); destination = (Destination)jndiContext.lookup("MyTopic"); } catch (NamingException e) { LOG.info("JNDI API lookup failed: " + e); System.exit(1); } /* * Create connection. Create session from connection; false means * session is not transacted. Create sender and text message. Send * messages, varying text slightly. Send end-of-messages message. * Finally, close connection. */ try { connection = connectionFactory.createConnection(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); producer = session.createProducer(destination); TextMessage message = session.createTextMessage(); Thread.sleep(3000); for (int i = 0; i < numMsgs; i++) { message.setText("This is message " + (i + 1)); LOG.info("Sending message: " + message.getText()); producer.send(message); Thread.sleep(3000); } /* * Send a non-text control message indicating end of messages. */ producer.send(session.createMessage()); } catch (JMSException e) { LOG.info("Exception occurred: " + e); } catch (InterruptedException e) { LOG.info("Exception occurred: " + e); } finally { if (connection != null) { try { connection.close(); } catch (JMSException e) { } } } // stop the TCP broker try { broker.stop(); } catch (Exception e) { LOG.info("stop the broker failed: " + e); } } }
客戶端:
import java.io.IOException; import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import javax.naming.InitialContext; import org.apache.activemq.ActiveMQConnectionFactory; public class ActiveMQClient { public static void main(String[] args) throws IOException { // -- http://dlc.sun.com/pdf//816-5904-10/816-5904-10.pdf try { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); // ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://locahost"); Connection connection = factory.createConnection(); connection.start(); // create message topic //Topic topic= new ActiveMQTopic("MyTopic"); InitialContext jndiContext=new InitialContext(); Topic topic=(Topic)jndiContext.lookup("MyTopic"); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // register message consumer MessageConsumer comsumer1 = session.createConsumer(topic); comsumer1.setMessageListener(new MessageListener(){ public void onMessage(Message m) { try { System.out.println("Consumer get " + ((TextMessage)m).getText()); } catch (JMSException e) { e.printStackTrace(); } } }); Thread.sleep(30000); session.close(); connection.stop(); } catch(Exception e) { e.printStackTrace(); } } }
項目配置,Jar依賴:
依賴的三個Jar分別爲: