JavaShuo
欄目
標籤
一段同步接收和發送MQ消息的代碼
時間 2019-11-08
標籤
一段
同步
接收
發送
消息
代碼
简体版
原文
原文鏈接
JAVA代碼:
Java代碼
package
com.sdb.payment.core.mq;
import
org.apache.log4j.Logger;
import
com.ibm.mq.MQC;
import
com.ibm.mq.MQEnvironment;
import
com.ibm.mq.MQException;
import
com.ibm.mq.MQGetMessageOptions;
import
com.ibm.mq.MQMessage;
import
com.ibm.mq.MQPutMessageOptions;
import
com.ibm.mq.MQQueue;
import
com.ibm.mq.MQQueueManager;
public
class
MessageQueueService {
private
static
Logger logger = Logger.getLogger(MessageQueueService.
class
);
private
String hostname =
"192.168.0.117"
;
private
String channel =
"CHL.SVRCONN"
;
private
String queueManager =
"QM_SERVER"
;
private
String sendQueue =
"OMP.QRMT"
;
private
String recvQueue =
"OMP.QLCA"
;
private
int
port =
24100
;
private
int
ccsid =
1381
;
private
int
failedCount =
5
;
private
int
intervalTime =
1000
;
public
MessageQueueService() {
MQEnvironment.hostname = hostname;
MQEnvironment.channel = channel;
MQEnvironment.CCSID = ccsid;
MQEnvironment.port = port;
}
public
String send(String sendMsg)
throws
Exception {
MQQueueManager qManager =
new
MQQueueManager(queueManager);
// send message
int
openOptions = MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING;
MQQueue sQueue = qManager.accessQueue(sendQueue, openOptions);
MQPutMessageOptions pmo =
new
MQPutMessageOptions();
MQMessage send =
new
MQMessage();
send.write(sendMsg.getBytes());
System.out.println(
"send message : "
+ sendMsg);
sQueue.put(send, pmo);
sQueue.close();
System.out.println(
"send message Id"
);
for
(
int
i =
0
; i<send.messageId.length; i++) {
System.out.print(send.messageId[i]);
}
System.out.println();
System.out.println(
"send message Id"
);
// fetch message
openOptions = MQC.MQOO_INQUIRE + MQC.MQOO_FAIL_IF_QUIESCING
+ MQC.MQOO_INPUT_SHARED;
MQQueue rQueue = qManager.accessQueue(recvQueue, openOptions);
MQGetMessageOptions getOptions =
new
MQGetMessageOptions();
getOptions.options = MQC.MQGMO_WAIT;
getOptions.waitInterval = intervalTime;
MQMessage recvMsg =
new
MQMessage();
recvMsg.messageId = send.messageId;
//這裏是關鍵,要保持接收的msgid跟發送的msgid值是同樣的,
//這樣就會根據msgId來取隊列的消息了,而不會取到別的消息
send.clearMessage();
boolean
received =
false
;
int
fetchCount =
0
;
while
(!received) {
try
{
fetchCount++;
rQueue.get(recvMsg, getOptions);
//logger.debug("the " + fetchCount + " time fetch message!");
System.out.println(
"fetch message !!!"
);
received =
true
;
}
catch
(MQException me) {
if
(me.reasonCode == MQException.MQRC_NO_MSG_AVAILABLE) {
if
(fetchCount > failedCount) {
recvMsg.clearMessage();
rQueue.close();
qManager.disconnect();
//logger.error("can't fetch message for " + me.getMessage());
return
null
;
}
}
}
catch
(Exception ex) {
recvMsg.clearMessage();
rQueue.close();
qManager.disconnect();
//logger.error("can't fetch message for " + ex.getMessage());
return
null
;
}
}
byte
[] bMsg =
new
byte
[recvMsg.getMessageLength()];
recvMsg.readFully(bMsg);
System.out.println(
"rec correlationId Id"
);
for
(
int
i =
0
; i<recvMsg.correlationId.length; i++) {
System.out.print(recvMsg.correlationId[i]);
}
System.out.println();
System.out.println(
"rec correlationId Id"
);
String recv =
new
String(bMsg);
recvMsg.clearMessage();
rQueue.close();
qManager.disconnect();
return
recv;
}
public
void
setChannel(String channel) {
this
.channel = channel;
}
public
void
setHostname(String hostname) {
this
.hostname = hostname;
}
public
void
setQueueManager(String queueManager) {
this
.queueManager = queueManager;
}
public
void
setPort(
int
port) {
this
.port = port;
}
public
void
setIntervalTime(
int
intervalTime) {
this
.intervalTime = intervalTime;
}
public
void
setFailedCount(
int
failedCount) {
this
.failedCount = failedCount;
}
public
void
setRecvQueue(String recvQueue) {
this
.recvQueue = recvQueue;
}
public
void
setSendQueue(String sendQueue) {
this
.sendQueue = sendQueue;
}
}
相關文章
1.
Rocket MQ 的三種消息發送(同步、異步、單向)和消息訂閱
2.
stream消息接收和消息發送
3.
MQ消息接收接口
4.
Spring整合JMS、IBM MQ發送和接收消息
5.
spring 整合ibm mq 發送和接收消息
6.
【MQ】Eclipse向RocketMQ中發送和接收消息
7.
同步方式的消息發送和異步方式的消息發送
8.
Rocketmq同步發送消息
9.
ActiveMQ發送、接收消息
10.
socket實現消息發送和接收
更多相關文章...
•
HTTP 消息結構
-
HTTP 教程
•
Markdown 代碼
-
Markdown 教程
•
IntelliJ IDEA 代碼格式化配置和快捷鍵
•
IntelliJ IDEA代碼格式化設置
相關標籤/搜索
消息隊列MQ
代碼段
消息
代碼收集
接送
同步
接收
接和
代收
瀏覽器信息
XLink 和 XPointer 教程
MyBatis教程
代碼格式化
亂碼
開發工具
0
分享到微博
分享到微信
分享到QQ
每日一句
每一个你不满意的现在,都有一个你没有努力的曾经。
最新文章
1.
Mud Puddles ( bfs )
2.
ReSIProcate環境搭建
3.
SNAT(IP段)和配置網絡服務、網絡會話
4.
第8章 Linux文件類型及查找命令實踐
5.
AIO介紹(八)
6.
中年轉行互聯網,原動力、計劃、行動(中)
7.
詳解如何讓自己的網站/APP/應用支持IPV6訪問,從域名解析配置到服務器配置詳細步驟完整。
8.
PHP 5 構建系統
9.
不看後悔系列!Rocket MQ 使用排查指南(附網盤鏈接)
10.
如何簡單創建虛擬機(CentoOS 6.10)
本站公眾號
歡迎關注本站公眾號,獲取更多信息
相關文章
1.
Rocket MQ 的三種消息發送(同步、異步、單向)和消息訂閱
2.
stream消息接收和消息發送
3.
MQ消息接收接口
4.
Spring整合JMS、IBM MQ發送和接收消息
5.
spring 整合ibm mq 發送和接收消息
6.
【MQ】Eclipse向RocketMQ中發送和接收消息
7.
同步方式的消息發送和異步方式的消息發送
8.
Rocketmq同步發送消息
9.
ActiveMQ發送、接收消息
10.
socket實現消息發送和接收
>>更多相關文章<<