一. 所需依賴包,安裝 IBM websphere MQ 後,在安裝目錄下的 java 目錄內java
import java.io.IOException; import java.util.Properties; import com.ibm.mq.MQC; import com.ibm.mq.MQEnvironment; import com.ibm.mq.MQException; import com.ibm.mq.MQGetMessageOptions; import com.ibm.mq.MQMessage; import com.ibm.mq.MQPutMessageOptions; import com.ibm.mq.MQQueue; import com.ibm.mq.MQQueueManager; import com.ibm.mq.constants.CMQC; import com.ibm.mq.constants.MQConstants; /** * 客戶端模式開發 * * @author Geely * */ public class MQTest1 { public static void main(String[] args) throws MQException, IOException { // 發送消息給隊列 put(); // 從隊列讀取消息 get(); // 獲取隊列深度 getDepth(); } @SuppressWarnings("unchecked") static void put() throws MQException, IOException { // 配置MQ服務器鏈接參數 MQEnvironment.hostname = "127.0.0.1"; MQEnvironment.port = 1414; MQEnvironment.channel = "QM_ACK"; // MQEnvironment.userID = ""; // MQEnvironment.password = ""; // 設置隊列管理器字符集 編碼 MQEnvironment.CCSID = 1381; // 設置應用名稱,方便服務器MQ 查看應用鏈接 MQEnvironment.properties.put(MQConstants.APPNAME_PROPERTY, "MQ Test By Java"); // 設置應用名稱,方便服務器MQ 查看應用鏈接 MQEnvironment.properties.put(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES); // 建立實例,鏈接隊列管理器 MQQueueManager queueManager = new MQQueueManager("QM",MQEnvironment.properties); // 以可寫的方式訪問隊列管理器已定義的隊列QUEUE1,固然也能夠建立隊列 MQQueue putQueue = queueManager.accessQueue("QUEUE_RECV", CMQC.MQOO_OUTPUT); // 新建併發送消息給隊列 MQMessage myMessage = new MQMessage(); myMessage.expiry = -1; // 設置消息用不過時 String name = "MePlusPlus's 博客"; myMessage.writeUTF(name); // 使用默認的消息選項 MQPutMessageOptions pmo = new MQPutMessageOptions(); // 發送消息 putQueue.put(myMessage, pmo); putQueue.close(); // 斷開鏈接 queueManager.disconnect(); } @SuppressWarnings("unchecked") static void get() throws MQException, IOException { // 配置MQ服務器鏈接參數 MQEnvironment.hostname = "127.0.0.1"; MQEnvironment.port = 1414; MQEnvironment.channel = "QM_ACK"; // MQEnvironment.userID = ""; // MQEnvironment.password = ""; // 設置隊列管理器字符集 編碼 MQEnvironment.CCSID = 1381; // 設置應用名稱,方便服務器MQ 查看應用鏈接 MQEnvironment.properties.put(MQConstants.APPNAME_PROPERTY, "MQ Test By Java"); // 設置應用名稱,方便服務器MQ 查看應用鏈接 MQEnvironment.properties.put(CMQC.TRANSPORT_PROPERTY, CMQC.TRANSPORT_MQSERIES_BINDINGS); // 建立實例,鏈接隊列管理器 MQQueueManager queueManager = new MQQueueManager("QM",MQEnvironment.properties); // 打開隊列. int openOptions = CMQC.MQOO_INPUT_AS_Q_DEF|CMQC.MQOO_OUTPUT|CMQC.MQOO_INQUIRE; // 以可讀的方式訪問隊列管理器已定義的隊列QUEUE1 // MQQueue getQueue = queueManager.accessQueue("QUEUE_RECV",CMQC.MQOO_INPUT_AS_Q_DEF); MQQueue getQueue = queueManager.accessQueue("QUEUE_RECV", openOptions, null, null, null); MQGetMessageOptions gmo = new MQGetMessageOptions(); // Get messages under sync point control. // 在同步點控制下獲取消息. //gmo.options = gmo.options + CMQC.MQGMO_SYNCPOINT; // Wait if no messages on the Queue. // 若是在隊列上沒有消息則等待. //gmo.options = gmo.options + CMQC.MQGMO_WAIT; // Fail if QeueManager Quiescing. // 若是隊列管理器停頓則失敗. //gmo.options = gmo.options + CMQC.MQGMO_FAIL_IF_QUIESCING; // Sets the time limit for the wait. // 設置等待的時間限制. gmo.waitInterval = 3000; // 從隊列讀取消息 MQMessage theMessage = new MQMessage(); getQueue.get(theMessage, gmo); String name = theMessage.readUTF(); System.out.println(name); getQueue.close(); // 斷開鏈接 queueManager.disconnect(); } static void getDepth() throws MQException, IOException { Properties props = new Properties(); props .put("hostname", "127.0.0.1"); props .put("port", 1414); //端口號 props .put("channel", "QM_ACK"); //服務器鏈接通道 props .put("CCSID", 1381); props .put("transport", "MQSeries"); // 建立實例,鏈接隊列管理器 MQQueueManager queueManager = new MQQueueManager("QM",props ); // 打開隊列. int openOptions = CMQC.MQOO_INPUT_AS_Q_DEF|CMQC.MQOO_OUTPUT|CMQC.MQOO_INQUIRE; // 以可讀的方式訪問隊列管理器已定義的隊列QUEUE1 MQQueue getQueue = queueManager.accessQueue("QUEUE_RECV", openOptions, null, null, null); MQGetMessageOptions gmo = new MQGetMessageOptions(); // Get messages under sync point control. // 在同步點控制下獲取消息. // gmo.options = gmo.options + CMQC.MQGMO_SYNCPOINT; // Wait if no messages on the Queue. // 若是在隊列上沒有消息則等待. // gmo.options = gmo.options + CMQC.MQGMO_WAIT; // Fail if QeueManager Quiescing. // 若是隊列管理器停頓則失敗. // gmo.options = gmo.options + CMQC.MQGMO_FAIL_IF_QUIESCING; // Sets the time limit for the wait. // 設置等待的時間限制. gmo.waitInterval = 3000; int depth = getQueue.getCurrentDepth(); System.out.println("該隊列當前的深度爲:"+depth); System.out.println("==========================="); while(depth-->0) { MQMessage msg = new MQMessage();// 要讀的隊列的消息 getQueue.get(msg, gmo); System.out.println("消息的大小爲:"+msg.getDataLength()); System.out.println("消息的內容:\n"+msg.readUTF()); System.out.println("---------------------------"); } System.out.println("Finish!!!"); getQueue.close(); // 斷開鏈接 queueManager.disconnect(); } }
JAVA從MQ讀取消息的時候報錯及解決web
JAVA經過writeUTF將消息寫入到MQ,再經過JAVA採用MQMessage讀消息的方法readUTF()去讀取的時候,就不會報錯,能夠正常讀出來。可是若是採用在MQ資源管理器中插入測試消息或者是經過另一臺MQ服務器往當前MQ服務器經過遠程隊例寫消息過來,經過JAVA讀取出會錯.服務器
MQMessage mqmsg = new MQMessage();// 要讀的隊列的消息 getQueue.get(mqmsg, gmo); System.out.println("消息的大小爲:" + mqmsg.getDataLength()); byte[] rawData = new byte[mqmsg.getMessageLength()]; // 先轉byte mqmsg.readFully(rawData); // 讀出全部數據 String msg = new String(rawData); // byte轉string System.out.println("消息的內容:\n"+msg);