JMS鏈接WMQ及收發消息

由於對JMS的瞭解也只算入門級,有些概念也很模糊,不過,滷煮會盡量去介紹的。另外,sample code都調試過能夠跑。java

1.神馬是JMS?服務器

jms即Java消息服務(Java Message Service)應用程序接口是一個Java平臺中關於面向消息中間件(MOM)的API,用於在兩個應用程序之間,或分佈式系統中發送消息,進行異步通訊。Java消息服務是一個與具體平臺無關的API,絕大多數MOM提供商都對JMS提供支持。——摘自百度百科session

2.JMS組成要素多線程

JMS提供者,JMS客戶,JMS生產者,JMS消費者,JMS隊列和JMS主題。具體啥意思,仍是參考百度百科吧,不過,概念講起來也木有啥意思,還會把人繞暈。多跑幾回sample,回來再看概念可能更清楚些哈。另,博文中的JMS提供者以IBM 的WebSphere MQ爲例。併發

3.JMS模型app

JMS模型定義一組可供 Java™ 應用程序用於執行消息傳遞操做的接口。異步

如下列表歸納了主要的JMS接口:分佈式

Destination:Destination 對象是應用程序將消息發往的位置和/或應用程序從其接收消息的源。ide

ConnectionFactory:ConnectionFactory 對象包括鏈接的一組配置屬性。應用程序使用鏈接工廠來建立鏈接。工具

Connection:Connection 對象包括應用程序與消息傳遞服務器的活動鏈接。應用程序使用鏈接來建立會話。

Session:Session 對象是用於發送和接收消息的單個線程上下文。應用程序使用會話來建立消息、消息生產者和消息使用者。會話是事務性或非事務性會話。

Message:Message 對象包括應用程序發送或接收的消息。

MessageProducer:應用程序使用消息生產者將消息發送到目標。

MessageConsumer:應用程序使用消息使用者來接收已發送到目標的消息。

下圖是這些對象之間的關係(摘自IBM Info Center)其實,IBM info center中資料多多的呀。

Destination、ConnectionFactory 或 Connection 對象可供多線程應用程序的不一樣線程併發使用,可是 Session、MessageProducer 或 MessageConsumer 對象不能供不一樣線程併發使用。確保不併發使用 Session、MessageProducer 或 MessageConsumer 對象的最簡單方法是爲每一個線程建立單獨的 Session 對象。

JMS支持兩種消息傳遞樣式:

  • 點到點消息傳遞
  • 發佈/預訂消息傳遞

這兩類消息傳遞也被稱爲消息傳遞域,且您能夠將兩類消息傳遞都組合在一個應用程序中。在點到點域中,目標是隊列,而在發佈/預訂域中,目標是主題。

 經過JMS1.1 之前的JMS版本,對點到點域的程序設計使用一組接口和方法,而對發佈/預訂域的程序設計使用另外一組接口和方法。兩組接口和方法是類似的,但卻各自獨立。經過JMS1.1,您可使用一組公共的支持兩類消息傳遞域的接口和方法。公共接口提供了獨立於域的每一個消息傳遞域的視圖。下表列出了獨立於JMS 域的接口及其相應的特定於域的接口。

特別清楚有木有。

4.開發JMS客戶端用於鏈接WMQ及收發消息

下面的code能跑起來的前提是,本地已安裝MQ,且建立好隊列管理器和相應的QUEUE,TOPIC。

鏈接方式一:使用的是IBM對於JMS的實現(要導入包com.ibm.mqjms.jar):

  1 package com.demo;
  2 
  3 import java.io.UnsupportedEncodingException;
  4 
  5 import javax.jms.BytesMessage;
  6 import javax.jms.Connection;
  7 import javax.jms.DeliveryMode;
  8 import javax.jms.ExceptionListener;
  9 import javax.jms.JMSException;
 10 import javax.jms.Message;
 11 import javax.jms.MessageConsumer;
 12 import javax.jms.MessageListener;
 13 import javax.jms.MessageProducer;
 14 import javax.jms.Queue;
 15 import javax.jms.QueueConnection;
 16 import javax.jms.QueueConnectionFactory;
 17 import javax.jms.Session;
 18 import javax.jms.TextMessage;
 19 import javax.naming.NamingException;
 20 
 21 import com.ibm.mq.jms.MQQueueConnectionFactory;
 22 
 23 public class JmsQueueDemo {
 24     private static Connection conn = null;
 25     private static Session session = null;
 26     private static MessageProducer producer = null;
 27     private static MessageConsumer consumer = null;
 28     private static QueueConnection qConn = null;
 29 
 30     public static void init() {
 31         // 鏈接工廠,用com.ibm.mq.jms中的類實現javax.jms中的接口
 32         QueueConnectionFactory qcf = new MQQueueConnectionFactory();
 33 
 34         // 設置鏈接工廠屬性
 35         try {
 36             //設置WMQ所在機器的IP
 37             ((MQQueueConnectionFactory) qcf).setHostName("localhost");
 38             //設置WMQ上隊列管理器名
 39             ((MQQueueConnectionFactory) qcf).setQueueManager("TestQM");
 40             //設置WMQ上的通道名
 41             ((MQQueueConnectionFactory) qcf).setChannel("SYSTEM.DEF.SVRCONN");
 42             //設置WMQ上的監聽端口
 43             ((MQQueueConnectionFactory) qcf).setPort(1414);
 44             
 45             //由鏈接工廠建立鏈接
 46             qConn = qcf.createQueueConnection();
 47 
 48             //創建異常監聽器用於監聽鏈接過程當中發生的異常
 49              ExceptionListener exceptionListener = new ExceptionListener(){
 50             
 51              //此處可放入更多邏輯,由本身定義
 52              public void onException(JMSException e) {
 53              System.out.println("mq exception");
 54              e.printStackTrace();
 55              System.exit(0);
 56              }
 57             
 58              };
 59              //在鏈接上面註冊監聽器
 60              qConn.setExceptionListener(exceptionListener);
 61         } catch (JMSException e) {
 62 
 63             e.printStackTrace();
 64             return;
 65         }
 66     }
 67 
 68     public static void main(String[] args) throws NamingException,
 69             JMSException, UnsupportedEncodingException {
 70 
 71         init();
 72          sendMessage();
 73         // receiveMessage();
 74         receiveWithListener();
 75         destroy();
 76 
 77     }
 78 
 79     public static void sendMessage() throws JMSException {
 80         boolean transacted = false;
 81 
 82         // 非事務處理(分別接收或發送消息)[事務處理(所有發送或者所有接收做爲一個單元的一組消息)]
 83         session = qConn
 84                 .createQueueSession(transacted, Session.AUTO_ACKNOWLEDGE);
 85         // 由session建立要發送到的隊列
 86         Queue inputQ = session.createQueue("TestQ");
 87         
 88         //由session建立消息發送者
 89         MessageProducer sender = session.createProducer(inputQ);
 90         
 91         //啓動鏈接
 92         qConn.start();
 93         
 94         // 消息由會話建立
 95         TextMessage message = session.createTextMessage();
 96         //設置消息內容
 97         message.setText("this is input message from queue sender");
 98         //這句無關緊要的哦,主要用於設置消息屬性;方便後面取消息時,取特定類型的消息,如"company='systems'"
 99         message.setStringProperty("company", "systems");
100 
101         // 發送消息,後面的參數依次爲消息的持久性設置,消息的優先級,消息在隊列的存活時間,設置爲0,表示永不失效
102         //DeliveryMode爲PERSISTENT表示,隊列管理器或者WMQ重啓後,消息仍在queue中;NON_PERSISTENT意思相反
103         sender.send(message, DeliveryMode.NON_PERSISTENT, 7, 0);
104     }
105 
106     //使用listener的方式從queue中取消息,可一次取多條消息出來
107     public static void receiveWithListener() throws JMSException {
108 
109         boolean transacted = false;
110 
111         // 非事務處理(分別接收或發送消息)[事務處理(所有發送或者所有接收做爲一個單元的一組消息)]
112         session = qConn
113                 .createQueueSession(transacted, Session.AUTO_ACKNOWLEDGE);
114         //從同一個queue中取消息,此處爲使用session建立queue
115         Queue outputQ = session.createQueue("TestQ");
116         //使用session建立消息消費者,注意了,後面的那個參數就是消息選擇器,用於接收特定類型的消息
117         consumer = session.createConsumer(outputQ, "company='t-systems'");
118         //建立消息監聽器
119         MessageListener listener = new MessageListener() {
120             public void onMessage(Message message) {
121                 try {
122                     if (message instanceof TextMessage) {
123                         System.out.println("Listener 接收消息:"
124                                 + ((TextMessage) message).getText());
125                     }
126                 } catch (JMSException e) {
127                     e.printStackTrace();
128                 }
129             }
130         };
131         //註冊消息監聽器
132         consumer.setMessageListener(listener);
133         //啓動鏈接
134         qConn.start();
135         try {
136             Thread.sleep(10 * 1000);
137         } catch (InterruptedException e) {
138             e.printStackTrace();
139         }
140     }
141     //取消息的另外一種方式,手動從queue中取消息,一次只能接收一條消息
142     public static void receiveMessage() throws JMSException,
143             UnsupportedEncodingException {
144 
145         boolean transacted = false;
146 
147         // 非事務處理(分別接收或發送消息)[事務處理(所有發送或者所有接收做爲一個單元的一組消息)]
148         session = qConn
149                 .createQueueSession(transacted, Session.AUTO_ACKNOWLEDGE);
150         // 對隊列管理器上隊列的映射
151         Queue outputQ = session.createQueue("TestQ");
152         consumer = session.createConsumer(outputQ);
153         qConn.start();
154         //此時,若send的message中,設置了message的屬性如,"company='systems'",下面的方法是取不到消息的哈
155         //要取到消息可屏蔽到send中設置message屬性的語句,或使用consumer.receive("company='systems'")
156         Message msg = consumer.receiveNoWait();
157         //轉換消息格式
158         if (msg instanceof TextMessage) {
159             TextMessage message = (TextMessage) msg;
160             System.out.println("received message from queue is:"
161                     + message.getText());
162         } else if (msg instanceof BytesMessage) {
163             BytesMessage message = (BytesMessage) msg;
164             byte buff[] = null;
165             long length = message.getBodyLength();
166             buff = new byte[(int) length];
167             message.readBytes(buff);
168             String textmessage = new String(buff, "UTF-8");
169             System.out.println("received message from queue is:" + textmessage);
170         }
171     }
172     //銷燬資源
173     public static void destroy() throws JMSException {
174         if (consumer != null) {
175             consumer.close();
176         }
177         if (producer != null) {
178             producer.close();
179         }
180         if (session != null) {
181             session.close();
182         }
183         if (conn != null) {
184             conn.close();
185         }
186 
187     }
188 
189 }
View Code

上面的代碼是點對點消息發送模式的實現,其實,發佈/預約模式的實現也差很少了。

  1 package com.demo;
  2 
  3 import java.io.UnsupportedEncodingException;
  4 
  5 import javax.jms.BytesMessage;
  6 import javax.jms.Connection;
  7 import javax.jms.DeliveryMode;
  8 import javax.jms.ExceptionListener;
  9 import javax.jms.JMSException;
 10 import javax.jms.Message;
 11 import javax.jms.MessageConsumer;
 12 import javax.jms.MessageListener;
 13 import javax.jms.MessageProducer;
 14 import javax.jms.Session;
 15 import javax.jms.TextMessage;
 16 import javax.jms.Topic;
 17 import javax.jms.TopicConnection;
 18 import javax.jms.TopicConnectionFactory;
 19 import javax.naming.NamingException;
 20 
 21 import com.ibm.mq.jms.MQTopicConnectionFactory;
 22 
 23 public class JmsTopicDemo {
 24     private static Connection conn;
 25     private static Session session;
 26     private static MessageProducer producer;
 27     private static MessageConsumer consumer;
 28     private static TopicConnection tConn = null;
 29 
 30     public static void init() {
 31         // 鏈接工廠,用com.ibm.mq.jms中的類實現javax.jms中的接口
 32         TopicConnectionFactory tcf = new MQTopicConnectionFactory();
 33 
 34         // 設置鏈接工廠屬性
 35         try {
 36             ((MQTopicConnectionFactory) tcf).setHostName("localhost");
 37             ((MQTopicConnectionFactory) tcf).setQueueManager("TestQM");
 38             ((MQTopicConnectionFactory) tcf).setCCSID(1381);
 39             ((MQTopicConnectionFactory) tcf).setChannel("SYSTEM.DEF.SVRCONN");
 40             ((MQTopicConnectionFactory) tcf).setPort(1414);
 41             tConn = tcf.createTopicConnection();
 42 
 43             ExceptionListener exceptionListener = new ExceptionListener() {
 44 
 45                 // 此處可放入更多邏輯
 46                 public void onException(JMSException e) {
 47                     System.out.println("mq exception");
 48                     e.printStackTrace();
 49                     System.exit(0);
 50                 }
 51 
 52             };
 53             tConn.setExceptionListener(exceptionListener);
 54         } catch (JMSException e) {
 55 
 56             e.printStackTrace();
 57             return;
 58         }
 59     }
 60 
 61     public static void main(String[] args) throws NamingException,
 62             JMSException, UnsupportedEncodingException, InterruptedException {
 63 
 64         init();
 65         sendMessage();
 66         receiveMessage();
 67         receiveWithListener();
 68         destroy();
 69 
 70     }
 71 
 72     public static void sendMessage() throws JMSException {
 73         boolean transacted = false;
 74 
 75         session = tConn
 76                 .createTopicSession(transacted, Session.AUTO_ACKNOWLEDGE);
 77 
 78         Topic inputTopic = session.createTopic("TestT");
 79         MessageProducer sender = session.createProducer(inputTopic);
 80         tConn.start();
 81 
 82         TextMessage message = session.createTextMessage();
 83         message.setText("this is input message from topic sender");
 84 
 85         sender.send(message, DeliveryMode.PERSISTENT, 7, 0);
 86 
 87     }
 88 
 89     public static void receiveWithListener() throws JMSException {
 90         // receive message from mq
 91 
 92         boolean transacted = false;
 93 
 94         session = tConn
 95                 .createTopicSession(transacted, Session.AUTO_ACKNOWLEDGE);
 96         Topic outputT = session.createTopic("TestT");
 97 
 98         consumer = session.createConsumer(outputT);
 99 
100         MessageListener listener = new MessageListener() {
101             public void onMessage(Message message) {
102                 try {
103                     if (message instanceof TextMessage) {
104                         System.out.println("Listener 接收消息:"
105                                 + ((TextMessage) message).getText());
106                     }
107                 } catch (JMSException e) {
108                     e.printStackTrace();
109                 }
110             }
111         };
112         consumer.setMessageListener(listener);
113         tConn.start();
114         try {
115             Thread.sleep(10 * 1000);
116         } catch (InterruptedException e) {
117             e.printStackTrace();
118         }
119     }
120 
121     public static void receiveMessage() throws JMSException,
122             UnsupportedEncodingException {
123 
124         boolean transacted = false;
125 
126         session = tConn
127                 .createTopicSession(transacted, Session.AUTO_ACKNOWLEDGE);
128 
129         Topic outputT = session.createTopic("TestT");
130         consumer = session.createConsumer(outputT);
131 
132         tConn.start();
133         Message msg = consumer.receive(5000);
134         if (msg instanceof TextMessage) {
135             TextMessage message = (TextMessage) msg;
136             System.out.println("received message from topic is:"
137                     + message.getText());
138         } else if (msg instanceof BytesMessage) {
139             BytesMessage message = (BytesMessage) msg;
140             byte buff[] = null;
141             long length = message.getBodyLength();
142             buff = new byte[(int) length];
143             message.readBytes(buff);
144             String textmessage = new String(buff, "UTF-8");
145             System.out.println("received message from topic is:" + textmessage);
146         }
147     }
148 
149     public static void destroy() throws JMSException {
150         if (consumer != null) {
151             consumer.close();
152         }
153         if (producer != null) {
154             producer.close();
155         }
156         if (session != null) {
157             session.close();
158         }
159         if (conn != null) {
160             conn.close();
161         }
162 
163     }
164 
165 }
View Code

不一樣點的地方,可能就是從topic中取消息的時候,由於消息不能在topic中駐留,因此,必定要是取的動做已經發生了或準備好,再作發送消息的操做,這時候,纔可以從topic中收到發送的消息。怎麼理解呢?好比說,queue是個籃子,消息發送出去,就會在籃子裏。有時,發送的動做也許早就已經完成,但消息會一直在籃子裏。取消息的時候,從籃子中取走便可。而topic則是一個籃球框的那種籃網,消息發送到籃網,若是下面沒有接收的東西,就會直接從籃網溜走,你這時再去取消息,就只能是竹籃打水一場空啦。因此,從topic中收消息,必定要再發消息以前先把接的動做準備好的哈。

鏈接方式二:使用javax.jms中的標準JMS接口(要導入包jms.jar)

使用javax.jms中的標準JMS接口,能夠隱藏特定廠商的實現,使代碼具備更好的可移植性。開發時,只須要關注javax.jms.QueueConnectionFactory 和 javax.jms.Queue這兩個對象便可。一般JMS提供廠商會有本身的工具去構建這兩個對象,並存儲在JNDI命名空間中。能夠從命名空間中檢索這兩個對象,而不用care是哪一個廠商實現滴。

對於WMQ,這兩個對象是這樣構建滴哈。

1)打開WMQ,找到「JMS受管對象」,右擊「添加初始上下文」,以下圖選擇,記得綁定目錄是先建立好的哈。

點擊「完成」。

2)在剛建好的「初始上下文」下面,找到」鏈接工廠「,右擊新建「鏈接工廠」,以下圖,

點擊」下一步「,到最後一頁,在鏈接裏面作以下設置,

選擇」基本隊列管理器「(應先在WMQ上建立好一個隊列管理器,此處選上;另還需先建立好;一個queue,下面要用到滴),完成。

此時咱們就有javax.jms.QueueConnectionFactory這個對象了。

3)在新建的」初始上下文「下,找到」目標「,新建」目標「,點」下一步「,到最後一頁,作下面的設置,

 選擇」隊列管理器「和」隊列「,應先在WMQ上建立好。這時,javax.jms.Queue這個對象也構建好了呢。

OK,完事具有了,下面就是配合JNDI方式實現的標準JMS接口了。

 1 package com.demo;
 2 
 3 import java.util.Hashtable;
 4 
 5 import javax.jms.*;
 6 import javax.naming.*;
 7 
 8 public class Tester {
 9     public static void main(String[] args) {
10         send();
11     }
12 
13     public static void send() {
14         try {
15         
16             Hashtable<String, String> environment = new Hashtable<String, String>();
17             //剛纔建的初始上下文哈
18             environment.put(Context.INITIAL_CONTEXT_FACTORY,
19                     "com.sun.jndi.fscontext.RefFSContextFactory");
20             environment.put(Context.PROVIDER_URL, "file:/D:/JNDI-Directory");
21             InitialContext initContext = new InitialContext(environment);
22             //建立的鏈接工廠哈
23             ConnectionFactory factory = (ConnectionFactory) initContext
24                     .lookup("TestQM_Q");
25             //建立的目標哈
26             Destination destination = (Destination) initContext
27                     .lookup("TestQueue");
28             initContext.close();
29             
30             Connection connection = factory.createConnection();
31             Session session = connection.createSession(false,
32                     Session.AUTO_ACKNOWLEDGE);
33             
34             MessageProducer sender = session.createProducer(destination);
35             // Send messages
36             TextMessage message = session

37                     .createTextMessage("hello, this is my first jms sample code~~~");
38             
39             //記住,message並非send到「TestQueue」裏面去了,而是send到WMQ上與「TestQueue」關聯的那個queue裏面去了
40             sender.send(message);
41             session.close();
42             connection.close();
43         } catch (Exception e) {
44             e.printStackTrace();
45         }
46     }
47 }
View Code

好了,就介紹到這裏了。Happy ending~~~

相關文章
相關標籤/搜索