目前規劃原則:css
topic 建立基於業務 消費者基於模塊 多對多關係 且消費本身的topic 不會影響別人 topic n↔n CIDhtml
topic | tag | 使用模塊/消費者CID | 說明 | 消費方式 | 負責人 |
---|---|---|---|---|---|
主題:行程 topic: topic_dev_xxx_trip |
trip.* |
orderdisp/ CID_DEV_xxx_TRIP | 行程變動使用 |
集羣消費 (無序) |
蔣飛 |
主題:座標 topic: topic_dev_xxx_coord |
coord_notice |
coord/CID_DEV_xxx_COORD |
結算信息和語音播報 |
廣播消費 (無序) |
付穩穩 |
主題: 首次登錄 topic: topic_dev_xxx_broadcast |
uc_passenger_first_login |
|
已廢棄 首登送券 |
廣播消費(無序) |
孫金新 |
uc_passenger_first_login |
coupon/CID_DEV_xxx_COUPON (已建) |
代替上面 首登送券 |
集羣消費 (無序) |
孫金新 |
|
主題: 支付完成 topic: topic_dev_xxx_pay_broadcast
|
pay_notice |
coupon / CID_DEV_xxx_COUPON (已建) |
支付完成 消費優惠券 |
集羣消費 (無序) |
孫金新 |
pay_notice |
orderplace /CID_DEV_xxx_PLACE (已建) |
支付完成 更新訂單 |
集羣消費 (無序) |
張開文 |
|
pay_notice |
ordertaking /CID_DEV_xxx_TAKING(已建) |
支付完成 更新行程/訂單狀態→ 座標 |
集羣消費 (無序) |
慄東星 |
|
pay_notice |
bill /CID_DEV_xxx_BILL |
支付完成更新帳單狀態 |
集羣消費 (無序) |
劉明濤 |
|
pay_notice |
datawarehouse/CID_DEV_xxx_DATAWARE |
支付完成 數據統計 |
集羣消費 (無序) |
王德成 |
|
refund_notice |
datawarehouse/CID_DEV_xxx_DATAWARE |
退款完成 數據統計 |
集羣消費 (無序) |
王德成 |
|
主題: 順序更新行程狀態 topic: topic_dev_xxx_order |
update_trip |
orderplace/CID_DEV_xxx_ORDER |
更新行程信息 |
集羣消費 (有序) |
張開文 |
MQ: 使用原則和規範:java
正確的順序: 是先啓動Consumer 後再啓動producer。git
MQ 使用狀況總結: github
未受權CID 爲topic的消費者時 由於子帳號有訂閱消費權限 因此 子帳號建立的 CID 能夠訂閱和消費 topic 可是不影響其餘模塊(其餘 CID)消費
可行方案一: web
每一個模塊實例都使用同一個子帳號(Access/Secret相同) 不一樣模塊使用使用同一個 CID時, 須要作到 消息訂閱一致性( 同一 CustomerID 的全部使用的模塊 訂閱的 topic和 tag 數量須要徹底一致 )spring
可行方案二: (我的推薦方案)json
每一個模塊實例都使用同一個子帳號(Access/Secret相同) 每一個模塊單獨分配本身的CID(同一子帳號dev 統一建立), 模塊之間數據隔離, 要求各個模塊自能用本身的 CID 且不要訂閱本身模塊不應訂閱的 topic 和 tag (缺點:同一個子帳號dev [Access/Secret相同] 訂閱誰能夠消費誰(可是不響應其餘模塊) 只要子帳號被受權的 topic 每一個CID 均可以訂閱該topic)api
1.主帳號登陸並建立topic服務器
2.受權訂閱權限給子帳號(帳號不能訪問未受權的 topic)
3.子帳號登陸 topic管理中建立本身帳號下的CID
4.程序中使用 同一子帳號(Access/Secret) 可是本身模塊的 CID 消費消息 相互不影響
最強隔離方案:
每一個模塊實例都使用不一樣子帳號(Access/Secret不一樣) 每一個模塊單獨分配本身子帳號建立的CID.這樣模塊之間能夠保障不能相互訂閱和消費.
1.主帳號登陸並建立topic
2.受權訂閱權限給子帳號(帳號不能訪問未受權的 topic)
3.子帳號登陸 topic管理中建立本身帳號下的CID
4.程序中使用 不一樣子帳號(Access/Secret不一樣)下本身模塊的 CID 消費消息 相互不影響
topic n↔n CID 圖解:
https://pic2.zhimg.com/80/v2-b6ed65f370a766620718ad4227d5d4e5_hd.jpg
下
奉上 官方文檔: https://help.aliyun.com/document_detail/34411.html?spm=a2c4g.11186623.4.5.565f7b25vcsskW
官方DEMO: https://github.com/AliwareMQ/mq-demo?spm=a2c4g.11186623.2.14.578018aaaNZL17
RocketMQ源碼分析輔助: https://www.processon.com/view/5a6eb653e4b05680c3e94fec
我的梳理有限:歡迎你們 豐富此文檔
測試用例:
import com.alibaba.fastjson.JSON;
import com.aliyun.openservices.ons.api.*;
import com.xxx.engine.ui.controller.pay.base.BaseTest;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.web.WebAppConfiguration;
import java.util.Properties;
/**
*
* topic:
*
* topic_dev_xxx_trip_tomas
* topic_dev_xxx_pay_tomas
*
* CID:
*
* CID_DEV_xxx_TRIP_TOMAS 做爲 topic: trip_tomas 和pay_tomas 消費者
* CID_DEV_xxx_COORD_TOMAS 做爲 topic: trip_tomas 和pay_tomas 消費者
* CID_DEV_xxx_TAKING_TOMAS 做爲 topic: trip_tomas 消費者
* CID_DEV_xxx_ORDER_TOMAS 做爲 topic: pay_tomas 消費者
* CID_DEV_xxx_COUPON_TOMAS 只創建不須要受權 topic
*/
@Slf4j
@SpringBootTest(classes={PayMQTest.class})
@ComponentScan(basePackages = {"com.xxx.engine"})
@TestPropertySource("classpath:engine_common.properties")
@WebAppConfiguration
public class PayMQTest extends BaseTest {
@Value("${xxx.mq.tag.pay.notice:pay_notice}")
private String MQ_TAG_PAY;
private void sendTestMQ(String topic, String tag) {
Properties properties = new Properties();
// 您在MQ控制檯建立的Producer ID
properties.put(PropertyKeyConst.ProducerId, "PID_xxx_DEV");
// 鑑權用AccessKey,在阿里雲服務器管理控制檯建立
properties.put(PropertyKeyConst.AccessKey,"LTAIBvzNg84oa7bM");
// 鑑權用SecretKey,在阿里雲服務器管理控制檯建立
properties.put(PropertyKeyConst.SecretKey, "5VynbpH5JmRXTshkSbTlLdsbMErHF7");
// 設置 TCP 接入域名(此處以公共雲的公網接入爲例)
properties.put(PropertyKeyConst.ONSAddr,"http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet");
Producer producer = ONSFactory.createProducer(properties);
// 在發送消息前,必須調用start方法來啓動Producer,只需調用一次便可
producer.start();
//循環發送消息
for (int i=10000;i<10005;i++){
try {
Thread.sleep(100L);
} catch (InterruptedException e) {
e.printStackTrace();
}
String message = topic.concat(tag).concat("消息內容:").concat(String.valueOf(i));
Message msg = new Message( //
// 在控制檯建立的Topic,即該消息所屬的Topic名稱
topic,
// Message Tag,
// 可理解爲Gmail中的標籤,對消息進行再歸類,方便Consumer指定過濾條件在MQ服務器過濾
tag,
// Message Body
// 任何二進制形式的數據, MQ不作任何干預,
// 須要Producer與Consumer協商好一致的序列化和反序列化方式
message.getBytes());
// 設置表明消息的業務關鍵屬性,請儘量全局惟一,以方便您在沒法正常收到消息狀況下,可經過MQ控制檯查詢消息並補發
// 發送消息,只要不拋異常就是成功
// 打印Message ID,以便用於消息發送狀態查詢
SendResult sendResult = producer.send(msg);
System.out.println("Send Message success. Message ID is: " + sendResult.getMessageId());
}
// 在應用退出前,能夠銷燬Producer對象
producer.shutdown();
}
/**
* 持續發送1w 條 MQ 消息
* @throws Exception
*/
@Test
public void sendMQ() throws Exception {
sendTestMQ("topic_dev_xxx_trip_tomas", MQ_TAG_PAY);
sendTestMQ("topic_dev_xxx_pay_tomas", MQ_TAG_PAY);
}
/**
* 子帳號dev 子帳號(普通權限用戶) dev Access/Secret 的身份的登陸
* CID 由子帳號建立 並訂閱topic (本例:topic_dev_xxx_trip_tomas和topic_dev_xxx_pay_tomas)
* 啓動實例 消費者在線 且能夠接收消息
* @throws Exception
*/
@Test
public void WithAuthCID() throws Exception {
Properties consumerProperties = new Properties();
consumerProperties.setProperty(PropertyKeyConst.ConsumerId, "CID_DEV_xxx_TRIP_TOMAS");
consumerProperties.setProperty(PropertyKeyConst.AccessKey, "xxx");
consumerProperties.setProperty(PropertyKeyConst.SecretKey, "xxx");
consumerProperties.setProperty(PropertyKeyConst.ONSAddr, "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet");
//consumerProperties.setProperty(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
Consumer consumer = ONSFactory.createConsumer(consumerProperties);
consumer.subscribe("topic_dev_xxx_trip_tomas", MQ_TAG_PAY, getMessageListener());
consumer.subscribe("topic_dev_xxx_pay_tomas", MQ_TAG_PAY, getMessageListener());
consumer.start();
System.out.println("reciveMQByDevWithAuthCID Started.");
Thread.sleep(1000000000000l);
}
/**
* 子帳號dev 子帳號(普通權限用戶) dev Access/Secret 的身份的登陸
* CID 由子帳號建立 並訂閱topic (本例:topic_dev_xxx_trip_tomas和topic_dev_xxx_pay_tomas)
* 啓動實例 消費者在線 且能夠接收消息
* PS. CID_DEV_xxx_TRIP_TOMAS 同一個 CID 能夠啓動多個實例 可是必須保證 每一個實例訂閱的 topic 和 tag 一致 否則會違反消息一致性原則 致使消息消費混亂
* @throws Exception
*/
@Test
public void WithAuthCIDSecond() throws Exception {
Properties consumerProperties = new Properties();
consumerProperties.setProperty(PropertyKeyConst.ConsumerId, "CID_DEV_xxx_TRIP_TOMAS");
consumerProperties.setProperty(PropertyKeyConst.AccessKey, "xxx");
consumerProperties.setProperty(PropertyKeyConst.SecretKey, "xxx");
consumerProperties.setProperty(PropertyKeyConst.ONSAddr, "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet");
//consumerProperties.setProperty(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
Consumer consumer = ONSFactory.createConsumer(consumerProperties);
//正確方式(和上個實例同樣) 消費成功
consumer.subscribe("topic_dev_xxx_trip_tomas", MQ_TAG_PAY, getMessageListener());
consumer.subscribe("topic_dev_xxx_pay_tomas", MQ_TAG_PAY, getMessageListener());
//錯誤方式(和上個實例不同) 消費失敗 違反訂閱關係一致性
consumer.subscribe("topic_dev_xxx_pay_tomas", MQ_TAG_PAY, getMessageListener());
consumer.start();
System.out.println("WithAuthCIDSecond Started.");
Thread.sleep(1000000000000l);
}
/**
* 子帳號dev 子帳號(普通權限用戶) dev Access/Secret 的身份的登陸
* CID 由子帳號建立 並訂閱topic (本例:topic_dev_xxx_trip_tomas)
* 啓動實例 消費者在線 且能夠接收消息
* PS. CID_DEV_xxx_TAKING_TOMAS 做爲topic_dev_xxx_trip_tomas的消費者 不影響其餘模塊CID 訂閱和消費任何topic
* @throws Exception
*/
@Test
public void WithAuthCID3() throws Exception {
Properties consumerProperties = new Properties();
consumerProperties.setProperty(PropertyKeyConst.ConsumerId, "CID_DEV_xxx_TAKING_TOMAS");
consumerProperties.setProperty(PropertyKeyConst.AccessKey, "xxx");
consumerProperties.setProperty(PropertyKeyConst.SecretKey, "xxx");
consumerProperties.setProperty(PropertyKeyConst.ONSAddr, "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet");
//consumerProperties.setProperty(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
Consumer consumer = ONSFactory.createConsumer(consumerProperties);
consumer.subscribe("topic_dev_xxx_trip_tomas", MQ_TAG_PAY, getMessageListener());
consumer.start();
System.out.println("reciveMQByDevWithOutAuthCID3 Started.");
Thread.sleep(1000000000000l);
}
/**
* 子帳號dev 以子帳號(普通權限用戶) devAccess/Secret 的身份的登陸
* 啓動實例 消費者在線 且能夠接收消息 而且能夠突破訂閱限制 訂閱誰能夠消費誰 但僅限於消費(子帳號) 被受權的 topic
*
* 子帳號CID不是 topic指定的消費者 強制做爲爲topic的消費者時 由於子帳號有訂閱消費權限 因此 子帳號建立的 CID 能夠訂閱和消費 topic 可是不影響其餘模塊(其餘 CID)
* @throws Exception
*/
@Test
public void WithOutAuthCID() throws Exception {
Properties consumerProperties = new Properties();
consumerProperties.setProperty(PropertyKeyConst.ConsumerId, "CID_DEV_xxx_COUPON_TOMAS");
consumerProperties.setProperty(PropertyKeyConst.AccessKey, "xxx");
consumerProperties.setProperty(PropertyKeyConst.SecretKey, "xxx");
consumerProperties.setProperty(PropertyKeyConst.ONSAddr, "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet");
//consumerProperties.setProperty(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
Consumer consumer = ONSFactory.createConsumer(consumerProperties);
consumer.subscribe("topic_dev_xxx_trip_tomas", MQ_TAG_PAY, getMessageListener());
consumer.start();
System.out.println("reciveMQByDevWithOutAuthCID Started.");
Thread.sleep(1000000000000l);
}
/**
* 子帳號CID不是 topic指定的消費者 強制做爲爲topic的消費者時 由於子帳號有訂閱消費權限 因此 子帳號建立的 CID 能夠訂閱和消費 topic 可是不影響其餘模塊(其餘 CID)
* @throws Exception
*/
@Test
public void WithOutAuthCID2() throws Exception {
Properties consumerProperties = new Properties();
consumerProperties.setProperty(PropertyKeyConst.ConsumerId, "CID_DEV_xxx_ORDER_TOMAS");
consumerProperties.setProperty(PropertyKeyConst.AccessKey, "xxx");
consumerProperties.setProperty(PropertyKeyConst.SecretKey, "xxx");
consumerProperties.setProperty(PropertyKeyConst.ONSAddr, "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet");
consumerProperties.setProperty(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
Consumer consumer = ONSFactory.createConsumer(consumerProperties);
consumer.subscribe("topic_dev_xxx_trip_tomas", MQ_TAG_PAY, getMessageListener());
consumer.start();
System.out.println("reciveMQByDevWithOutAuthCID2 Started.");
Thread.sleep(1000000000000l);
}
/**
* 強隔離方案:
* 每一個模塊實例都使用不一樣子帳號(Access/Secret不一樣) 每一個模塊單獨分配本身子帳號建立的CID.這樣模塊之間能夠保障不能相互訂閱和消費.
* 1.主帳號登陸並建立topic
* 2.受權訂閱權限給子帳號(帳號不能訪問未受權的 topic)
* 3.子帳號登陸 topic管理中建立本身帳號下的CID
* 4.程序中使用 不一樣子帳號(Access/Secret不一樣)下本身模塊的 CID 消費消息 相互不影響
*
* @throws Exception
*/
@Test
public void recivePayMQByCoodAccountWithAuthCID() throws Exception {
Properties consumerProperties = new Properties();
consumerProperties.setProperty(PropertyKeyConst.ConsumerId, "CID_DEV_xxx_TEST");
consumerProperties.setProperty(PropertyKeyConst.AccessKey, "xxx");
consumerProperties.setProperty(PropertyKeyConst.SecretKey, "xxx");
consumerProperties.setProperty(PropertyKeyConst.ONSAddr, "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet");
//consumerProperties.setProperty(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
Consumer consumer = ONSFactory.createConsumer(consumerProperties);
consumer.subscribe("topic_dev_xxx_trip_tomas", MQ_TAG_PAY, getMessageListener());
consumer.start();
System.out.println("recivePayMQByCoodAccountWithAuthCID Started.");
//Thread.sleep(1000000000000l);
}
private MessageListener getMessageListener(){
return new MessageListener() {
@Override
public Action consume(Message message, ConsumeContext consumeContext) {
try {
log.info("receive message ={} ConsumeContext={}:", JSON.toJSONString(message), JSON.toJSONString(consumeContext));
} catch (Exception e) {
log.error("{}", e);
return Action.ReconsumeLater;
}
return Action.CommitMessage;
}
}; }