JavaShuo
欄目
標籤
一段同步接收和發送MQ消息的代碼
時間 2019-11-08
標籤
一段
同步
接收
發送
消息
代碼
简体版
原文
原文鏈接
JAVA代碼:
Java代碼
package
com.sdb.payment.core.mq;
import
org.apache.log4j.Logger;
import
com.ibm.mq.MQC;
import
com.ibm.mq.MQEnvironment;
import
com.ibm.mq.MQException;
import
com.ibm.mq.MQGetMessageOptions;
import
com.ibm.mq.MQMessage;
import
com.ibm.mq.MQPutMessageOptions;
import
com.ibm.mq.MQQueue;
import
com.ibm.mq.MQQueueManager;
public
class
MessageQueueService {
private
static
Logger logger = Logger.getLogger(MessageQueueService.
class
);
private
String hostname =
"192.168.0.117"
;
private
String channel =
"CHL.SVRCONN"
;
private
String queueManager =
"QM_SERVER"
;
private
String sendQueue =
"OMP.QRMT"
;
private
String recvQueue =
"OMP.QLCA"
;
private
int
port =
24100
;
private
int
ccsid =
1381
;
private
int
failedCount =
5
;
private
int
intervalTime =
1000
;
public
MessageQueueService() {
MQEnvironment.hostname = hostname;
MQEnvironment.channel = channel;
MQEnvironment.CCSID = ccsid;
MQEnvironment.port = port;
}
public
String send(String sendMsg)
throws
Exception {
MQQueueManager qManager =
new
MQQueueManager(queueManager);
// send message
int
openOptions = MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING;
MQQueue sQueue = qManager.accessQueue(sendQueue, openOptions);
MQPutMessageOptions pmo =
new
MQPutMessageOptions();
MQMessage send =
new
MQMessage();
send.write(sendMsg.getBytes());
System.out.println(
"send message : "
+ sendMsg);
sQueue.put(send, pmo);
sQueue.close();
System.out.println(
"send message Id"
);
for
(
int
i =
0
; i<send.messageId.length; i++) {
System.out.print(send.messageId[i]);
}
System.out.println();
System.out.println(
"send message Id"
);
// fetch message
openOptions = MQC.MQOO_INQUIRE + MQC.MQOO_FAIL_IF_QUIESCING
+ MQC.MQOO_INPUT_SHARED;
MQQueue rQueue = qManager.accessQueue(recvQueue, openOptions);
MQGetMessageOptions getOptions =
new
MQGetMessageOptions();
getOptions.options = MQC.MQGMO_WAIT;
getOptions.waitInterval = intervalTime;
MQMessage recvMsg =
new
MQMessage();
recvMsg.messageId = send.messageId;
//這裏是關鍵,要保持接收的msgid跟發送的msgid值是同樣的,
//這樣就會根據msgId來取隊列的消息了,而不會取到別的消息
send.clearMessage();
boolean
received =
false
;
int
fetchCount =
0
;
while
(!received) {
try
{
fetchCount++;
rQueue.get(recvMsg, getOptions);
//logger.debug("the " + fetchCount + " time fetch message!");
System.out.println(
"fetch message !!!"
);
received =
true
;
}
catch
(MQException me) {
if
(me.reasonCode == MQException.MQRC_NO_MSG_AVAILABLE) {
if
(fetchCount > failedCount) {
recvMsg.clearMessage();
rQueue.close();
qManager.disconnect();
//logger.error("can't fetch message for " + me.getMessage());
return
null
;
}
}
}
catch
(Exception ex) {
recvMsg.clearMessage();
rQueue.close();
qManager.disconnect();
//logger.error("can't fetch message for " + ex.getMessage());
return
null
;
}
}
byte
[] bMsg =
new
byte
[recvMsg.getMessageLength()];
recvMsg.readFully(bMsg);
System.out.println(
"rec correlationId Id"
);
for
(
int
i =
0
; i<recvMsg.correlationId.length; i++) {
System.out.print(recvMsg.correlationId[i]);
}
System.out.println();
System.out.println(
"rec correlationId Id"
);
String recv =
new
String(bMsg);
recvMsg.clearMessage();
rQueue.close();
qManager.disconnect();
return
recv;
}
public
void
setChannel(String channel) {
this
.channel = channel;
}
public
void
setHostname(String hostname) {
this
.hostname = hostname;
}
public
void
setQueueManager(String queueManager) {
this
.queueManager = queueManager;
}
public
void
setPort(
int
port) {
this
.port = port;
}
public
void
setIntervalTime(
int
intervalTime) {
this
.intervalTime = intervalTime;
}
public
void
setFailedCount(
int
failedCount) {
this
.failedCount = failedCount;
}
public
void
setRecvQueue(String recvQueue) {
this
.recvQueue = recvQueue;
}
public
void
setSendQueue(String sendQueue) {
this
.sendQueue = sendQueue;
}
}
相關文章
1.
Rocket MQ 的三種消息發送(同步、異步、單向)和消息訂閱
2.
stream消息接收和消息發送
3.
MQ消息接收接口
4.
Spring整合JMS、IBM MQ發送和接收消息
5.
spring 整合ibm mq 發送和接收消息
6.
【MQ】Eclipse向RocketMQ中發送和接收消息
7.
同步方式的消息發送和異步方式的消息發送
8.
Rocketmq同步發送消息
9.
ActiveMQ發送、接收消息
10.
socket實現消息發送和接收
更多相關文章...
•
HTTP 消息結構
-
HTTP 教程
•
Markdown 代碼
-
Markdown 教程
•
IntelliJ IDEA 代碼格式化配置和快捷鍵
•
IntelliJ IDEA代碼格式化設置
相關標籤/搜索
消息隊列MQ
代碼段
消息
代碼收集
接送
同步
接收
接和
代收
瀏覽器信息
XLink 和 XPointer 教程
MyBatis教程
代碼格式化
亂碼
開發工具
0
分享到微博
分享到微信
分享到QQ
每日一句
每一个你不满意的现在,都有一个你没有努力的曾经。
最新文章
1.
No provider available from registry 127.0.0.1:2181 for service com.ddbuy.ser 解決方法
2.
Qt5.7以上調用虛擬鍵盤(支持中文),以及源碼修改(可拖動,水平縮放)
3.
軟件測試面試- 購物車功能測試用例設計
4.
ElasticSearch(概念篇):你知道的, 爲了搜索…
5.
redux理解
6.
gitee創建第一個項目
7.
支持向量機之硬間隔(一步步推導,通俗易懂)
8.
Mysql 異步複製延遲的原因及解決方案
9.
如何在運行SEPM配置嚮導時將不可認的複雜數據庫密碼改爲簡單密碼
10.
windows系統下tftp服務器使用
本站公眾號
歡迎關注本站公眾號,獲取更多信息
相關文章
1.
Rocket MQ 的三種消息發送(同步、異步、單向)和消息訂閱
2.
stream消息接收和消息發送
3.
MQ消息接收接口
4.
Spring整合JMS、IBM MQ發送和接收消息
5.
spring 整合ibm mq 發送和接收消息
6.
【MQ】Eclipse向RocketMQ中發送和接收消息
7.
同步方式的消息發送和異步方式的消息發送
8.
Rocketmq同步發送消息
9.
ActiveMQ發送、接收消息
10.
socket實現消息發送和接收
>>更多相關文章<<