IBM MQ 學習

  1 import java.io.IOException;  
  2 import java.util.HashMap;  
  3 import java.util.Map;  
  4   
  5 import com.ibm.mq.MQC;  
  6 import com.ibm.mq.MQEnvironment;  
  7 import com.ibm.mq.MQException;  
  8 import com.ibm.mq.MQGetMessageOptions;  
  9 import com.ibm.mq.MQMessage;  
 10 import com.ibm.mq.MQPutMessageOptions;  
 11 import com.ibm.mq.MQQueue;  
 12 import com.ibm.mq.MQQueueManager;  
 13     
 14 public class CLIENT_MQ{    
 15      //定義隊列管理器和隊列的名稱    
 16      private static final String qmName = "MQ_SERVICE";//MQ的隊列管理器名稱 ;     
 17      //private static final String qName = "MIDDLE_SEND_QUEUE"; //MQ遠程隊列的名稱    
 18      private static MQQueueManager qMgr;//隊列管理器  
 19      public  static void init(){    
 20          //設置環境:    
 21          //MQEnvironment中包含控制MQQueueManager對象中的環境的構成的靜態變量,MQEnvironment的值的設定會在MQQueueManager的構造函數加載的時候起做用,    
 22          //所以必須在創建MQQueueManager對象以前設定MQEnvironment中的值.    
 23          MQEnvironment.hostname="10.172.12.156";          //MQ服務器的IP地址          
 24          MQEnvironment.channel="SERVICE_JAVA";           //通道類型:服務器鏈接  
 25          MQEnvironment.CCSID=1381;//437                    //服務器MQ服務使用的編碼1381表明GBK、1208表明UTF(Coded Character Set Identifier:CCSID)    
 26          MQEnvironment.port=1456;                       //MQ端口    
 27          try {    
 28              //定義並初始化隊列管理器對象並鏈接     
 29              //MQQueueManager能夠被多線程共享,可是從MQ獲取信息的時候是同步的,任什麼時候候只有一個線程能夠和MQ通訊。    
 30             qMgr = new MQQueueManager(qmName);    
 31         } catch (MQException e) {    
 32             // TODO Auto-generated catch block    
 33             System.out.println("初使化MQ出錯");    
 34             e.printStackTrace();    
 35         }     
 36      }    
 37      /**  
 38       * 往MQ發送消息  
 39       * @param message  
 40       * @return  
 41       */    
 42      public static Map<String,Object> sendMessage(Object message,String qName){      
 43          Map<String,Object> map=new HashMap<String,Object>();  
 44          try{       
 45              //設置將要鏈接的隊列屬性    
 46              // Note. The MQC interface defines all the constants used by the WebSphere MQ Java programming interface     
 47              //(except for completion code constants and error code constants).    
 48              //MQOO_INPUT_AS_Q_DEF:Open the queue to get messages using the queue-defined default.    
 49              //MQOO_OUTPUT:Open the queue to put messages.    
 50              /*目標爲遠程隊列,全部這裏不能夠用MQOO_INPUT_AS_Q_DEF屬性*/    
 51              //int openOptions = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT;    
 52              /*如下選項可適合遠程隊列與本地隊列*/    
 53              //int openOptions = MQC.MQOO_INQUIRE | MQC.MQOO_OUTPUT; //發送時使用  
 54              //int qOptioin = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT; 接收時使用  
 55              int openOptions = MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING;    
 56              //鏈接隊列     
 57              //MQQueue provides inquire, set, put and get operations for WebSphere MQ queues.     
 58              //The inquire and set capabilities are inherited from MQManagedObject.     
 59              /*關閉了就從新打開*/    
 60             if(qMgr==null || !qMgr.isConnected()){    
 61                 qMgr = new MQQueueManager(qmName);    
 62             }    
 63              MQQueue queue = qMgr.accessQueue(qName, openOptions);              
 64              //定義一個簡單的消息    
 65              MQMessage putMessage = new MQMessage();  
 66              map.put("messageId",putMessage);  
 67              //String uuid=java.util.UUID.randomUUID().toString();  
 68              //將數據放入消息緩衝區    
 69              putMessage.writeObject(message);      
 70              //設置寫入消息的屬性(默認屬性)    
 71              MQPutMessageOptions pmo = new MQPutMessageOptions();   
 72               
 73              //將消息寫入隊列     
 74              queue.put(putMessage,pmo);   
 75              map.put("message",message.toString());  
 76              queue.close();    
 77          }catch (MQException ex) {     
 78              System.out.println("A WebSphere MQ error occurred : Completion code "   + ex.completionCode + " Reason code " + ex.reasonCode);     
 79              ex.printStackTrace();    
 80          }catch (IOException ex) {     
 81              System.out.println("An error occurred whilst writing to the message buffer: " + ex);     
 82          }catch(Exception ex){    
 83              ex.printStackTrace();    
 84          }finally{    
 85              try {    
 86                 qMgr.disconnect();    
 87             } catch (MQException e) {    
 88                 e.printStackTrace();    
 89             }    
 90           }    
 91          return map;    
 92      }    
 93        
 94        
 95         
 96   
 97        
 98      /**  
 99       * 處理完消息回放到MQ隊列  
100       * @param message  
101       * @return  
102       */    
103      public static Map<String,Object> sendReplyMessage(Object message,String qName,MQMessage mqMessage){      
104          Map<String,Object> map=new HashMap<String,Object>();  
105          try{       
106              //設置將要鏈接的隊列屬性    
107              // Note. The MQC interface defines all the constants used by the WebSphere MQ Java programming interface     
108              //(except for completion code constants and error code constants).    
109              //MQOO_INPUT_AS_Q_DEF:Open the queue to get messages using the queue-defined default.    
110              //MQOO_OUTPUT:Open the queue to put messages.    
111              /*目標爲遠程隊列,全部這裏不能夠用MQOO_INPUT_AS_Q_DEF屬性*/    
112              //int openOptions = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT;    
113              /*如下選項可適合遠程隊列與本地隊列*/    
114              //int openOptions = MQC.MQOO_INQUIRE | MQC.MQOO_OUTPUT; //發送時使用  
115              //int qOptioin = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT; 接收時使用  
116              int openOptions = MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING;    
117              //鏈接隊列     
118              //MQQueue provides inquire, set, put and get operations for WebSphere MQ queues.     
119              //The inquire and set capabilities are inherited from MQManagedObject.     
120              /*關閉了就從新打開*/    
121             if(qMgr==null || !qMgr.isConnected()){    
122                 qMgr = new MQQueueManager(qmName);    
123             }    
124              MQQueue queue = qMgr.accessQueue(qName, openOptions);              
125              //定義一個簡單的消息    
126              MQMessage putMessage = new MQMessage();  
127              putMessage.messageId=mqMessage.messageId;  
128              map.put("messageId",putMessage);  
129              //String uuid=java.util.UUID.randomUUID().toString();  
130              //將數據放入消息緩衝區    
131              putMessage.writeObject(message);      
132              //設置寫入消息的屬性(默認屬性)    
133              MQPutMessageOptions pmo = new MQPutMessageOptions();   
134               
135              //將消息寫入隊列     
136              queue.put(putMessage,pmo);   
137              map.put("message",message.toString());  
138              queue.close();    
139          }catch (MQException ex) {     
140              System.out.println("A WebSphere MQ error occurred : Completion code "   + ex.completionCode + " Reason code " + ex.reasonCode);     
141              ex.printStackTrace();    
142          }catch (IOException ex) {     
143              System.out.println("An error occurred whilst writing to the message buffer: " + ex);     
144          }catch(Exception ex){    
145              ex.printStackTrace();    
146          }finally{    
147              try {    
148                 qMgr.disconnect();    
149             } catch (MQException e) {    
150                 e.printStackTrace();    
151             }    
152           }    
153          return map;    
154      }    
155        
156        
157        
158        
159      /**  
160       * 從隊列中去獲取消息,若是隊列中沒有消息,就會發生異常,不過沒有關係,有TRY...CATCH,若是是第三方程序調用方法,若是無返回則說明無消息  
161       * 第三方能夠將該方法放於一個無限循環的while(true){...}之中,不須要設置等待,由於在該方法內部在沒有消息的時候會自動等待。  
162       * @return  
163       */    
164      public static String getMessage(String qName,MQMessage mqMessage){    
165          String message="";    
166          try{                
167              //設置將要鏈接的隊列屬性    
168              // Note. The MQC interface defines all the constants used by the WebSphere MQ Java programming interface     
169              //(except for completion code constants and error code constants).    
170              //MQOO_INPUT_AS_Q_DEF:Open the queue to get messages using the queue-defined default.    
171              //MQOO_OUTPUT:Open the queue to put messages.    
172              //int qOptioin = MQC.MQOO_INQUIRE | MQC.MQOO_OUTPUT; 發送時使用  
173              //int qOptioin = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT; 接收時使用  
174                
175              int openOptions = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT;    
176              MQMessage retrieve = new MQMessage();    
177              //設置取出消息的屬性(默認屬性)    
178              //Set the put message options.(設置放置消息選項)     
179              MQGetMessageOptions gmo = new MQGetMessageOptions();     
180                
181              gmo.options = gmo.options + MQC.MQGMO_SYNCPOINT;//Get messages under sync point control(在同步點控制下獲取消息)     
182              gmo.options = gmo.options + MQC.MQGMO_WAIT;  // Wait if no messages on the Queue(若是在隊列上沒有消息則等待)     
183              gmo.options = gmo.options + MQC.MQGMO_FAIL_IF_QUIESCING;// Fail if Qeue Manager Quiescing(若是隊列管理器停頓則失敗)     
184              gmo.waitInterval = 3000 ;  // Sets the time limit for the wait.(設置等待的毫秒時間限制)     
185              /*關閉了就從新打開*/    
186             if(qMgr==null || !qMgr.isConnected()){    
187                 qMgr = new MQQueueManager(qmName);    
188             }    
189              MQQueue queue = qMgr.accessQueue(qName, openOptions);    
190                
191              MQMessage retrievedMessage = new MQMessage();  
192              //從隊列中取出對應messageId的消息  
193              retrieve.messageId = mqMessage.messageId;   
194              // 從隊列中取出消息  
195              queue.get(retrieve, gmo);    
196                
197               
198              Object obj = retrieve.readObject();  
199              message=obj.toString();//解決中文亂碼問題  
200              /* 
201       
202              //int size = rcvMessage.getMessageLength(); 
203              //byte[] p = new byte[size]; 
204              //rcvMessage.readFully(p); 
205               
206              int len=retrieve.getDataLength(); 
207              byte[] str = new byte[len]; 
208               retrieve.readFully(str,0,len); 
209               message = new String(str);//readUTF();     
210              */  
211              
212              queue.close();    
213          }catch (MQException ex) {  
214              int reason=ex.reasonCode;  
215              if(reason==2033)//no messages  
216              {  
217                 message="nomessage";  
218              }else{  
219                 System.out.println("A WebSphere MQ error occurred : Completion code " + ex.completionCode + " Reason code " + ex.reasonCode);     
220              }  
221          }catch (IOException ex) {     
222              System.out.println("An error occurred whilst writing to the message buffer: " + ex);     
223          }catch(Exception ex){    
224              ex.printStackTrace();    
225          }finally{    
226             try {    
227                 qMgr.disconnect();    
228             } catch (MQException e) {    
229                 e.printStackTrace();    
230             }    
231          }    
232          return message;    
233      }    
234   
235   
236        
237      public static void main(String args[]) {    
238          init();  
239          Map<String,Object> map = new HashMap<String,Object>();  
240          map=sendMessage("{name: test get message id 123}","SERVICE_TRANSFER_QUEUE");  
241          MQMessage mqMessage = (MQMessage)map.get("messageId");  
242          outSys("傳輸消息:",mqMessage.messageId.toString());    
243            
244          outSys("接收傳輸隊列:",getMessage("SERVICE_TRANSFER_QUEUE",mqMessage));  
245          Map<String,Object> reply_map = new HashMap<String,Object>();  
246          reply_map=sendReplyMessage("{name: local queue 008}","SERVICE_RECEIVE_QUEUE",mqMessage);  
247          outSys("放入正常隊列:",reply_map.get("message").toString());  
248            
249          outSys("接收正常隊列:",getMessage("SERVICE_RECEIVE_QUEUE",mqMessage));  
250            
251            
252      }  
253        
254        
255        
256      public static void outSys(String display,String val){  
257          System.out.println(display+val);  
258      }  
259        
260 }   
相關文章
相關標籤/搜索