java往MQ裏面發消息和取消息java
package test; import java.io.IOException; import com.ibm.mq.MQC; import com.ibm.mq.MQEnvironment; import com.ibm.mq.MQException; import com.ibm.mq.MQGetMessageOptions; import com.ibm.mq.MQMessage; import com.ibm.mq.MQPutMessageOptions; import com.ibm.mq.MQQueue; import com.ibm.mq.MQQueueManager; import com.ibm.mq.constants.MQConstants; public class test1{ //定義隊列管理器和隊列的名稱 private static String qmName; private static String qName; private static MQQueueManager qMgr; static{ //設置環境: //MQEnvironment中包含控制MQQueueManager對象中的環境的構成的靜態變量,MQEnvironment的值的設定會在MQQueueManager的構造函數加載的時候起做用, //所以必須在創建MQQueueManager對象以前設定MQEnvironment中的值. MQEnvironment.hostname="172.31.17.162"; //MQ服務器的IP地址 MQEnvironment.channel="SVRCONN_GW"; //服務器鏈接的通道 MQEnvironment.CCSID=1208; //服務器MQ服務使用的編碼1381表明GBK、1208表明UTF(Coded Character Set Identifier:CCSID) MQEnvironment.port=33333; //MQ端口 qmName = "QM_TEST"; //MQ的隊列管理器名稱 qName = "TEST"; //MQ遠程隊列的名稱 try { //定義並初始化隊列管理器對象並鏈接 //MQQueueManager能夠被多線程共享,可是從MQ獲取信息的時候是同步的,任什麼時候候只有一個線程能夠和MQ通訊。 qMgr = new MQQueueManager(qmName); } catch (MQException e) { // TODO Auto-generated catch block System.out.println("初使化MQ出錯"); e.printStackTrace(); } } /** * 往MQ發送消息 * @param message * @return */ public static int sendMessage(String message){ int result=0; try{ //設置將要鏈接的隊列屬性 // Note. The MQC interface defines all the constants used by the WebSphere MQ Java programming interface //(except for completion code constants and error code constants). //MQOO_INPUT_AS_Q_DEF:Open the queue to get messages using the queue-defined default. //MQOO_OUTPUT:Open the queue to put messages. /*目標爲遠程隊列,全部這裏不能夠用MQOO_INPUT_AS_Q_DEF屬性*/ //int openOptions = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT; /*如下選項可適合遠程隊列與本地隊列*/ int openOptions = MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING; //鏈接隊列 //MQQueue provides inquire, set, put and get operations for WebSphere MQ queues. //The inquire and set capabilities are inherited from MQManagedObject. /*關閉了就從新打開*/ if(qMgr==null || !qMgr.isConnected()){ qMgr = new MQQueueManager(qmName); } MQQueue queue = qMgr.accessQueue(qName, openOptions); //定義一個簡單的消息 MQMessage putMessage = new MQMessage(); //將數據放入消息緩衝區 putMessage.writeUTF(message); //設置寫入消息的屬性(默認屬性) MQPutMessageOptions pmo = new MQPutMessageOptions(); //將消息寫入隊列 queue.put(putMessage,pmo); queue.close(); }catch (MQException ex) { System.out.println("A WebSphere MQ error occurred : Completion code " + ex.completionCode + " Reason code " + ex.reasonCode); ex.printStackTrace(); }catch (IOException ex) { System.out.println("An error occurred whilst writing to the message buffer: " + ex); }catch(Exception ex){ ex.printStackTrace(); }finally{ try { qMgr.disconnect(); } catch (MQException e) { e.printStackTrace(); } } return result; } /** * 從隊列中去獲取消息,若是隊列中沒有消息,就會發生異常,不過沒有關係,有TRY...CATCH,若是是第三方程序調用方法,若是無返回則說明無消息 * 第三方能夠將該方法放於一個無限循環的while(true){...}之中,不須要設置等待,由於在該方法內部在沒有消息的時候會自動等待。 * @return */ public static String getMessage(){ String message=null; try{ //設置將要鏈接的隊列屬性 // Note. The MQC interface defines all the constants used by the WebSphere MQ Java programming interface //(except for completion code constants and error code constants). //MQOO_INPUT_AS_Q_DEF:Open the queue to get messages using the queue-defined default. //MQOO_OUTPUT:Open the queue to put messages. int openOptions = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT; MQMessage retrieve = new MQMessage(); //設置取出消息的屬性(默認屬性) //Set the put message options.(設置放置消息選項) MQGetMessageOptions gmo = new MQGetMessageOptions(); gmo.options = gmo.options + MQC.MQGMO_SYNCPOINT;//Get messages under sync point control(在同步點控制下獲取消息) gmo.options = gmo.options + MQC.MQGMO_WAIT; // Wait if no messages on the Queue(若是在隊列上沒有消息則等待) gmo.options = gmo.options + MQC.MQGMO_FAIL_IF_QUIESCING;// Fail if Qeue Manager Quiescing(若是隊列管理器停頓則失敗) gmo.waitInterval = 1000 ; // Sets the time limit for the wait.(設置等待的毫秒時間限制) /*關閉了就從新打開*/ if(qMgr==null || !qMgr.isConnected()){ qMgr = new MQQueueManager(qmName); } MQQueue queue = qMgr.accessQueue(qName, openOptions); // 從隊列中取出消息 queue.get(retrieve, gmo); byte[] buf = new byte[retrieve.getMessageLength()]; retrieve.readFully(buf); System.out.println("The message is: " + new String(buf)); queue.close(); }catch (MQException ex) { System.out.println("A WebSphere MQ error occurred : Completion code " + ex.completionCode + " Reason code " + ex.reasonCode); }catch (IOException ex) { System.out.println("An error occurred whilst writing to the message buffer: " + ex); }catch(Exception ex){ ex.printStackTrace(); }finally{ try { qMgr.disconnect(); } catch (MQException e) { e.printStackTrace(); } } return message; } public static void main(String args[]) throws InterruptedException { /*下面兩個方法可同時使用,也能夠單獨使用*/ //sendMessage("this is a test"); //System.out.println("jwh"); for (int i = 0; i < 100; i++) { getMessage(); Thread.sleep(20000); } } } *