ActiveMQ:JMS開源框架入門介紹

介紹基本的JMS概念與開源的JMS框架ActiveMQ應用,內容涵蓋一下幾點:html

  1. 基本的JMS概念
  2. JMS的消息模式
  3. 介紹ActiveMQ
  4. 一個基於ActiveMQ的JMS例子程序

一:JMS基本概念java

1. JMS的目標spring

爲企業級的應用提供一種智能的消息系統,JMS定義了一整套的企業級的消息概念與工具,儘量最小化的Java語言概念去構建最大化企業消息應用。統一已經存在的企業級消息系統功能。apache

2. 提供者session

JMS提供者是指那些徹底完成JMS功能與管理功能的JMS消息廠商,理論上JMS提供者完成。架構

JMS消息產品必須是100%的純Java語言實現,能夠運行在跨平臺的架構與操做系統上,當前一些JMS廠商包括IBM,Oracle, JBoss社區 (JBoss Community), Apache 社區(ApacheCommunity)。框架

3. JMS應用程序, 一個完整的JMS應用應該實現如下功能:tcp

  • JMS 客戶端 – Java語言開發的接受與發送消息的程序
  • 非JMS客戶端 – 基於消息系統的本地API實現而不是JMS
  • 消息 – 應用程序用來相互交流信息的載體
  • 被管理對象–預先配置的JMS對象,JMS管理員建立,被客戶端運用。如連接工廠,主題等
  • JMS提供者–完成JMS功能與管理功能的消息系統

二:JMS的消息模式ide

1.點對點的消息模式(Point to Point Messaging)工具


 

下面的JMS對象在點對點消息模式中是必須的:

a.隊列(Queue) – 一個提供者命名的隊列對象,客戶端將會使用這個命名的隊列對象

b.隊列連接工廠(QueueConnectionFactory) – 客戶端使用隊列連接工廠建立連接隊列

ConnectionQueue來取得與JMS點對點消息提供者的連接。

c. 連接隊列(ConnectionQueue) – 一個活動的連接隊列存在在客戶端與點對點消息提供者之間,客戶用它建立一個或者多個JMS隊列會話(QueueSession)

d.     隊列會話(QueueSession) – 用來建立隊列消息的發送者與接受者(QueueSenderand QueueReceiver)

e.消息發送者(QueueSender 或者MessageProducer)– 發送消息到已經聲明的隊列

f.消息接受者(QueueReceiver或者MessageConsumer) – 接受已經被髮送到指定隊列的消息

2.發佈訂閱模式(publish – subscribe Mode)


 

a.主題Topic(Destination) – 一個提供者命名的主題對象,客戶端將會使用這個命名的主題對象

b.主題連接工廠(TopciConnectionFactory) – 客戶端使用主題連接工廠建立連接主題

ConnectionTopic來取得與JMS消息Pub/Sub提供者的連接。

c.連接主題(ConnectionTopic) – 一個活動的連接主題存在發佈者與訂閱者之間

d.會話(TopicSession) – 用來建立主題消息的發佈者與訂閱者 (TopicPublisher  and TopicSubscribers)

e.消息發送者MessageProducer) – 發送消息到已經聲明的主題

f.消息接受者(MessageConsumer) – 接受已經被髮送到指定主題的消息

三:介紹ActiveMQ

ActiveMQ是apache社區完成的JMS開源消息組件,客戶端支持多種語言調用,包括Java,C++, C#,

Perl, Python等。支持Spring配置集成等。更多信息訪問這裏:

http://activemq.apache.org/index.html

四:基於ActiveMQ的Publish/subscribe模式Demo程序

消息Broker,JMSprovider

 

import java.net.URI;  
import java.net.URISyntaxException;  
 
import javax.jms.Connection;  
import javax.jms.ConnectionFactory;  
import javax.jms.Destination;  
import javax.jms.JMSException;  
import javax.jms.MessageProducer;  
import javax.jms.Session;  
import javax.jms.TextMessage;  
import javax.naming.Context;  
import javax.naming.InitialContext;  
import javax.naming.NamingException;  
 
import org.apache.activemq.broker.BrokerFactory;  
import org.apache.activemq.broker.BrokerService;  
import org.apache.commons.logging.Log;  
import org.apache.commons.logging.LogFactory;  
 
/**  
 * refer to http://activemq.apache.org/jndi-support.html  
 * http://activemq.apache.org/how-do-i-embed-a-broker-inside-a-connection.html  
 * @author gloomyfish  
 *  
 */ 
public class PureJMSProducer {  
      
    private static final Log LOG = LogFactory.getLog(PureJMSProducer.class);  
 
    private PureJMSProducer() {  
    }  
 
    /**  
     * @param args the destination name to send to and optionally, the number of  
     *                messages to send  
     */ 
    public static void main(String[] args) {  
        Context jndiContext = null;  
        ConnectionFactory connectionFactory = null;  
        Connection connection = null;  
        Session session = null;  
        Destination destination = null;  
        MessageProducer producer = null;  
        BrokerService broker = null;  
        final int numMsgs = 10;  
 
        /*  
         * Create a JNDI API InitialContext object  
         */ 
        try {  
            jndiContext = new InitialContext();  
        } catch (NamingException e) {  
            LOG.info("Could not create JNDI API context: " + e.toString());  
            System.exit(1);  
        }  
          
        // create external TCP broker  
        try {  
            broker = BrokerFactory.createBroker(new URI("broker:tcp://localhost:61616"));  
            broker.start();   
        } catch (URISyntaxException e) {  
            LOG.info("Could not create broker: " + e.toString());  
        } catch (Exception e) {  
            LOG.info("Could not create broker: " + e.toString());  
        }  
//        try {  
//            
//        }  
 
        /*  
         * Look up connection factory and destination.  
         */ 
        try {  
            connectionFactory = (ConnectionFactory)jndiContext.lookup("ConnectionFactory");  
            destination = (Destination)jndiContext.lookup("MyTopic");  
        } catch (NamingException e) {  
            LOG.info("JNDI API lookup failed: " + e);  
            System.exit(1);  
        }  
          
        /*  
         * Create connection. Create session from connection; false means  
         * session is not transacted. Create sender and text message. Send  
         * messages, varying text slightly. Send end-of-messages message.  
         * Finally, close connection.  
         */ 
        try {  
            connection = connectionFactory.createConnection();  
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  
            producer = session.createProducer(destination);  
            TextMessage message = session.createTextMessage();  
            Thread.sleep(3000);  
            for (int i = 0; i < numMsgs; i++) {  
                message.setText("This is message " + (i + 1));  
                LOG.info("Sending message: " + message.getText());  
                producer.send(message);  
                Thread.sleep(3000);  
            }  
 
            /*  
             * Send a non-text control message indicating end of messages.  
             */ 
            producer.send(session.createMessage());  
        } catch (JMSException e) {  
            LOG.info("Exception occurred: " + e);  
        } catch (InterruptedException e) {  
            LOG.info("Exception occurred: " + e);  
        } finally {  
            if (connection != null) {  
                try {  
                    connection.close();  
                } catch (JMSException e) {  
                }  
            }  
        }  
          
        // stop the TCP broker  
        try {  
            broker.stop();  
        } catch (Exception e) {  
            LOG.info("stop the broker failed: " + e);  
        }  
    }  
} 

客戶端:

import java.io.IOException;  
 
import javax.jms.Connection;  
import javax.jms.JMSException;  
import javax.jms.Message;  
import javax.jms.MessageConsumer;  
import javax.jms.MessageListener;  
import javax.jms.Session;  
import javax.jms.TextMessage;  
import javax.jms.Topic;  
import javax.naming.InitialContext;  
 
import org.apache.activemq.ActiveMQConnectionFactory;  
 
 
public class ActiveMQClient {  
      
    public static void main(String[] args) throws IOException {  
          
        // -- http://dlc.sun.com/pdf//816-5904-10/816-5904-10.pdf  
        try {  
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");  
        // ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://locahost");  
        Connection connection = factory.createConnection();  
        connection.start();  
          
        // create message topic  
        //Topic topic= new ActiveMQTopic("MyTopic");  
        InitialContext jndiContext=new InitialContext();  
        Topic topic=(Topic)jndiContext.lookup("MyTopic");   
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  
          
        // register message consumer  
        MessageConsumer comsumer1 = session.createConsumer(topic);  
        comsumer1.setMessageListener(new MessageListener(){  
            public void onMessage(Message m) {  
                try {  
                    System.out.println("Consumer get " + ((TextMessage)m).getText());  
                } catch (JMSException e) {  
                    e.printStackTrace();  
                }   
            }  
        });  
        Thread.sleep(30000);  
        session.close();  
        connection.stop();  
          
        } catch(Exception e) {  
            e.printStackTrace();  
        }  
    }  
} 

項目配置,Jar依賴:

 

依賴的三個Jar分別爲:

  • activemq-all.jar
  • geronimo-jms_1.1_spec-1.1.1.jar
  • xbean-spring.jar

原文連接:http://blog.csdn.net/jia20003/article/details/7601176

相關文章
相關標籤/搜索