JMS學習(三)----- ActiveMQ簡單的HelloWorld實例

源碼下載:http://git.oschina.net/zhengweishan/JMS_Study_Demohtml

開發環境

我使用的是ActiveMQ 5.13.3 Release的Windows版,官網最新版是ActiveMQ 5.13.4 Release,你們能夠自行下載,下載地址java

須要注意的是,開發時候,要將apache-activemq-5.13.3-bin.zip解壓縮后里面的activemq-all-5.13.3.jar包加入到classpath下面,這個包包含了全部jms接口api的實現。git

項目截圖:apache

ActiviteMQ消息有3中形式

JMS 公共 ----------點對點域 ----------發佈/訂閱域api

ConnectionFactory ---------- QueueConnectionFactory ---------- TopicConnectionFactory服務器

Connection ---------- QueueConnection ---------- TopicConnectionsession

Destination ---------- Queue ---------- Topiceclipse

Session ---------- QueueSession ---------- TopicSessionspa

MessageProducer ---------- QueueSender ---------- TopicPublisher.net

MessageConsumer ---------- QueueReceiver ---------- TopicSubscriber

(1)、點對點方式(point-to-point)

點對點的消息發送方式主要創建在 Message Queue,Sender,reciever上,Message Queue 存貯消息,Sneder 發送消息,receive接收消息.具體點就是Sender Client發送Message Queue ,而 receiver Cliernt從Queue中接收消息和"發送消息已接受"到Quere,確認消息接收。消息發送客戶端與接收客戶端沒有時間上的依賴,發送客戶端能夠在任什麼時候刻發送信息到Queue,而不須要知道接收客戶端是否是在運行

(2)、發佈/訂閱 方式(publish/subscriber Messaging)

發佈/訂閱方式用於多接收客戶端的方式.做爲發佈訂閱的方式,可能存在多個接收客戶端,而且接收端客戶端與發送客戶端存在時間上的依賴。一個接收端只能接收他建立之後發送客戶端發送的信息。做爲subscriber ,在接收消息時有兩種方法,destination的receive方法,和實現message listener 接口的onMessage 方法。

ActiviteMQ接收和發送消息基本流程

發送消息的基本步驟:

(1)、建立鏈接使用的工廠類JMS ConnectionFactory

(2)、使用管理對象JMS ConnectionFactory創建鏈接Connection,並啓動

(3)、使用鏈接Connection 創建會話Session

(4)、使用會話Session和管理對象Destination建立消息生產者MessageSender

(5)、使用消息生產者MessageSender發送消息

消息接收者從JMS接受消息的步驟

(1)、建立鏈接使用的工廠類JMS ConnectionFactory

(2)、使用管理對象JMS ConnectionFactory創建鏈接Connection,並啓動

(3)、使用鏈接Connection 創建會話Session

(4)、使用會話Session和管理對象Destination建立消息接收者MessageReceiver

(5)、使用消息接收者MessageReceiver接受消息,須要用setMessageListener將MessageListener接口綁定到MessageReceiver消息接收者必須實現了MessageListener接口,須要定義onMessage事件方法。

使用JMS方式發送接收消息

package com.active.mq.demo;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class MQConnectionFactory {
	
	private  static final  String USERNAME = ActiveMQConnection.DEFAULT_USER;//默認鏈接用戶名
    private  static final  String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默認鏈接密碼
    private  static final  String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;//默認鏈接地址
    
    private static ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);//鏈接工廠
    
    /**
     * 經過鏈接工廠獲取鏈接
     * [@return](http://my.oschina.net/u/556800)
     */
    public static Connection getConnection(){
    	Connection connection = null;
    	try {
			connection = connectionFactory.createConnection();
		} catch (JMSException e) {
			e.printStackTrace();
		}
    	return connection;
    }
}

package com.active.mq.demo;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

public class JMSConsumer {
	

    public static void main(String[] args) {
        Connection connection = null;//鏈接
        
        Session session = null;//會話 接受或者發送消息的線程
        
        Destination destination;//消息的目的地

        MessageConsumer messageConsumer;//消息的消費者

        try {
            //經過鏈接工廠獲取鏈接
            connection = MQConnectionFactory.getConnection();
            //啓動鏈接
            connection.start();
            //建立session
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //建立一個鏈接HelloWorld的消息隊列
            destination = session.createQueue("HelloWorld");
            //建立消息消費者
            messageConsumer = session.createConsumer(destination);

            while (true) {
                TextMessage textMessage = (TextMessage) messageConsumer.receive(100000);
                if(textMessage != null){
                    System.out.println("收到的消息:" + textMessage.getText());
                }else {
                    break;
                }
            }
            //提交回話
            session.commit();

        } catch (JMSException e) {
            e.printStackTrace();
        }finally{
            if(connection != null){
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if(session !=null){
            	try {
					session.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
            }
        }

    }
}


package com.active.mq.demo;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

public class JMSProducer {

    //發送的消息數量
    private static final int SENDNUM = 10;
    
	public static void main(String[] args) {

        //鏈接
        Connection connection = null;
        //會話 接受或者發送消息的線程
        Session session = null;
        //消息的目的地
        Destination destination;
        //消息生產者
        MessageProducer messageProducer;
       

        try {
            //經過鏈接工廠獲取鏈接
            connection = MQConnectionFactory.getConnection();
            //啓動鏈接
            connection.start();
            //建立session
            session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            //建立一個名稱爲HelloWorld的消息隊列
            destination = session.createQueue("HelloWorld");
            //建立消息生產者
            messageProducer = session.createProducer(destination);
            //發送消息
            sendMessage(session, messageProducer);
            //提交回話
            session.commit();

        } catch (Exception e) {
            e.printStackTrace();
        }finally{
            if(connection != null){
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if(session !=null){
            	try {
					session.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
            }
        }
	}
	
	/**
     * 發送消息
     * [@param](http://my.oschina.net/u/2303379) session
     * [@param](http://my.oschina.net/u/2303379) messageProducer  消息生產者
     * [@throws](http://my.oschina.net/throws) Exception
     */
    public static void sendMessage(Session session,MessageProducer messageProducer) throws Exception{
        for (int i = 0; i < JMSProducer.SENDNUM; i++) {
            //建立一條文本消息 
            TextMessage message = session.createTextMessage("發送JMS消息第" + (i + 1) + "條");
            System.out.println("發送消息:Activemq 發送JMS消息" + (i + 1));
            //經過消息生產者發出消息 
            messageProducer.send(message);
        }

    }
}

Queue隊列方式發送點對點消息數據

在獲取工廠類中加入以下代碼:

private static QueueConnectionFactory queueConnectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);

/**
 * 經過鏈接工廠獲取鏈接(Queue方式)
 * [@return](http://my.oschina.net/u/556800)
 */
public static QueueConnection getQueueConnection(){
	QueueConnection connection = null;
	try {
		connection = queueConnectionFactory.createQueueConnection();
	} catch (JMSException e) {
		e.printStackTrace();
	}
	return connection;
}


//消息生產者
package com.active.mq.demo;

import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;

public class QueueProducer {
	 private static final int SEND_NUM = 10;


	public static void main(String[] args) {
		QueueConnection queueConnection = null;
		QueueSession queueSession = null;
		try {
			// 經過工廠建立一個鏈接
			queueConnection = MQConnectionFactory.getQueueConnection();
            // 啓動鏈接
			queueConnection.start();
            // 建立一個session會話
			queueSession = queueConnection.createQueueSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            // 建立一個消息隊列
            Queue queue = queueSession.createQueue("QueueMsgDemo");
            // 建立消息發送者
            QueueSender sender = queueSession.createSender(queue);
            // 設置持久化模式
            sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            sendMessage(queueSession, sender);
            // 提交會話
            queueSession.commit();
		} catch (Exception e) {
			e.printStackTrace();
		}finally {
            // 關閉釋放資源
            if (queueSession != null) {
            	try {
					queueSession.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
            }
            if (queueConnection != null) {
            	try {
					queueConnection.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
            }
        }
	}
	 
	 
	 public static void sendMessage(QueueSession session, QueueSender sender) throws Exception {
	        for (int i = 0; i < SEND_NUM; i++) {
	            String message = "發送queue消息第" + (i + 1) + "條";
	            //建立一個Map集合信息
	            MapMessage map = session.createMapMessage();
	            map.setString("text", message);
	            map.setLong("time", System.currentTimeMillis());
	            System.out.println("ActiveMQ 發送queue消息:"+(i + 1));
	            sender.send(map);
	        }
	    }
}

//消費者
package com.active.mq.demo;

import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.Session;

public class QueueConsumer {

	public static void main(String[] args) {
		QueueConnection queueConnection = null;
		QueueSession queueSession = null;
		try {
			// 經過工廠建立一個鏈接
			queueConnection = MQConnectionFactory.getQueueConnection();
			// 啓動鏈接
			queueConnection.start();
			// 建立一個session會話
			queueSession = queueConnection.createQueueSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
			// 建立一個消息隊列
			Queue queue = queueSession.createQueue("QueueMsgDemo");
			// 建立消息接收者
			QueueReceiver receiver = queueSession.createReceiver(queue);

			receiver.setMessageListener(new MessageListener() {
				public void onMessage(Message msg) {
					if (msg != null) {
						MapMessage map = (MapMessage) msg;
						try {
							System.out.println(map.getLong("time") + "接收到消息#" + map.getString("text"));
						} catch (JMSException e) {
							e.printStackTrace();
						}
					}
				}
			});
			// 休眠100ms再關閉
			Thread.sleep(1000 * 100);

			// 提交會話
			queueSession.commit();

		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			// 關閉釋放資源
			if (queueSession != null) {
				try {
					queueSession.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
			if (queueConnection != null) {
				try {
					queueConnection.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
		}
	}
}

Topic主題發佈和訂閱消息

在獲取工廠類中加入以下代碼:

private static TopicConnectionFactory topicConnectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);

/**
 * 經過鏈接工廠獲取鏈接(Topic方式)
 * @return
 */
public static TopicConnection getTopicConnection(){
	TopicConnection topicConnection = null;
	try {
		topicConnection = topicConnectionFactory.createTopicConnection();
	} catch (JMSException e) {
		e.printStackTrace();
	}
	return topicConnection;
}

//生產者
package com.active.mq.demo;

import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;

public class TopicProducer {
	private static final int SEND_NUM = 10;

	public static void main(String[] args) {
		  TopicConnection connection = null;
	      TopicSession session = null;
	        try {
	            // 經過工廠建立一個鏈接
	            connection = MQConnectionFactory.getTopicConnection();
	            // 啓動鏈接
	            connection.start();
	            // 建立一個session會話
	            session = connection.createTopicSession(true, Session.AUTO_ACKNOWLEDGE);
	            // 建立一個消息隊列
	            Topic topic = session.createTopic("TopicDemo");
	            // 建立消息發送者
	            TopicPublisher publisher = session.createPublisher(topic);
	            // 設置持久化模式
	            publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
	            sendMessage(session, publisher);
	            // 提交會話
	            session.commit();
	            
	        } catch (Exception e) {
	            e.printStackTrace();
	        } finally {
	            // 關閉釋放資源
	            if (session != null) {
	                try {
						session.close();
					} catch (JMSException e) {
						e.printStackTrace();
					}
	            }
	            if (connection != null) {
	                try {
						connection.close();
					} catch (JMSException e) {
						e.printStackTrace();
					}
	            }
	        }
	}
	
	public static void sendMessage(TopicSession session, TopicPublisher publisher) throws Exception {
        for (int i = 0; i < SEND_NUM; i++) {
            String message = "發送Topic消息第" + (i + 1) + "條";
            
            MapMessage map = session.createMapMessage();
            map.setString("text", message);
            map.setLong("time", System.currentTimeMillis());
            System.out.println("ActiveMQ 發送Topic消息:"+(i + 1));
            publisher.send(map);
        }
    }
}

//消費者
package com.active.mq.demo;

import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;

public class TopicConsumer {
	
	public static void main(String[] args) {
		TopicConnection connection = null;
	    TopicSession session = null;
	    try {
	        // 經過工廠建立一個鏈接
	        connection = MQConnectionFactory.getTopicConnection();
	        // 啓動鏈接
	        connection.start();
	        // 建立一個session會話
	        session = connection.createTopicSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
	        // 建立一個消息隊列
	        Topic topic = session.createTopic("TopicDemo");
	        // 建立消息消費者
	        TopicSubscriber subscriber = session.createSubscriber(topic);
	        
	        subscriber.setMessageListener(new MessageListener() { 
	            public void onMessage(Message msg) { 
	                if (msg != null) {
	                    MapMessage map = (MapMessage) msg;
	                    try {
	                        System.out.println(map.getLong("time") + "Topic接收消息#" + map.getString("text"));
	                    } catch (JMSException e) {
	                        e.printStackTrace();
	                    }
	                }
	            } 
	        }); 
	        // 休眠100ms再關閉
	        Thread.sleep(1000 * 100); 
	        // 提交會話
	        session.commit();
	        
	    } catch (Exception e) {
	        e.printStackTrace();
	    } finally {
	        // 關閉釋放資源
	        if (session != null) {
	            try {
					session.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
	        }
	        if (connection != null) {
	            try {
					connection.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
	        }
	    }
	}
	
}

運行

以使用JMS方式發送接收消息爲例說明 一、首先,啓動ActiveMQ 二、運行發送者,eclipse控制檯輸出,以下圖:

三、查看ActiveMQ服務器,Queues內容以下:

咱們能夠看到建立了一個名稱爲HelloWorld的消息隊列,隊列中有10條消息未被消費,咱們也能夠經過Browse查看是哪些消息,若是這些隊列中的消息,被刪除,消費者則沒法消費。

四、運行一下消費者,eclipse控制檯打印消息,以下:

五、咱們在查看一下ActiveMQ服務器,Queues內容以下:

咱們能夠看到HelloWorld的消息隊列發生變化,多一個消息者,隊列中的10條消息被消費了,點擊Browse查看,已經爲空了。 點擊Active Consumers,咱們能夠看到這個消費者的詳細信息。

實例到此就結束了,你們能夠本身多看點ActiveMQ服務器的內容,進一步熟悉ActiveMQ。

相關文章
相關標籤/搜索