MQ java 基礎編程
編寫人:鄔文俊
編寫時間 : 2006-2-16
聯繫郵件 : wenjunwu430@gmail.com
前言java
經過 2 個多星期對 MQ 學習,在 partner 丁 & partner 武 的幫助下完成了該文檔。該文檔提供一個簡單的例子,經過對該例子的講解,你將知道:
1. 用 java 寫客戶端從 MQ Server 收發消息。
2. MQ 做爲 Websphere Application Server 的 JMS 資源提供者。
3. JMS message 映射轉化爲 MQ message
文檔中的知識所有從參考資料和 IBM 提供的文檔中得到。 I recommend u to read the documents if u want to know more about the MQ.
參考資料編程
例子包括 3 個部分:發送客戶端、 MDB 、接收客戶端。
客戶端 A 把文件發送到服務器,服務器將該文件轉發給客戶端 B 。客戶端A經過向 MQ 客戶機直接發送消息。 MQ 隊列會把客戶端A發送的 MQ Message 轉換爲 JMS Message 。 MDB 接收到轉換後的 JMS 消息後,保存文件在服務器,而後把文件轉發到發送隊列( JMS 隊列)。發送隊列由 MQ Server 提供,因此即爲發送到了 MQ 隊列。 MQ 隊列把 JMS Message 轉換爲 MQ Message 。客戶端 B 從 MQ 隊列中接收轉換後的消息,從消息中讀取文件,保存在硬盤。數組
MQMESSAGE JMS MESSAGE
Client A————————->mq queue ——————->MDB
Client B<———————— mq queue <——————-MDB服務器
這裏使用的是 MQ Server 5.2 。 MQ Server 和 WebSphere Application Server 安裝在同一臺機器上( 2 者可使用綁定方式通訊)。
要配置的項目:
1. 隊列管理器 QMGR
2. 偵聽端口 4001
3. 本地隊列 EXAMPLE.QUEUE
4. 本地隊列 EXAMPLE.SENDQUEUE
5. 通道 EXAMPLE.CHANNELmarkdown
打開 WebSphere MQ 資源管理器。展開隊列管理器節點,右鍵,新建隊列管理器。取名字爲 QMGR ,設置偵聽端口 4001 。
在建好的隊列管理器 QMGR 下面新建 2 個本地隊列: EXAMPLE.QUEUE , EXAMPLE.SENDQUEUE 。
展開高級節點,新建服務器鏈接通道 EXAMPLE.CHANNEL 。
Note :不要搞錯隊列和通道的類型。
2. 驗證 MQ 配置session
打開 WebSphere MQ 服務。能夠查看服務是否啓動、服務監聽端口。
3. 配置 WAS JMSapp
具體操做參考《讓 WebSphere MQ 成爲部署在 WebSphere Application Server 中的應用程序的 JMS 提供程序》該文章能夠在 IBM WebSphere 開發者技術期刊 中找到。
要配置的項目:
1. WebSphere MQ 鏈接工廠
JNDI name : jms/JMSExampleConnectionFactory
2. WebSphere MQ 隊列目標
JNDI name : jms/JMSExampleQueue ;基本隊列名: EXAMPLE.QUEUE ;目標客戶機: JMS 。目標客戶機決定了 MQ 隊列接收方的消息格式。由於是用 MDB 接收消息,因此設置爲 JMS 。另外一個隊列是由 MQ 客戶端接收消息,因此另外一個隊列的目標客戶機是 MQ 。若是配置錯誤, MQ 隊列轉換消息的格式將不是你所想要的。具體參考《 IBM - JMS 應用和使用 WebSphere MQ MQI 接口的應用如何進行信息頭的交換(二)數據映射》
3. WebSphere MQ 隊列目標
JNDI name : jms/JMSExampleSendQueue ;
基本隊列名: EXAMPLE.SENDQUEUE ;目標客戶機: MQ 。
4. 配置 MDBide
在 WAS 上配置 偵聽器端口
名稱: JMSExampleQueuePort ;
鏈接工廠 JNDI 名 jms/JMSExampleConnectionFactory ;
目標 JNDI 名: jms/JMSExampleQueue 。
Message Driven Beans 用於偵聽消息的偵聽器端口。每一個端口指定 MDB 將偵聽的(依據該端口部署的) JMS 鏈接工廠和 JMS 目標。學習
MDB 部署描述符中配置
鏈接工廠 JNDI 名 jms/JMSExampleConnectionFactory ;
目標 JNDI 名: jms/JMSExampleQueue ;
監聽端口名稱: JMSExampleQueuePort (監聽端口名稱也能夠在管理控制檯中修改)
5. 代碼ui
客戶端 A (發送方)
MqPut.java
package cn.edu.itec.mqclient;
import java.io.File;
import com.ibm.mq.MQC;
import com.ibm.mq.MQEnvironment;
import com.ibm.mq.MQException;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQPutMessageOptions;
import com.ibm.mq.MQQueueManager;
public class MQPut {
private String HOST_URL = 「192.168.1.116」;
private String MQ_CHANNEL = "EXAMPLE.CHANNEL"; private String MQ_MANAGER = "QMGR"; private String MQ_QUEUE = "EXAMPLE.QUEUE"; private int MQ_PORT = 4001; public static void main(String args[]) { new MQPut().SendFile("f:/JMSExampleEJB.jar"); } public void SendFile(String sFilePath) { try { /* 設置 MQEnvironment 屬性以便客戶機鏈接 */ MQEnvironment.hostname = HOST_URL; MQEnvironment.channel = MQ_CHANNEL; MQEnvironment.properties.put(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES); MQEnvironment.CCSID = 1381; MQEnvironment.port = MQ_PORT; /* 鏈接到隊列管理器 */ MQQueueManager qMgr = new MQQueueManager(MQ_MANAGER); System.out.println("queue manager is connected!"); /* 設置打開選項以便打開用於輸出的隊列,若是隊列管理器正在中止,咱們也已設置了選項去應對不成功狀況。 */ int openOptions = MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING; /* 打開隊列 */ com.ibm.mq.MQQueue queue = qMgr.accessQueue(MQ_QUEUE, openOptions); /* 設置放置消息選項咱們將使用默認設置 */ MQPutMessageOptions pmo = new MQPutMessageOptions(); /* 建立消息, MQMessage 類包含實際消息數據的數據緩衝區,和描述消息的全部 MQMD 參數 */ /* 建立消息緩衝區 */ MQMessage outMsg = new MQMessage(); /* set the properties of the message fot the selector */ outMsg.correlationId = "clinet_B_receive".getBytes(); outMsg.messageId = "1Aa".getBytes(); /* write msg */ MsgWriter.readFile(outMsg, new File(sFilePath)); /* put message with default options */ queue.put(outMsg, new MQPutMessageOptions()); System.out.println("send file is success!"); /* release resource */ queue.close(); qMgr.disconnect(); } catch (MQException ex) { //System.out.println("fft!"); System.out.println("An MQ Error Occurred: Completion Code is :\t" + ex.completionCode + "\n\n The Reason Code is :\t" + ex.reasonCode); ex.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } } private void readFileToMessage(String FilePath) { }
}
JMS message 和 MQ message 有幾個字段是相同的,這些字段的值將會在轉換中保留。比較方便的是使用 CorrelationID 這個字段。經過設置這個字段,達到選擇性的接收特定消息的功能。其它字段沒有徹底搞清楚,有的數據類型須要轉換,例如 MessageID (對應於 JMSMessageID )。 MQ 消息選擇和 JMS 不一樣,後者採用 selector ,前者經過設置接收消息的屬性完成。例如設置 CorrelationID 爲特定值。
客戶端 B
MQGet.java
package cn.edu.itec.mqclient;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Hashtable;
import com.ibm.mq.MQC;
import com.ibm.mq.MQEnvironment;
import com.ibm.mq.MQException;
import com.ibm.mq.MQGetMessageOptions;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQQueueManager;
/**
* @author Administrator
*
* TODO To change the template for this generated type comment go to Window -
* Preferences - Java - Code Style - Code Templates
*/
public class MQGet {
private static String HOST_URL = 「192.168.1.116」;
private static String MQ_CHANNEL = "EXAMPLE.CHANNEL"; private static String MQ_MANAGER = "QMGR"; private static String MQ_QUEUE = "EXAMPLE.SENDQUEUE"; private static int MQ_PORT = 4001; public static void main(String[] args) { MQGet.getMessage(); } public static void getMessage() { try { /* 設置 MQEnvironment 屬性以便客戶機鏈接 */ MQEnvironment.hostname = HOST_URL; MQEnvironment.channel = MQ_CHANNEL; MQEnvironment.properties.put(MQC.TRANSPORT_PROPERTY, MQC.TRANSPORT_MQSERIES); MQEnvironment.CCSID = 1381; MQEnvironment.port = MQ_PORT; /* 鏈接到隊列管理器 */ MQQueueManager qMgr = new MQQueueManager(MQ_MANAGER); System.out.println("queue manager is connected!"); /* * 設置打開選項以便打開用於輸出的隊列,若是隊列管理器中止,咱們也 已設置了選項去應對不成功狀況 */ int openOptions = MQC.MQOO_INPUT_SHARED | MQC.MQOO_FAIL_IF_QUIESCING; /* 打開隊列 */ com.ibm.mq.MQQueue queue = qMgr.accessQueue(MQ_QUEUE, openOptions); System.out.println(" 隊列鏈接成功 "); /* 設置放置消息選項 */ MQGetMessageOptions gmo = new MQGetMessageOptions(); /* 在同步點控制下獲取消息 */ gmo.options = gmo.options + MQC.MQGMO_SYNCPOINT; /* 若是在隊列上沒有消息則等待 */ gmo.options = gmo.options + MQC.MQGMO_WAIT; /* 若是隊列管理器停頓則失敗 */ gmo.options = gmo.options + MQC.MQGMO_FAIL_IF_QUIESCING; /* 設置等待的時間限制 */ gmo.waitInterval = MQC.MQWI_UNLIMITED; /* create the message buffer store */ MQMessage inMessage = new MQMessage(); /* set the selector */ inMessage.correlationId = "clinet_B_receive".getBytes(); /* get the message */ queue.get(inMessage, gmo); System.out.println("get message success"); /* 讀出消息對象 */ Hashtable messageObject = (Hashtable) inMessage.readObject(); System.out.println(messageObject); /* 讀出消息內容 */ byte[] content = (byte[]) messageObject.get("content"); /* save file */ FileOutputStream output = new FileOutputStream( "f:/exampleReceive.jar"); output.write(content); output.close(); System.out.println(messageObject.get("FileName")); /* 提交事務 , 至關於確認消息已經接收,服務器會刪除該消息 */ qMgr.commit(); } catch (MQException e) { e.printStackTrace(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (ClassNotFoundException e) { // TODO Auto-generated catch block e.printStackTrace(); } }
}
MDB
MQMDBBeanBean.java MDB 文件
package ejbs;
import javax.jms.ObjectMessage;
import javax.jms.BytesMessage;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import javax.jms.JMSException;
import ehub.ihub.exchangeManager.*;
import java.util.Hashtable;
import java.io.ByteArrayInputStream;
import java.io.FileOutputStream;
import java.io.File;
import java.io.ObjectInputStream;
/**
* Bean implementation class for Enterprise Bean: MQMDBBean
*/
public class MQMDBBeanBean implements javax.ejb.MessageDrivenBean,
javax.jms.MessageListener {
private javax.ejb.MessageDrivenContext fMessageDrivenCtx;
/** * getMessageDrivenContext */ public javax.ejb.MessageDrivenContext getMessageDrivenContext() { return fMessageDrivenCtx; } /** * setMessageDrivenContext */ public void setMessageDrivenContext(javax.ejb.MessageDrivenContext ctx) { fMessageDrivenCtx = ctx; } /** * ejbCreate */ public void ejbCreate() { } /** * onMessage */ public void onMessage(javax.jms.Message msg) { try { System.out.println(msg.toString()); if (msg instanceof TextMessage) { System.out.println("TextMessage"); } else if (msg instanceof ObjectMessage) { System.out.println("ObjectMessage"); } else if (msg instanceof StreamMessage) { System.out.println("StreamMessage"); } else if (msg instanceof BytesMessage) { System.out.println("BytesMessage"); BytesMessage bytesMessage = (BytesMessage) msg; String sCorrelationID = new String(bytesMessage .getJMSCorrelationIDAsBytes()); String sMessageID = bytesMessage.getJMSMessageID(); long size = bytesMessage.getBodyLength(); System.out.println("size=" + size + "/n CorrelationID=" + sCorrelationID + "/n MessageID=" + sMessageID); /*read the message and save the file*/ ReadMessage.readMessage(bytesMessage); System.out.println("read message success"); /*send the message to the client */ SendMessage.sendFileToReceiveQueue(new File("c:/receivedExample.jar")); System.out.println("send file success"); } else { System.out.println("no message"); } } catch (Exception e) { System.out.println("onmessage 執行錯誤,回滾! "); e.printStackTrace(System.err); fMessageDrivenCtx.setRollbackOnly(); } } private void getProperties(byte[] p) { } /** * ejbRemove */ public void ejbRemove() { }
}
ReadMessage.java
/*
* Created on 2006-2-15
*
* TODO To change the template for this generated file go to
* Window - Preferences - Java - Code Style - Code Templates
*/
package ehub.ihub.exchangeManager;
import java.io.ByteArrayInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.Hashtable;
import javax.jms.BytesMessage;
import javax.jms.JMSException;
/**
* @author Administrator
*
*
*/
public class ReadMessage {
/**
* read message including property and body
*
* @param Message
* @throws JMSException
* @throws IOException
* @throws ClassNotFoundException
*/
public static void readMessage(BytesMessage Message) {
try {
long bodySize = Message.getBodyLength();
byte[] buf = new byte[Integer.parseInt(String.valueOf(bodySize))]; /* 消息內容讀到字節數組中 */ Message.readBytes(buf); ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream( buf); /* 從字節流讀出消息對象 */ ObjectInputStream objectInputStream = new ObjectInputStream( byteArrayInputStream); Hashtable messageObject = (Hashtable) objectInputStream .readObject(); /* 解析消息 */ byte[] contentBuf = (byte[]) messageObject.get("content"); /* 把文件保存 */ FileOutputStream fileWriter = new FileOutputStream( "c:/receivedExample.jar"); fileWriter.write(contentBuf); fileWriter.close(); } catch (JMSException e) { e.printStackTrace(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (ClassNotFoundException e) { // TODO Auto-generated catch block e.printStackTrace(); } }
}
SendMessage.java
/*
* Created on 2006-2-16
*
* TODO To change the template for this generated file go to
* Window - Preferences - Java - Code Style - Code Templates
*/
package ehub.ihub.exchangeManager;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Hashtable;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
/**
* @author Administrator
*
* TODO To change the template for this generated type comment go to Window -
* Preferences - Java - Code Style - Code Templates
*/
public class SendMessage {
private static String MQ_CHANNEL = 「EXAMPLE.CHANNEL」;
private static String MQ_MANAGER = "QMGR"; private static String MQ_QUEUE = "EXAMPLE.SENDQUEUE"; private static int MQ_PORT = 4001; private static String JMS_CONNECTIONFACTORY = "jms/JMSExampleConnectionFactory"; private static String QUEUE_NAME="jms/JMSExampleSendQueue"; public static void sendFileToReceiveQueue(File file) { try { Context initContext = new InitialContext(); ConnectionFactory qconFactory = (ConnectionFactory) initContext .lookup(JMS_CONNECTIONFACTORY); Connection qcon = qconFactory.createConnection(); Session session = qcon.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = (Queue) initContext.lookup(QUEUE_NAME); MessageProducer producer = session.createProducer(queue); ObjectMessage outMessage=session.createObjectMessage(); /* write the file information into the message */ Hashtable fileInfo = new Hashtable(); fileInfo.put("FileName", file.getName()); fileInfo.put("FileSize", Long.toString(file.length())); /* write the file content into the message */ FileInputStream fos = new FileInputStream(file); byte[] buf; int size = (int) file.length(); buf = new byte[size]; int num = fos.read(buf); fos.close(); /*add the file byte stream to the object*/ fileInfo.put("content", buf); outMessage.setObject(fileInfo); outMessage.getObject(); outMessage.setJMSCorrelationIDAsBytes((new String("clinet_B_receive")).getBytes());
// qcon.start();
producer.send(outMessage); producer.close(); session.close(); qcon.close(); } catch (NamingException e) { System.out.println(" 得到鏈接失敗 ,jndi 查找失敗 "); e.printStackTrace(); } catch (JMSException e) { System.out.println(" 發送文件異常 "); // TODO Auto-generated catch block e.printStackTrace(); } catch (IOException e) { // TODO Auto-generated catch block System.out.println(" 發送文件過程當中 io 操做失敗 "); e.printStackTrace(); } }
}