IBM websphere MQ 消息發送與獲取

一. 所需依賴包,安裝 IBM websphere MQ 後,在安裝目錄下的 java 目錄內java

import java.io.IOException;
import java.util.Properties;

import com.ibm.mq.MQC;
import com.ibm.mq.MQEnvironment;
import com.ibm.mq.MQException;
import com.ibm.mq.MQGetMessageOptions;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQPutMessageOptions;
import com.ibm.mq.MQQueue;
import com.ibm.mq.MQQueueManager;
import com.ibm.mq.constants.CMQC;
import com.ibm.mq.constants.MQConstants;

/**
 * 客戶端模式開發
 * 
 * @author Geely
 *
 */
public class MQTest1 {

    public static void main(String[] args) throws MQException, IOException {
        
        // 發送消息給隊列
        put();

        // 從隊列讀取消息
        get();
        
        // 獲取隊列深度
        getDepth();
        
    }

    @SuppressWarnings("unchecked")
    static void put() throws MQException, IOException {

        // 配置MQ服務器鏈接參數
        MQEnvironment.hostname = "127.0.0.1";
        MQEnvironment.port = 1414;
        MQEnvironment.channel = "QM_ACK";
        // MQEnvironment.userID = "";
        // MQEnvironment.password = "";
        // 設置隊列管理器字符集 編碼
        MQEnvironment.CCSID = 1381;

        // 設置應用名稱,方便服務器MQ 查看應用鏈接
        MQEnvironment.properties.put(MQConstants.APPNAME_PROPERTY, "MQ Test By Java");

        // 設置應用名稱,方便服務器MQ 查看應用鏈接
        MQEnvironment.properties.put(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES);

        // 建立實例,鏈接隊列管理器
        MQQueueManager queueManager = new MQQueueManager("QM",MQEnvironment.properties);

        // 以可寫的方式訪問隊列管理器已定義的隊列QUEUE1,固然也能夠建立隊列
        MQQueue putQueue = queueManager.accessQueue("QUEUE_RECV", CMQC.MQOO_OUTPUT);

        // 新建併發送消息給隊列
        MQMessage myMessage = new MQMessage();
        myMessage.expiry = -1;    // 設置消息用不過時
        String name = "MePlusPlus's 博客";
        myMessage.writeUTF(name);

        // 使用默認的消息選項
        MQPutMessageOptions pmo = new MQPutMessageOptions();
        // 發送消息
        putQueue.put(myMessage, pmo);
        putQueue.close();

        // 斷開鏈接
        queueManager.disconnect();
    }

    @SuppressWarnings("unchecked")
    static void get() throws MQException, IOException {

        // 配置MQ服務器鏈接參數
        MQEnvironment.hostname = "127.0.0.1";
        MQEnvironment.port = 1414;
        MQEnvironment.channel = "QM_ACK";
        // MQEnvironment.userID = "";
        // MQEnvironment.password = "";
        // 設置隊列管理器字符集 編碼
        MQEnvironment.CCSID = 1381;

        // 設置應用名稱,方便服務器MQ 查看應用鏈接
        MQEnvironment.properties.put(MQConstants.APPNAME_PROPERTY, "MQ Test By Java");

        // 設置應用名稱,方便服務器MQ 查看應用鏈接
        MQEnvironment.properties.put(CMQC.TRANSPORT_PROPERTY, CMQC.TRANSPORT_MQSERIES_BINDINGS);

        // 建立實例,鏈接隊列管理器
        MQQueueManager queueManager = new MQQueueManager("QM",MQEnvironment.properties);

        // 打開隊列.
        int openOptions = CMQC.MQOO_INPUT_AS_Q_DEF|CMQC.MQOO_OUTPUT|CMQC.MQOO_INQUIRE;

        // 以可讀的方式訪問隊列管理器已定義的隊列QUEUE1
        // MQQueue getQueue = queueManager.accessQueue("QUEUE_RECV",CMQC.MQOO_INPUT_AS_Q_DEF);
        MQQueue getQueue = queueManager.accessQueue("QUEUE_RECV", openOptions, null, null, null);

        MQGetMessageOptions gmo = new MQGetMessageOptions();
        // Get messages under sync point control.
        // 在同步點控制下獲取消息.
        //gmo.options = gmo.options + CMQC.MQGMO_SYNCPOINT;
        // Wait if no messages on the Queue.
        // 若是在隊列上沒有消息則等待.
        //gmo.options = gmo.options + CMQC.MQGMO_WAIT;
        // Fail if QeueManager Quiescing.
        // 若是隊列管理器停頓則失敗.
        //gmo.options = gmo.options + CMQC.MQGMO_FAIL_IF_QUIESCING;
        // Sets the time limit for the wait.
        // 設置等待的時間限制.
        gmo.waitInterval = 3000;

        // 從隊列讀取消息
        MQMessage theMessage = new MQMessage();
        getQueue.get(theMessage, gmo);

        String name = theMessage.readUTF();

        System.out.println(name);
        getQueue.close();

        // 斷開鏈接
        queueManager.disconnect();
    }
    
    static void getDepth() throws MQException, IOException {

        Properties props  = new Properties();
        props .put("hostname", "127.0.0.1");
        props .put("port", 1414); //端口號
        props .put("channel", "QM_ACK"); //服務器鏈接通道
        props .put("CCSID", 1381);
        props .put("transport", "MQSeries");
        
        // 建立實例,鏈接隊列管理器
        MQQueueManager queueManager = new MQQueueManager("QM",props );

        // 打開隊列.
        int openOptions = CMQC.MQOO_INPUT_AS_Q_DEF|CMQC.MQOO_OUTPUT|CMQC.MQOO_INQUIRE;
        // 以可讀的方式訪問隊列管理器已定義的隊列QUEUE1
        MQQueue getQueue = queueManager.accessQueue("QUEUE_RECV", openOptions, null, null, null);

        MQGetMessageOptions gmo = new MQGetMessageOptions();
        // Get messages under sync point control.
        // 在同步點控制下獲取消息.
        // gmo.options = gmo.options + CMQC.MQGMO_SYNCPOINT;
        // Wait if no messages on the Queue.
        // 若是在隊列上沒有消息則等待.
        // gmo.options = gmo.options + CMQC.MQGMO_WAIT;
        // Fail if QeueManager Quiescing.
        // 若是隊列管理器停頓則失敗.
        // gmo.options = gmo.options + CMQC.MQGMO_FAIL_IF_QUIESCING;

        // Sets the time limit for the wait.
        // 設置等待的時間限制.
        gmo.waitInterval = 3000;
        
        int depth = getQueue.getCurrentDepth();
        System.out.println("該隊列當前的深度爲:"+depth);
        System.out.println("===========================");
        while(depth-->0)
        {
            MQMessage msg = new MQMessage();// 要讀的隊列的消息
            getQueue.get(msg, gmo);
            System.out.println("消息的大小爲:"+msg.getDataLength());
            System.out.println("消息的內容:\n"+msg.readUTF());
            System.out.println("---------------------------");
        }
        System.out.println("Finish!!!");

        getQueue.close();

        // 斷開鏈接
        queueManager.disconnect();
        
    }

}

 JAVA從MQ讀取消息的時候報錯及解決web

  JAVA經過writeUTF將消息寫入到MQ,再經過JAVA採用MQMessage讀消息的方法readUTF()去讀取的時候,就不會報錯,能夠正常讀出來。可是若是採用在MQ資源管理器中插入測試消息或者是經過另一臺MQ服務器往當前MQ服務器經過遠程隊例寫消息過來,經過JAVA讀取出會錯.服務器

MQMessage mqmsg = new MQMessage();// 要讀的隊列的消息
getQueue.get(mqmsg, gmo);

System.out.println("消息的大小爲:" + mqmsg.getDataLength());

byte[] rawData = new byte[mqmsg.getMessageLength()];     // 先轉byte
mqmsg.readFully(rawData);                                 // 讀出全部數據
String msg = new String(rawData);                         // byte轉string
System.out.println("消息的內容:\n"+msg);
相關文章
相關標籤/搜索