首先咱們應該先了解J2EE中的一個重要規範:JMS(The Java Message Service)Java消息服務。而JMS的客戶端之間能夠經過JMS服務進行異步的消息傳輸。它主要有兩種模型:點對點和發佈訂閱模型。java
點對點的模型特色::linux
發佈訂閱模型特色:spring
JMS還定義了五種不一樣的消息正文格式,以及調用的消息類型,容許你發送並接收以一些不一樣形式的數據,提供現有消息格式的一些級別的兼容性。apache
其中咱們用的最多的就是TextMessage字符串對象瀏覽器
消息中間件做爲JMS的實現,在J2EE的企業應用中扮演着特殊的角色。ActiveMQ是一個易於使用的消息中間件。做爲JMS的實現,消息中間件的使用步驟都大同小異,下面咱們以ActiveMQ爲例來介紹一下其使用。服務器
ActiveMQ的安裝:
ActiveMQ安裝很簡單,只需從其官網下載至linux環境,解壓並進入bin目錄session
./activemq start #啓動 ./activemq stop #中止 ./activemq status #查看狀態
使用啓動命令運行就行了。而後咱們打開瀏覽器進入管理後臺http://yourip:8161/admin/
輸入默認的帳號密碼:admin。就能夠看到管控臺了。
而後咱們使用java操做ActiveMQ,mq的使用基本都須要建立鏈接、session、Destination這麼幾步,咱們直接看代碼吧。
點對點的模型:
生產者代碼:app
@Test public void testQueueProducer() throws Exception { //一、建立一個鏈接工廠對象,須要指定服務的ip及端口。 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.161:61616"); //二、使用工廠對象建立一個Connection對象。 Connection connection = connectionFactory.createConnection(); //三、開啓鏈接,調用Connection對象的start方法。 connection.start(); //四、建立一個Session對象。 //第一個參數:是否開啓事務。若是true開啓事務,第二個參數無心義。通常不開啓事務false。 //第二個參數:應答模式。自動應答或者手動應答。通常自動應答。 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //五、使用Session對象建立一個Destination對象。兩種形式queue、topic,如今應該使用queue Queue queue = session.createQueue("test-queue"); //六、使用Session對象建立一個Producer對象。 MessageProducer producer = session.createProducer(queue); //七、建立一個Message對象,可使用TextMessage。 /*TextMessage textMessage = new ActiveMQTextMessage(); textMessage.setText("hello Activemq");*/ TextMessage textMessage = session.createTextMessage("hello activemq"); //八、發送消息 producer.send(textMessage); //九、關閉資源 producer.close(); session.close(); connection.close(); }
消費者代碼:異步
@Test public void testQueueConsumer() throws Exception { //建立一個ConnectionFactory對象鏈接MQ服務器 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.161:61616"); //建立一個鏈接對象 Connection connection = connectionFactory.createConnection(); //開啓鏈接 connection.start(); //使用Connection對象建立一個Session對象 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //建立一個Destination對象。queue對象 Queue queue = session.createQueue("spring-queue"); //使用Session對象建立一個消費者對象。 MessageConsumer consumer = session.createConsumer(queue); //接收消息 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { //打印結果 TextMessage textMessage = (TextMessage) message; String text; try { text = textMessage.getText(); System.out.println(text); } catch (JMSException e) { e.printStackTrace(); } } }); //等待接收消息 System.in.read(); //關閉資源 consumer.close(); session.close(); connection.close(); }
當咱們運行了生產者後,咱們能夠在管控臺看到Queue中多了一條消息,而且尚未被消費
當咱們運行消費者後就能夠接收到生產者發送的消息。
而且管控臺也出現了相應的變化
發佈訂閱的模型:
生產者代碼:tcp
@Test public void testTopicProducer() throws Exception { //一、建立一個鏈接工廠對象,須要指定服務的ip及端口。 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.72.121:61616"); //二、使用工廠對象建立一個Connection對象。 Connection connection = connectionFactory.createConnection(); //三、開啓鏈接,調用Connection對象的start方法。 connection.start(); //四、建立一個Session對象。 //第一個參數:是否開啓事務。若是true開啓事務,第二個參數無心義。通常不開啓事務false。 //第二個參數:應答模式。自動應答或者手動應答。通常自動應答。 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //五、使用Session對象建立一個Destination對象。兩種形式queue、topic,如今應該使用topic Topic topic = session.createTopic("test-topic"); //六、使用Session對象建立一個Producer對象。 MessageProducer producer = session.createProducer(topic); //七、建立一個Message對象,可使用TextMessage。 /*TextMessage textMessage = new ActiveMQTextMessage(); textMessage.setText("hello Activemq");*/ TextMessage textMessage = session.createTextMessage("topic message"); //八、發送消息 producer.send(textMessage); //九、關閉資源 producer.close(); session.close(); connection.close(); }
消費者代碼:
@Test public void testTopicConsumer() throws Exception { //建立一個ConnectionFactory對象鏈接MQ服務器 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.72.121:61616"); //建立一個鏈接對象 Connection connection = connectionFactory.createConnection(); //開啓鏈接 connection.start(); //使用Connection對象建立一個Session對象 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //建立一個Destination對象。topic對象 Topic topic = session.createTopic("test-topic"); //使用Session對象建立一個消費者對象。 MessageConsumer consumer = session.createConsumer(topic); //接收消息 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { //打印結果 TextMessage textMessage = (TextMessage) message; String text; try { text = textMessage.getText(); System.out.println(text); } catch (JMSException e) { e.printStackTrace(); } } }); System.out.println("topic消費者3啓動。。。。"); //等待接收消息 System.in.read(); //關閉資源 consumer.close(); session.close(); connection.close(); }
發佈訂閱模型具備嚴格的時間相關性,若是沒有訂閱者的話,發佈者發佈的內容就被浪費掉了。
Spring提供了JMSTemplate,極大地便利了MQ的使用,咱們只須要提早在配置文件中配置相關的JMS配置,就可在代碼中直接使用。
Spring配置文件:
<!-- 真正能夠產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供 --> <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://192.168.25.161:61616" /> </bean> <!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory --> <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <!-- 目標ConnectionFactory對應真實的能夠產生JMS Connection的ConnectionFactory --> <property name="targetConnectionFactory" ref="targetConnectionFactory" /> </bean> <!-- 配置生產者 --> <!-- Spring提供的JMS工具類,它能夠進行消息發送、接收等 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 這個connectionFactory對應的是咱們定義的Spring提供的那個ConnectionFactory對象 --> <property name="connectionFactory" ref="connectionFactory" /> </bean> <!--這個是隊列(Queue)目的地,點對點的 --> <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg> <value>spring-queue</value> </constructor-arg> </bean> <!--這個是主題(Topic)目的地,一對多的 --> <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg value="itemAddTopic" /> </bean> <!-- 接收消息配置 --> <!-- 配置監聽器 --> <bean id="myMessageListener" class="cn.e3mall.search.listener.MyMessageListener" /> <!-- 消息監聽容器 --> <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="queueDestination" /> <property name="messageListener" ref="myMessageListener" /> </bean>
配置好後可在代碼中使用如下方法
生產者代碼:
@Test public void testQueueProducer() throws Exception { // 第一步:初始化一個spring容器 ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq.xml"); // 第二步:從容器中得到JMSTemplate對象。 JmsTemplate jmsTemplate = applicationContext.getBean(JmsTemplate.class); // 第三步:從容器中得到一個Destination對象 Queue queue = (Queue) applicationContext.getBean("queueDestination"); // 第四步:使用JMSTemplate對象發送消息,須要知道Destination jmsTemplate.send(queue, new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { TextMessage textMessage = session.createTextMessage("spring activemq test"); return textMessage; } }); }
消費者代碼:
public class MyMessageListener implements MessageListener { //繼承MessageListener接口並從新它的onMessage方法 @Override public void onMessage(Message message) { try { TextMessage textMessage = (TextMessage) message; //取消息內容 String text = textMessage.getText(); System.out.println(text); } catch (JMSException e) { e.printStackTrace(); }
消費者的使用是在Spring容器中注入一個監聽器,因此咱們須要在配置文件中配置它,它隨着Spring的啓動而啓動,而且實時監聽。當有消息向它發送時,他會當即進行邏輯處理。
本文做者: catalinaLi
本文連接: http://catalinali.top/2017/us... 版權聲明: 原創文章,有問題請評論中留言。非商業轉載請註明做者及出處。