由於對JMS的瞭解也只算入門級,有些概念也很模糊,不過,滷煮會盡量去介紹的。另外,sample code都調試過能夠跑。java
1.神馬是JMS?服務器
jms即Java消息服務(Java Message Service)應用程序接口是一個Java平臺中關於面向消息中間件(MOM)的API,用於在兩個應用程序之間,或分佈式系統中發送消息,進行異步通訊。Java消息服務是一個與具體平臺無關的API,絕大多數MOM提供商都對JMS提供支持。——摘自百度百科session
2.JMS組成要素多線程
JMS提供者,JMS客戶,JMS生產者,JMS消費者,JMS隊列和JMS主題。具體啥意思,仍是參考百度百科吧,不過,概念講起來也木有啥意思,還會把人繞暈。多跑幾回sample,回來再看概念可能更清楚些哈。另,博文中的JMS提供者以IBM 的WebSphere MQ爲例。併發
3.JMS模型app
JMS模型定義一組可供 Java™ 應用程序用於執行消息傳遞操做的接口。異步
如下列表歸納了主要的JMS接口:分佈式
Destination:Destination 對象是應用程序將消息發往的位置和/或應用程序從其接收消息的源。ide
ConnectionFactory:ConnectionFactory 對象包括鏈接的一組配置屬性。應用程序使用鏈接工廠來建立鏈接。工具
Connection:Connection 對象包括應用程序與消息傳遞服務器的活動鏈接。應用程序使用鏈接來建立會話。
Session:Session 對象是用於發送和接收消息的單個線程上下文。應用程序使用會話來建立消息、消息生產者和消息使用者。會話是事務性或非事務性會話。
Message:Message 對象包括應用程序發送或接收的消息。
MessageProducer:應用程序使用消息生產者將消息發送到目標。
MessageConsumer:應用程序使用消息使用者來接收已發送到目標的消息。
下圖是這些對象之間的關係(摘自IBM Info Center)其實,IBM info center中資料多多的呀。
Destination、ConnectionFactory 或 Connection 對象可供多線程應用程序的不一樣線程併發使用,可是 Session、MessageProducer 或 MessageConsumer 對象不能供不一樣線程併發使用。確保不併發使用 Session、MessageProducer 或 MessageConsumer 對象的最簡單方法是爲每一個線程建立單獨的 Session 對象。
JMS支持兩種消息傳遞樣式:
這兩類消息傳遞也被稱爲消息傳遞域,且您能夠將兩類消息傳遞都組合在一個應用程序中。在點到點域中,目標是隊列,而在發佈/預訂域中,目標是主題。
經過JMS1.1 之前的JMS版本,對點到點域的程序設計使用一組接口和方法,而對發佈/預訂域的程序設計使用另外一組接口和方法。兩組接口和方法是類似的,但卻各自獨立。經過JMS1.1,您可使用一組公共的支持兩類消息傳遞域的接口和方法。公共接口提供了獨立於域的每一個消息傳遞域的視圖。下表列出了獨立於JMS 域的接口及其相應的特定於域的接口。
特別清楚有木有。
4.開發JMS客戶端用於鏈接WMQ及收發消息
下面的code能跑起來的前提是,本地已安裝MQ,且建立好隊列管理器和相應的QUEUE,TOPIC。
鏈接方式一:使用的是IBM對於JMS的實現(要導入包com.ibm.mqjms.jar):
1 package com.demo; 2 3 import java.io.UnsupportedEncodingException; 4 5 import javax.jms.BytesMessage; 6 import javax.jms.Connection; 7 import javax.jms.DeliveryMode; 8 import javax.jms.ExceptionListener; 9 import javax.jms.JMSException; 10 import javax.jms.Message; 11 import javax.jms.MessageConsumer; 12 import javax.jms.MessageListener; 13 import javax.jms.MessageProducer; 14 import javax.jms.Queue; 15 import javax.jms.QueueConnection; 16 import javax.jms.QueueConnectionFactory; 17 import javax.jms.Session; 18 import javax.jms.TextMessage; 19 import javax.naming.NamingException; 20 21 import com.ibm.mq.jms.MQQueueConnectionFactory; 22 23 public class JmsQueueDemo { 24 private static Connection conn = null; 25 private static Session session = null; 26 private static MessageProducer producer = null; 27 private static MessageConsumer consumer = null; 28 private static QueueConnection qConn = null; 29 30 public static void init() { 31 // 鏈接工廠,用com.ibm.mq.jms中的類實現javax.jms中的接口 32 QueueConnectionFactory qcf = new MQQueueConnectionFactory(); 33 34 // 設置鏈接工廠屬性 35 try { 36 //設置WMQ所在機器的IP 37 ((MQQueueConnectionFactory) qcf).setHostName("localhost"); 38 //設置WMQ上隊列管理器名 39 ((MQQueueConnectionFactory) qcf).setQueueManager("TestQM"); 40 //設置WMQ上的通道名 41 ((MQQueueConnectionFactory) qcf).setChannel("SYSTEM.DEF.SVRCONN"); 42 //設置WMQ上的監聽端口 43 ((MQQueueConnectionFactory) qcf).setPort(1414); 44 45 //由鏈接工廠建立鏈接 46 qConn = qcf.createQueueConnection(); 47 48 //創建異常監聽器用於監聽鏈接過程當中發生的異常 49 ExceptionListener exceptionListener = new ExceptionListener(){ 50 51 //此處可放入更多邏輯,由本身定義 52 public void onException(JMSException e) { 53 System.out.println("mq exception"); 54 e.printStackTrace(); 55 System.exit(0); 56 } 57 58 }; 59 //在鏈接上面註冊監聽器 60 qConn.setExceptionListener(exceptionListener); 61 } catch (JMSException e) { 62 63 e.printStackTrace(); 64 return; 65 } 66 } 67 68 public static void main(String[] args) throws NamingException, 69 JMSException, UnsupportedEncodingException { 70 71 init(); 72 sendMessage(); 73 // receiveMessage(); 74 receiveWithListener(); 75 destroy(); 76 77 } 78 79 public static void sendMessage() throws JMSException { 80 boolean transacted = false; 81 82 // 非事務處理(分別接收或發送消息)[事務處理(所有發送或者所有接收做爲一個單元的一組消息)] 83 session = qConn 84 .createQueueSession(transacted, Session.AUTO_ACKNOWLEDGE); 85 // 由session建立要發送到的隊列 86 Queue inputQ = session.createQueue("TestQ"); 87 88 //由session建立消息發送者 89 MessageProducer sender = session.createProducer(inputQ); 90 91 //啓動鏈接 92 qConn.start(); 93 94 // 消息由會話建立 95 TextMessage message = session.createTextMessage(); 96 //設置消息內容 97 message.setText("this is input message from queue sender"); 98 //這句無關緊要的哦,主要用於設置消息屬性;方便後面取消息時,取特定類型的消息,如"company='systems'" 99 message.setStringProperty("company", "systems"); 100 101 // 發送消息,後面的參數依次爲消息的持久性設置,消息的優先級,消息在隊列的存活時間,設置爲0,表示永不失效 102 //DeliveryMode爲PERSISTENT表示,隊列管理器或者WMQ重啓後,消息仍在queue中;NON_PERSISTENT意思相反 103 sender.send(message, DeliveryMode.NON_PERSISTENT, 7, 0); 104 } 105 106 //使用listener的方式從queue中取消息,可一次取多條消息出來 107 public static void receiveWithListener() throws JMSException { 108 109 boolean transacted = false; 110 111 // 非事務處理(分別接收或發送消息)[事務處理(所有發送或者所有接收做爲一個單元的一組消息)] 112 session = qConn 113 .createQueueSession(transacted, Session.AUTO_ACKNOWLEDGE); 114 //從同一個queue中取消息,此處爲使用session建立queue 115 Queue outputQ = session.createQueue("TestQ"); 116 //使用session建立消息消費者,注意了,後面的那個參數就是消息選擇器,用於接收特定類型的消息 117 consumer = session.createConsumer(outputQ, "company='t-systems'"); 118 //建立消息監聽器 119 MessageListener listener = new MessageListener() { 120 public void onMessage(Message message) { 121 try { 122 if (message instanceof TextMessage) { 123 System.out.println("Listener 接收消息:" 124 + ((TextMessage) message).getText()); 125 } 126 } catch (JMSException e) { 127 e.printStackTrace(); 128 } 129 } 130 }; 131 //註冊消息監聽器 132 consumer.setMessageListener(listener); 133 //啓動鏈接 134 qConn.start(); 135 try { 136 Thread.sleep(10 * 1000); 137 } catch (InterruptedException e) { 138 e.printStackTrace(); 139 } 140 } 141 //取消息的另外一種方式,手動從queue中取消息,一次只能接收一條消息 142 public static void receiveMessage() throws JMSException, 143 UnsupportedEncodingException { 144 145 boolean transacted = false; 146 147 // 非事務處理(分別接收或發送消息)[事務處理(所有發送或者所有接收做爲一個單元的一組消息)] 148 session = qConn 149 .createQueueSession(transacted, Session.AUTO_ACKNOWLEDGE); 150 // 對隊列管理器上隊列的映射 151 Queue outputQ = session.createQueue("TestQ"); 152 consumer = session.createConsumer(outputQ); 153 qConn.start(); 154 //此時,若send的message中,設置了message的屬性如,"company='systems'",下面的方法是取不到消息的哈 155 //要取到消息可屏蔽到send中設置message屬性的語句,或使用consumer.receive("company='systems'") 156 Message msg = consumer.receiveNoWait(); 157 //轉換消息格式 158 if (msg instanceof TextMessage) { 159 TextMessage message = (TextMessage) msg; 160 System.out.println("received message from queue is:" 161 + message.getText()); 162 } else if (msg instanceof BytesMessage) { 163 BytesMessage message = (BytesMessage) msg; 164 byte buff[] = null; 165 long length = message.getBodyLength(); 166 buff = new byte[(int) length]; 167 message.readBytes(buff); 168 String textmessage = new String(buff, "UTF-8"); 169 System.out.println("received message from queue is:" + textmessage); 170 } 171 } 172 //銷燬資源 173 public static void destroy() throws JMSException { 174 if (consumer != null) { 175 consumer.close(); 176 } 177 if (producer != null) { 178 producer.close(); 179 } 180 if (session != null) { 181 session.close(); 182 } 183 if (conn != null) { 184 conn.close(); 185 } 186 187 } 188 189 }
上面的代碼是點對點消息發送模式的實現,其實,發佈/預約模式的實現也差很少了。
1 package com.demo; 2 3 import java.io.UnsupportedEncodingException; 4 5 import javax.jms.BytesMessage; 6 import javax.jms.Connection; 7 import javax.jms.DeliveryMode; 8 import javax.jms.ExceptionListener; 9 import javax.jms.JMSException; 10 import javax.jms.Message; 11 import javax.jms.MessageConsumer; 12 import javax.jms.MessageListener; 13 import javax.jms.MessageProducer; 14 import javax.jms.Session; 15 import javax.jms.TextMessage; 16 import javax.jms.Topic; 17 import javax.jms.TopicConnection; 18 import javax.jms.TopicConnectionFactory; 19 import javax.naming.NamingException; 20 21 import com.ibm.mq.jms.MQTopicConnectionFactory; 22 23 public class JmsTopicDemo { 24 private static Connection conn; 25 private static Session session; 26 private static MessageProducer producer; 27 private static MessageConsumer consumer; 28 private static TopicConnection tConn = null; 29 30 public static void init() { 31 // 鏈接工廠,用com.ibm.mq.jms中的類實現javax.jms中的接口 32 TopicConnectionFactory tcf = new MQTopicConnectionFactory(); 33 34 // 設置鏈接工廠屬性 35 try { 36 ((MQTopicConnectionFactory) tcf).setHostName("localhost"); 37 ((MQTopicConnectionFactory) tcf).setQueueManager("TestQM"); 38 ((MQTopicConnectionFactory) tcf).setCCSID(1381); 39 ((MQTopicConnectionFactory) tcf).setChannel("SYSTEM.DEF.SVRCONN"); 40 ((MQTopicConnectionFactory) tcf).setPort(1414); 41 tConn = tcf.createTopicConnection(); 42 43 ExceptionListener exceptionListener = new ExceptionListener() { 44 45 // 此處可放入更多邏輯 46 public void onException(JMSException e) { 47 System.out.println("mq exception"); 48 e.printStackTrace(); 49 System.exit(0); 50 } 51 52 }; 53 tConn.setExceptionListener(exceptionListener); 54 } catch (JMSException e) { 55 56 e.printStackTrace(); 57 return; 58 } 59 } 60 61 public static void main(String[] args) throws NamingException, 62 JMSException, UnsupportedEncodingException, InterruptedException { 63 64 init(); 65 sendMessage(); 66 receiveMessage(); 67 receiveWithListener(); 68 destroy(); 69 70 } 71 72 public static void sendMessage() throws JMSException { 73 boolean transacted = false; 74 75 session = tConn 76 .createTopicSession(transacted, Session.AUTO_ACKNOWLEDGE); 77 78 Topic inputTopic = session.createTopic("TestT"); 79 MessageProducer sender = session.createProducer(inputTopic); 80 tConn.start(); 81 82 TextMessage message = session.createTextMessage(); 83 message.setText("this is input message from topic sender"); 84 85 sender.send(message, DeliveryMode.PERSISTENT, 7, 0); 86 87 } 88 89 public static void receiveWithListener() throws JMSException { 90 // receive message from mq 91 92 boolean transacted = false; 93 94 session = tConn 95 .createTopicSession(transacted, Session.AUTO_ACKNOWLEDGE); 96 Topic outputT = session.createTopic("TestT"); 97 98 consumer = session.createConsumer(outputT); 99 100 MessageListener listener = new MessageListener() { 101 public void onMessage(Message message) { 102 try { 103 if (message instanceof TextMessage) { 104 System.out.println("Listener 接收消息:" 105 + ((TextMessage) message).getText()); 106 } 107 } catch (JMSException e) { 108 e.printStackTrace(); 109 } 110 } 111 }; 112 consumer.setMessageListener(listener); 113 tConn.start(); 114 try { 115 Thread.sleep(10 * 1000); 116 } catch (InterruptedException e) { 117 e.printStackTrace(); 118 } 119 } 120 121 public static void receiveMessage() throws JMSException, 122 UnsupportedEncodingException { 123 124 boolean transacted = false; 125 126 session = tConn 127 .createTopicSession(transacted, Session.AUTO_ACKNOWLEDGE); 128 129 Topic outputT = session.createTopic("TestT"); 130 consumer = session.createConsumer(outputT); 131 132 tConn.start(); 133 Message msg = consumer.receive(5000); 134 if (msg instanceof TextMessage) { 135 TextMessage message = (TextMessage) msg; 136 System.out.println("received message from topic is:" 137 + message.getText()); 138 } else if (msg instanceof BytesMessage) { 139 BytesMessage message = (BytesMessage) msg; 140 byte buff[] = null; 141 long length = message.getBodyLength(); 142 buff = new byte[(int) length]; 143 message.readBytes(buff); 144 String textmessage = new String(buff, "UTF-8"); 145 System.out.println("received message from topic is:" + textmessage); 146 } 147 } 148 149 public static void destroy() throws JMSException { 150 if (consumer != null) { 151 consumer.close(); 152 } 153 if (producer != null) { 154 producer.close(); 155 } 156 if (session != null) { 157 session.close(); 158 } 159 if (conn != null) { 160 conn.close(); 161 } 162 163 } 164 165 }
不一樣點的地方,可能就是從topic中取消息的時候,由於消息不能在topic中駐留,因此,必定要是取的動做已經發生了或準備好,再作發送消息的操做,這時候,纔可以從topic中收到發送的消息。怎麼理解呢?好比說,queue是個籃子,消息發送出去,就會在籃子裏。有時,發送的動做也許早就已經完成,但消息會一直在籃子裏。取消息的時候,從籃子中取走便可。而topic則是一個籃球框的那種籃網,消息發送到籃網,若是下面沒有接收的東西,就會直接從籃網溜走,你這時再去取消息,就只能是竹籃打水一場空啦。因此,從topic中收消息,必定要再發消息以前先把接的動做準備好的哈。
鏈接方式二:使用javax.jms中的標準JMS接口(要導入包jms.jar)
使用javax.jms中的標準JMS接口,能夠隱藏特定廠商的實現,使代碼具備更好的可移植性。開發時,只須要關注javax.jms.QueueConnectionFactory 和 javax.jms.Queue這兩個對象便可。一般JMS提供廠商會有本身的工具去構建這兩個對象,並存儲在JNDI命名空間中。能夠從命名空間中檢索這兩個對象,而不用care是哪一個廠商實現滴。
對於WMQ,這兩個對象是這樣構建滴哈。
1)打開WMQ,找到「JMS受管對象」,右擊「添加初始上下文」,以下圖選擇,記得綁定目錄是先建立好的哈。
點擊「完成」。
2)在剛建好的「初始上下文」下面,找到」鏈接工廠「,右擊新建「鏈接工廠」,以下圖,
點擊」下一步「,到最後一頁,在鏈接裏面作以下設置,
選擇」基本隊列管理器「(應先在WMQ上建立好一個隊列管理器,此處選上;另還需先建立好;一個queue,下面要用到滴),完成。
此時咱們就有javax.jms.QueueConnectionFactory這個對象了。
3)在新建的」初始上下文「下,找到」目標「,新建」目標「,點」下一步「,到最後一頁,作下面的設置,
選擇」隊列管理器「和」隊列「,應先在WMQ上建立好。這時,javax.jms.Queue這個對象也構建好了呢。
OK,完事具有了,下面就是配合JNDI方式實現的標準JMS接口了。
1 package com.demo; 2 3 import java.util.Hashtable; 4 5 import javax.jms.*; 6 import javax.naming.*; 7 8 public class Tester { 9 public static void main(String[] args) { 10 send(); 11 } 12 13 public static void send() { 14 try { 15 16 Hashtable<String, String> environment = new Hashtable<String, String>(); 17 //剛纔建的初始上下文哈 18 environment.put(Context.INITIAL_CONTEXT_FACTORY, 19 "com.sun.jndi.fscontext.RefFSContextFactory"); 20 environment.put(Context.PROVIDER_URL, "file:/D:/JNDI-Directory"); 21 InitialContext initContext = new InitialContext(environment); 22 //建立的鏈接工廠哈 23 ConnectionFactory factory = (ConnectionFactory) initContext 24 .lookup("TestQM_Q"); 25 //建立的目標哈 26 Destination destination = (Destination) initContext 27 .lookup("TestQueue"); 28 initContext.close(); 29 30 Connection connection = factory.createConnection(); 31 Session session = connection.createSession(false, 32 Session.AUTO_ACKNOWLEDGE); 33 34 MessageProducer sender = session.createProducer(destination); 35 // Send messages 36 TextMessage message = session 37 .createTextMessage("hello, this is my first jms sample code~~~"); 38 39 //記住,message並非send到「TestQueue」裏面去了,而是send到WMQ上與「TestQueue」關聯的那個queue裏面去了 40 sender.send(message); 41 session.close(); 42 connection.close(); 43 } catch (Exception e) { 44 e.printStackTrace(); 45 } 46 } 47 }
好了,就介紹到這裏了。Happy ending~~~