MQ java 基礎編程(一)

本文轉自:http://www.blogjava.net/i369/articles/88035.htmlhtml

 

編寫人:鄔文俊java

編寫時間 2006-2-16 node

聯繫郵件 : wenjunwu430@gmail.com 數組

前言

經過 2 個多星期對 MQ 學習,在 partner 丁 & partner 武 的幫助下完成了該文檔。該文檔提供一個簡單的例子,經過對該例子的講解,你將知道: 服務器

1.         java 寫客戶端從 MQ Server 收發消息。 session

2.         MQ 做爲 Websphere Application Server 的 JMS 資源提供者。 app

3.         JMS message 映射轉化爲  MQ message ide

文檔中的知識所有從參考資料和 IBM 提供的文檔中得到。 I recommend u to read the documents if u want to know more about the MQ. post

參考資料

1.         Using java 》( some place name it 《 base java 》) -----the very important document offered by IBM, every java programmer should read it! 學習

2.         《讓 WebSphere MQ 成爲部署在 WebSphere Application Server 中的應用程序的 JMS 提供程序》

3.         Websphere MQ 入門教程 (a document written by IBM engineer)

4.         mqseries_class_for_java

5.         IBM - JMS 應用和使用 WebSphere MQ MQI 接口的應用如何進行信息頭的交換(二)數據映射》 ------- 《 using java 》 mapping message 部分的翻譯。

6.         MQ--IBM MQSeries 使用指南

7.         WebSphere Application Server V5 and WebSphere MQ Family Integration. PDF

8.         WebSphere MQ Application Programming Guide. PDF

9.         IBM MQSeries 的觸發機制

10.     WebSphere MQ 成爲部署在 WebSphere Application Server 中的應用程序的 JMS 提供程序

例子說明

例子包括 3 個部分:發送客戶端、 MDB 、接收客戶端。

客戶端 A 把文件發送到服務器,服務器將該文件轉發給客戶端 B 。客戶端A經過向 MQ 客戶機直接發送消息。 MQ 隊列會把客戶端A發送的 MQ Message 轉換爲 JMS Message 。 MDB 接收到轉換後的 JMS 消息後,保存文件在服務器,而後把文件轉發到發送隊列( JMS 隊列)。發送隊列由 MQ Server 提供,因此即爲發送到了 MQ 隊列。 MQ 隊列把 JMS Message 轉換爲 MQ Message 。客戶端 B 從 MQ 隊列中接收轉換後的消息,從消息中讀取文件,保存在硬盤。

 

            MQMESSAGE             JMS MESSAGE

Client A------------------------->mq queue  ------------------->MDB

Client B<------------------------ mq queue  <-------------------MDB

 

1.       配置 MQ Server

這裏使用的是 MQ Server 5.2 。 MQ Server 和 WebSphere Application Server 安裝在同一臺機器上( 2 者可使用綁定方式通訊)。

要配置的項目:

1.         隊列管理器 QMGR

2.         偵聽端口 4001

3.         本地隊列 EXAMPLE.QUEUE

4.         本地隊列 EXAMPLE.SENDQUEUE

5.         通道 EXAMPLE.CHANNEL

 

打開 WebSphere MQ 資源管理器。展開隊列管理器節點,右鍵,新建隊列管理器。取名字爲 QMGR ,設置偵聽端口 4001

在建好的隊列管理器 QMGR 下面新建 2 個本地隊列: EXAMPLE.QUEUE , EXAMPLE.SENDQUEUE

展開高級節點,新建服務器鏈接通道 EXAMPLE.CHANNEL

Note :不要搞錯隊列和通道的類型。

2.       驗證 MQ 配置

打開 WebSphere MQ 服務。能夠查看服務是否啓動、服務監聽端口。

3.       配置 WAS JMS

具體操做參考《讓 WebSphere MQ 成爲部署在 WebSphere Application Server 中的應用程序的 JMS 提供程序》該文章能夠在 IBM WebSphere 開發者技術期刊 中找到。

要配置的項目:

1.         WebSphere MQ 鏈接工廠

JNDI name : jms/JMSExampleConnectionFactory

2.         WebSphere MQ 隊列目標

JNDI name : jms/JMSExampleQueue ;基本隊列名: EXAMPLE.QUEUE ;目標客戶機: JMS 。目標客戶機決定了 MQ 隊列接收方的消息格式。由於是用 MDB 接收消息,因此設置爲 JMS 。另外一個隊列是由 MQ 客戶端接收消息,因此另外一個隊列的目標客戶機是 MQ 。若是配置錯誤, MQ 隊列轉換消息的格式將不是你所想要的。具體參考《 IBM - JMS 應用和使用 WebSphere MQ MQI 接口的應用如何進行信息頭的交換(二)數據映射》

3.         WebSphere MQ 隊列目標

JNDI name : jms/JMSExampleSendQueue ;

基本隊列名: EXAMPLE.SENDQUEUE ;目標客戶機: MQ

4.       配置 MDB

WAS 上配置 偵聽器端口

名稱: JMSExampleQueuePort

鏈接工廠 JNDI 名      jms/JMSExampleConnectionFactory ;

目標 JNDI 名: jms/JMSExampleQueue

Message Driven Beans 用於偵聽消息的偵聽器端口。每一個端口指定 MDB 將偵聽的(依據該端口部署的) JMS 鏈接工廠和 JMS 目標。

 

MDB 部署描述符中配置

鏈接工廠 JNDI 名      jms/JMSExampleConnectionFactory ;

目標 JNDI 名: jms/JMSExampleQueue

監聽端口名稱: JMSExampleQueuePort (監聽端口名稱也能夠在管理控制檯中修改)

5.       代碼

客戶端 A (發送方)

MqPut.java

 

package cn.edu.itec.mqclient;

 

import java.io.File;

 

import com.ibm.mq.MQC;

import com.ibm.mq.MQEnvironment;

import com.ibm.mq.MQException;

import com.ibm.mq.MQMessage;

import com.ibm.mq.MQPutMessageOptions;

import com.ibm.mq.MQQueueManager;

 

public class MQPut {

       private String HOST_URL = "192.168.1.116";

 

       private String MQ_CHANNEL = "EXAMPLE.CHANNEL";

 

       private String MQ_MANAGER = "QMGR";

 

       private String MQ_QUEUE = "EXAMPLE.QUEUE";

 

       private int MQ_PORT = 4001;

 

       public static void main(String args[]) {

              new MQPut().SendFile("f:/JMSExampleEJB.jar");

       }

 

       public void SendFile(String sFilePath) {

              try {

 

                     /* 設置 MQEnvironment 屬性以便客戶機鏈接 */

                     MQEnvironment.hostname = HOST_URL;

                     MQEnvironment.channel = MQ_CHANNEL;

                     MQEnvironment.properties.put(MQC.TRANSPORT_PROPERTY,

                                   MQC.TRANSPORT_MQSERIES);

                     MQEnvironment.CCSID = 1381;

                     MQEnvironment.port = MQ_PORT;

 

                     /* 鏈接到隊列管理器 */

                     MQQueueManager qMgr = new MQQueueManager(MQ_MANAGER);

                     System.out.println("queue manager is connected!");

 

                     /* 設置打開選項以便打開用於輸出的隊列,若是隊列管理器正在中止,咱們也已設置了選項去應對不成功狀況。 */

                     int openOptions = MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING;

 

                     /* 打開隊列 */

                     com.ibm.mq.MQQueue queue = qMgr.accessQueue(MQ_QUEUE, openOptions);

 

                     /* 設置放置消息選項咱們將使用默認設置 */

                     MQPutMessageOptions pmo = new MQPutMessageOptions();

 

                     /* 建立消息, MQMessage 類包含實際消息數據的數據緩衝區,和描述消息的全部 MQMD 參數 */

 

                     /* 建立消息緩衝區 */

                     MQMessage outMsg = new MQMessage();

 

                     /* set the properties of the message fot the selector */

                     outMsg.correlationId = "clinet_B_receive".getBytes();

                     outMsg.messageId = "1Aa".getBytes();

                    

                     /* write msg */

                     MsgWriter.readFile(outMsg, new File(sFilePath));

 

                     /* put message with default options */

                     queue.put(outMsg, new MQPutMessageOptions());

                     System.out.println("send file is success!");

                     /* release resource */

                     queue.close();

                     qMgr.disconnect();

 

              } catch (MQException ex) {

                     //System.out.println("fft!");

                     System.out.println("An MQ Error Occurred: Completion Code is :\t"

                                   + ex.completionCode + "\n\n The Reason Code is :\t"

                                   + ex.reasonCode);

                     ex.printStackTrace();

              } catch (Exception e) {

                     e.printStackTrace();

              }

       }

 

       private void readFileToMessage(String FilePath) {

 

       }

}

 

JMS message 和 MQ message 有幾個字段是相同的,這些字段的值將會在轉換中保留。比較方便的是使用 CorrelationID 這個字段。經過設置這個字段,達到選擇性的接收特定消息的功能。其它字段沒有徹底搞清楚,有的數據類型須要轉換,例如 MessageID (對應於 JMSMessageID )。 MQ 消息選擇和 JMS 不一樣,後者採用 selector ,前者經過設置接收消息的屬性完成。例如設置 CorrelationID 爲特定值。

客戶端 B

MQGet.java

 

package cn.edu.itec.mqclient;

 

import java.io.FileOutputStream;

import java.io.IOException;

import java.util.Hashtable;

 

import com.ibm.mq.MQC;

import com.ibm.mq.MQEnvironment;

import com.ibm.mq.MQException;

import com.ibm.mq.MQGetMessageOptions;

import com.ibm.mq.MQMessage;

import com.ibm.mq.MQQueueManager;

 

/**

 * @author Administrator

 *

 * TODO To change the template for this generated type comment go to Window -

 * Preferences - Java - Code Style - Code Templates

 */

public class MQGet {

       private static String HOST_URL = "192.168.1.116";

 

       private static String MQ_CHANNEL = "EXAMPLE.CHANNEL";

 

       private static String MQ_MANAGER = "QMGR";

 

       private static String MQ_QUEUE = "EXAMPLE.SENDQUEUE";

 

       private static int MQ_PORT = 4001;

 

       public static void main(String[] args) {

              MQGet.getMessage();

       }

 

       public static void getMessage() {

              try {

                     /* 設置 MQEnvironment 屬性以便客戶機鏈接 */

                     MQEnvironment.hostname = HOST_URL;

                     MQEnvironment.channel = MQ_CHANNEL;

                     MQEnvironment.properties.put(MQC.TRANSPORT_PROPERTY,

                                   MQC.TRANSPORT_MQSERIES);

                     MQEnvironment.CCSID = 1381;

                     MQEnvironment.port = MQ_PORT;

 

                     /* 鏈接到隊列管理器 */

                     MQQueueManager qMgr = new MQQueueManager(MQ_MANAGER);

                     System.out.println("queue manager is connected!");

 

                     /*

                      * 設置打開選項以便打開用於輸出的隊列,若是隊列管理器中止,咱們也 已設置了選項去應對不成功狀況

                      */

                     int openOptions = MQC.MQOO_INPUT_SHARED

                                   | MQC.MQOO_FAIL_IF_QUIESCING;

 

                     /* 打開隊列 */

                     com.ibm.mq.MQQueue queue = qMgr.accessQueue(MQ_QUEUE, openOptions);

                     System.out.println(" 隊列鏈接成功 ");

                     /* 設置放置消息選項 */

                     MQGetMessageOptions gmo = new MQGetMessageOptions();

 

                     /* 在同步點控制下獲取消息 */

                     gmo.options = gmo.options + MQC.MQGMO_SYNCPOINT;

 

                     /* 若是在隊列上沒有消息則等待 */

                     gmo.options = gmo.options + MQC.MQGMO_WAIT;

 

                     /* 若是隊列管理器停頓則失敗 */

                     gmo.options = gmo.options + MQC.MQGMO_FAIL_IF_QUIESCING;

 

                     /* 設置等待的時間限制 */

                     gmo.waitInterval = MQC.MQWI_UNLIMITED;

                     /* create the message buffer store */

                     MQMessage inMessage = new MQMessage();

                     /* set the selector */

                     inMessage.correlationId = "clinet_B_receive".getBytes();

                     /* get the message */

                     queue.get(inMessage, gmo);

                     System.out.println("get message success");

                    

                     /* 讀出消息對象 */

                     Hashtable messageObject = (Hashtable) inMessage.readObject();

                     System.out.println(messageObject);

                     /* 讀出消息內容 */

                     byte[] content = (byte[]) messageObject.get("content");

                     /* save file */

                     FileOutputStream output = new FileOutputStream(

                                   "f:/exampleReceive.jar");

                     output.write(content);

                     output.close();

 

                     System.out.println(messageObject.get("FileName"));

                     /* 提交事務 , 至關於確認消息已經接收,服務器會刪除該消息 */

                     qMgr.commit();

 

              } catch (MQException e) {

                     e.printStackTrace();

              } catch (IOException e) {

                     // TODO Auto-generated catch block

                     e.printStackTrace();

              } catch (ClassNotFoundException e) {

                     // TODO Auto-generated catch block

                     e.printStackTrace();

              }

       }

}

 

MDB

MQMDBBeanBean.java MDB 文件

 

package ejbs;

 

import javax.jms.ObjectMessage;

import javax.jms.BytesMessage;

import javax.jms.StreamMessage;

import javax.jms.TextMessage;

import javax.jms.JMSException;

import ehub.ihub.exchangeManager.*;

import java.util.Hashtable;

import java.io.ByteArrayInputStream;

import java.io.FileOutputStream;

import java.io.File;

import java.io.ObjectInputStream;

 

/**

 * Bean implementation class for Enterprise Bean: MQMDBBean

 */

public class MQMDBBeanBean implements javax.ejb.MessageDrivenBean,

              javax.jms.MessageListener {

       private javax.ejb.MessageDrivenContext fMessageDrivenCtx;

 

       /**

        * getMessageDrivenContext

        */

       public javax.ejb.MessageDrivenContext getMessageDrivenContext() {

              return fMessageDrivenCtx;

       }

 

       /**

        * setMessageDrivenContext

        */

       public void setMessageDrivenContext(javax.ejb.MessageDrivenContext ctx) {

              fMessageDrivenCtx = ctx;

       }

 

       /**

        * ejbCreate

        */

       public void ejbCreate() {

       }

 

       /**

        * onMessage

        */

       public void onMessage(javax.jms.Message msg) {

              try {

                     System.out.println(msg.toString());

                     if (msg instanceof TextMessage) {

                            System.out.println("TextMessage");

                     } else if (msg instanceof ObjectMessage) {

                            System.out.println("ObjectMessage");

                     } else if (msg instanceof StreamMessage) {

                            System.out.println("StreamMessage");

                     } else if (msg instanceof BytesMessage) {

                            System.out.println("BytesMessage");

                            BytesMessage bytesMessage = (BytesMessage) msg;

                            String sCorrelationID = new String(bytesMessage

                                          .getJMSCorrelationIDAsBytes());

                            String sMessageID = bytesMessage.getJMSMessageID();

                            long size = bytesMessage.getBodyLength();

                            System.out.println("size=" + size + "/n CorrelationID="

                                          + sCorrelationID + "/n MessageID=" + sMessageID);

                            /*read the message and save the file*/

                            ReadMessage.readMessage(bytesMessage);

                            System.out.println("read message success");

                            /*send the message to the client */

                            SendMessage.sendFileToReceiveQueue(new File("c:/receivedExample.jar"));

                            System.out.println("send file success");

                           

                     } else {

 

                            System.out.println("no message");

                     }

 

              } catch (Exception e) {

                     System.out.println("onmessage 執行錯誤,回滾! ");

                     e.printStackTrace(System.err);

                     fMessageDrivenCtx.setRollbackOnly();

              }

       }

 

       private void getProperties(byte[] p) {

 

       }

 

       /**

        * ejbRemove

        */

       public void ejbRemove() {

       }

}

 

ReadMessage.java

 

/*

 * Created on 2006-2-15

 *

 * TODO To change the template for this generated file go to

 * Window - Preferences - Java - Code Style - Code Templates

 */

package ehub.ihub.exchangeManager;

 

import java.io.ByteArrayInputStream;

import java.io.FileOutputStream;

import java.io.IOException;

import java.io.ObjectInputStream;

import java.util.Hashtable;

 

import javax.jms.BytesMessage;

import javax.jms.JMSException;

 

/**

 * @author Administrator

 *

 * 

 */

public class ReadMessage {

       /**

        * read message including property and body

        *

        * @param Message

        * @throws JMSException

        * @throws IOException

        * @throws ClassNotFoundException

        */

       public static void readMessage(BytesMessage Message) {

              try {

                     long bodySize = Message.getBodyLength();

 

                     byte[] buf = new byte[Integer.parseInt(String.valueOf(bodySize))];

                     /* 消息內容讀到字節數組中 */

                     Message.readBytes(buf);

                     ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(

                                   buf);

                     /* 從字節流讀出消息對象 */

                     ObjectInputStream objectInputStream = new ObjectInputStream(

                                   byteArrayInputStream);

                     Hashtable messageObject = (Hashtable) objectInputStream

                                   .readObject();

                     /* 解析消息 */

                     byte[] contentBuf = (byte[]) messageObject.get("content");

                     /* 把文件保存 */

                     FileOutputStream fileWriter = new FileOutputStream(

                                   "c:/receivedExample.jar");

                     fileWriter.write(contentBuf);

                     fileWriter.close();

              } catch (JMSException e) {

                     e.printStackTrace();

              } catch (IOException e) {

                     // TODO Auto-generated catch block

                     e.printStackTrace();

              } catch (ClassNotFoundException e) {

                     // TODO Auto-generated catch block

                     e.printStackTrace();

              }

       }

}

 

SendMessage.java

 

/*

 * Created on 2006-2-16

 *

 * TODO To change the template for this generated file go to

 * Window - Preferences - Java - Code Style - Code Templates

 */

package ehub.ihub.exchangeManager;

 

import java.io.File;

import java.io.FileInputStream;

import java.io.IOException;

import java.util.Hashtable;

 

import javax.jms.BytesMessage;

import javax.jms.Connection;

import javax.jms.ConnectionFactory;

import javax.jms.JMSException;

import javax.jms.MessageProducer;

import javax.jms.ObjectMessage;

import javax.jms.Queue;

import javax.jms.Session;

import javax.naming.Context;

import javax.naming.InitialContext;

import javax.naming.NamingException;

 

/**

 * @author Administrator

 *

 * TODO To change the template for this generated type comment go to Window -

 * Preferences - Java - Code Style - Code Templates

 */

public class SendMessage {

       private static String MQ_CHANNEL = "EXAMPLE.CHANNEL";

 

       private static String MQ_MANAGER = "QMGR";

 

       private static String MQ_QUEUE = "EXAMPLE.SENDQUEUE";

 

       private static int MQ_PORT = 4001;

 

       private static String JMS_CONNECTIONFACTORY = "jms/JMSExampleConnectionFactory";

      

       private static String QUEUE_NAME="jms/JMSExampleSendQueue";

 

       public static void sendFileToReceiveQueue(File file) {

             

              try {

                     Context initContext = new InitialContext();

                     ConnectionFactory qconFactory = (ConnectionFactory) initContext

                                   .lookup(JMS_CONNECTIONFACTORY);

                     Connection qcon = qconFactory.createConnection();

                     Session session = qcon.createSession(false,

                                   Session.AUTO_ACKNOWLEDGE);

                     Queue queue = (Queue) initContext.lookup(QUEUE_NAME);

                    

                     MessageProducer producer = session.createProducer(queue);

                     ObjectMessage outMessage=session.createObjectMessage();

             

                     /* write the file information into the message */

                     Hashtable fileInfo = new Hashtable();

                     fileInfo.put("FileName", file.getName());

                     fileInfo.put("FileSize", Long.toString(file.length()));

 

                     /* write the file content into the message */

                     FileInputStream fos = new FileInputStream(file);

                     byte[] buf;

                     int size = (int) file.length();

                     buf = new byte[size];

                     int num = fos.read(buf);

                     fos.close();

                    

                     /*add the file byte stream to the object*/

                     fileInfo.put("content", buf);

                    

                     outMessage.setObject(fileInfo);

                     outMessage.getObject();

                     outMessage.setJMSCorrelationIDAsBytes((new String("clinet_B_receive")).getBytes());

//                   qcon.start();

                    

 

                     producer.send(outMessage);

                     producer.close();

                     session.close();

                     qcon.close();

              } catch (NamingException e) {

                     System.out.println(" 得到鏈接失敗 ,jndi 查找失敗 ");

                     e.printStackTrace();

              } catch (JMSException e) {

                     System.out.println(" 發送文件異常 ");

                     // TODO Auto-generated catch block

                     e.printStackTrace();

              } catch (IOException e) {

                     // TODO Auto-generated catch block

                     System.out.println(" 發送文件過程當中 io 操做失敗 ");

                     e.printStackTrace();

              }

       }

}

 



Trackback: http://tb.blog.csdn.net/TrackBack.aspx?PostId=600470

相關文章
相關標籤/搜索