使用阿里雲消息隊列服務器
控制檯地址:http://ons.console.aliyun.com/#/home/topicapp
Demo:
支付消息 MQ 工廠類:
/**
 * Builds the Aliyun ONS consumer for payment messages, subscribes the injected
 * listener to the configured topic/tag and starts the consumer after a fixed delay.
 *
 * <p>Configuration ({@code topic}, {@code tag}, {@code pTag}, {@code accessKey},
 * {@code secretKey}) is supplied through the setters before {@link #initConsumer()}
 * is called. {@link #initConsumer()} and {@link #shutDown()} are synchronized on this
 * instance so that a shutdown requested during the start delay is honoured.
 */
public class DfacePayConsumerFactory {

    /**
     * Prefix prepended to the topic when the consumer ID is derived.
     * NOTE(review): effectively a constant; left non-final so existing code that
     * assigns it keeps compiling - make it final once that is confirmed unnecessary.
     */
    public static String CID = "CID-";

    /**
     * Delay, in milliseconds, between subscribing and starting the consumer.
     * NOTE(review): the reason for the delay is not visible in this file -
     * presumably it lets the rest of the application finish booting; confirm.
     */
    private static final long START_DELAY_MILLIS = 90000L;

    /** Listener instance that handles every message delivered to the subscription. */
    @Autowired
    private DfacePayConsumerListener dfacePayConsumerListener;

    /** Topic to subscribe to; also part of the derived consumer ID. */
    private String topic;

    /** Optional suffix appended to the consumer ID when it contains text. */
    private String pTag;

    /** Aliyun access key passed to the ONS client. */
    private String accessKey;

    /** Aliyun secret key passed to the ONS client. */
    private String secretKey;

    /** Tag expression passed to {@code Consumer.subscribe}. */
    private String tag;

    /** Consumer created by {@link #initConsumer()}; {@code null} until then. */
    private Consumer consumer;

    /** Set by {@link #shutDown()} so that a pending delayed start is skipped. */
    private boolean shutdownRequested;

    /**
     * @return the topic
     */
    public String getTopic() {
        return topic;
    }

    /**
     * @param topic the topic to set
     */
    public void setTopic(String topic) {
        this.topic = topic;
    }

    /**
     * @return the tag
     */
    public String getTag() {
        return tag;
    }

    /**
     * @param tag the tag to set
     */
    public void setTag(String tag) {
        this.tag = tag;
    }

    /**
     * @return the pTag
     */
    public String getpTag() {
        return pTag;
    }

    /**
     * @param pTag the pTag to set
     */
    public void setpTag(String pTag) {
        this.pTag = pTag;
    }

    /**
     * @return the accessKey
     */
    public String getAccessKey() {
        return accessKey;
    }

    /**
     * @param accessKey the accessKey to set
     */
    public void setAccessKey(String accessKey) {
        this.accessKey = accessKey;
    }

    /**
     * @return the secretKey
     */
    public String getSecretKey() {
        return secretKey;
    }

    /**
     * @param secretKey the secretKey to set
     */
    public void setSecretKey(String secretKey) {
        this.secretKey = secretKey;
    }

    /**
     * Creates the consumer, subscribes the listener and schedules the delayed start.
     *
     * <p>The consumer ID is {@code CID + topic}, followed by {@code "-" + pTag} when
     * {@code pTag} contains text. A JVM shutdown hook that calls {@link #shutDown()}
     * is registered on every call.
     *
     * @throws IllegalStateException if the topic is blank or a credential is missing
     */
    public synchronized void initConsumer() {
        // Fail fast with a clear message: a null credential would otherwise surface as a
        // bare NullPointerException from Properties.put, and a missing topic would
        // silently produce the consumer ID "CID-null".
        if (!StringUtils.hasText(this.topic)) {
            throw new IllegalStateException("topic must be set before initConsumer()");
        }
        if (this.accessKey == null || this.secretKey == null) {
            throw new IllegalStateException(
                    "accessKey and secretKey must be set before initConsumer()");
        }

        String consumerId = CID + this.topic;
        if (StringUtils.hasText(this.pTag)) {
            consumerId = consumerId + "-" + this.pTag;
        }

        Properties properties = new Properties();
        properties.put(PropertyKeyConst.ConsumerId, consumerId);
        properties.put(PropertyKeyConst.AccessKey, this.accessKey);
        properties.put(PropertyKeyConst.SecretKey, this.secretKey);
        // Optional keys that are not set here:
        //   PropertyKeyConst.SuspendTimeMillis - how long to wait before a failed message is re-sent
        //   PropertyKeyConst.MaxReconsumeTimes - number of re-send attempts
        //   PropertyKeyConst.ConsumeThreadNums - number of consumer threads
        //   PropertyKeyConst.ONSAddr           - access address used by the consumer

        final Consumer created = ONSFactory.createConsumer(properties);
        created.subscribe(this.topic, this.tag, this.dfacePayConsumerListener);
        this.consumer = created;
        this.shutdownRequested = false;

        Thread starter = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(START_DELAY_MILLIS);
                } catch (InterruptedException e) {
                    // Interruption means the start was cancelled: keep the interrupt
                    // status visible and do not start the consumer.
                    Thread.currentThread().interrupt();
                    return;
                }
                startIfStillActive(created);
            }
        }, "dface-pay-consumer-starter");
        starter.start();

        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                shutDown();
            }
        });
    }

    /**
     * Starts {@code candidate} unless shutdown was requested, or another consumer
     * replaced it, while the start delay was running.
     */
    private synchronized void startIfStillActive(Consumer candidate) {
        if (!this.shutdownRequested && candidate == this.consumer) {
            candidate.start();
        }
    }

    /**
     * Stops listening.
     *
     * @return {@code true} if a consumer had been created and its shutdown was invoked,
     *         {@code false} if {@link #initConsumer()} has not created one yet
     */
    public synchronized boolean shutDown() {
        if (null == this.consumer) {
            return false;
        }
        // Recorded before shutting down so a still-sleeping starter thread will not
        // start the consumer afterwards.
        this.shutdownRequested = true;
        this.consumer.shutdown();
        return true;
    }
}
//dPay支付監聽 執行mq consume消息接收(經過topic訂閱)@Component("dfacePayConsumerListener")public class DfacePayConsumerListener implements MessageListener { private static Logger logger = LoggerFactory.getLogger(DfacePayConsumerListener.class); @Autowired private ...; @Override public Action consume(Message message, ConsumeContext context) { String msg = new String(message.getBody()); String tag = message.getTag(); logger.info(LogUtils.builder().append("mq", "接受mq").append("mqTag", tag) .append("mqMsg", msg).toString()); return tagHandle(tag, msg, message); } public Action tagHandle(String tag, String msg, Message message) { if (MqTagEnum.PAY.name().equals(tag)) { try { PayBackBo payBackBo = JSON.parse(msg, PayBackBo.class); //檢查訂單號 if (!payBackBo.getOrderNo().startsWith(ApplicationConstant.APP_NO)) { logger.info(LogUtils.format("訂單支付失敗: orderNo 前綴 ", payBackBo.getOrderNo())); return Action.CommitMessage; } return giftOrderHandle(payBackBo); } catch (Exception e) { e.printStackTrace(); logger.info(LogUtils.format("paid_error", e.getMessage())); return Action.ReconsumeLater; } } else { logger.info(LogUtils.builder().append("tag error", tag).append("msg", msg) .append("message", message).toString()); } return Action.CommitMessage; } /** * 支付訂單處理 * * @param payBackBo * @return */ private Action giftOrderHandle(PayBackBo payBackBo) { //處理支付業務邏輯 }}