MQ 使用狀況梳理

我的梳理有限:歡迎你們 豐富此文檔

目前規劃原則:css

         topic 建立基於業務  消費者基於模塊 多對多關係 且消費本身的topic 不會影響別人  topic n↔n  CIDhtml

基於業務topic 的分佈表格: (後續有模塊更新請自行更新文檔或者聯繫我補上)

topic tag 使用模塊/消費者CID 說明 消費方式 負責人
主題:行程
topic: topic_dev_xxx_trip

trip.*

orderdisp/ CID_DEV_xxx_TRIP

行程變動使用

集羣消費 (無序)

蔣飛

主題:座標
topic: topic_dev_xxx_coord

coord_notice

coord/CID_DEV_xxx_COORD

結算信息和語音播報

廣播消費 (無序)

付穩穩

主題: 首次登錄
topic: topic_dev_xxx_broadcast

uc_passenger_first_login

coupon/CID_DEV_xxx_BROADCAST(廢棄)

已廢棄 首登送券

廣播消費(無序)

孫金新

uc_passenger_first_login

coupon/CID_DEV_xxx_COUPON (已建)

代替上面 首登送券

集羣消費 (無序)

孫金新


主題: 支付完成
topic: topic_dev_xxx_pay_broadcast

pay_notice

coupon / CID_DEV_xxx_COUPON (已建)

支付完成 消費優惠券

集羣消費 (無序)

孫金新

pay_notice

orderplace /CID_DEV_xxx_PLACE (已建)

支付完成 更新訂單

集羣消費 (無序)

張開文

pay_notice

ordertaking /CID_DEV_xxx_TAKING(已建)

支付完成 更新行程/訂單狀態→ 座標

集羣消費 (無序)

慄東星

pay_notice

bill /CID_DEV_xxx_BILL

支付完成更新帳單狀態

集羣消費 (無序)

劉明濤

pay_notice

datawarehouse/CID_DEV_xxx_DATAWARE

支付完成 數據統計

集羣消費 (無序)

王德成

refund_notice

datawarehouse/CID_DEV_xxx_DATAWARE

退款完成 數據統計

集羣消費 (無序)

王德成

主題: 順序更新行程狀態
topic: topic_dev_xxx_order

update_trip

orderplace/CID_DEV_xxx_ORDER

更新行程信息

集羣消費 (有序)

張開文


MQ: 使用原則和規範:java

      正確的順序: 是先啓動Consumer 後再啓動producer。git

  1.   全部業務目前使用同一個生產者 PID
  2.   全部topic 由主帳號建立 並受權給子帳號(dev/prod)
  3.   topic 的建立基於業務(首次登錄,支付成功,行程結束等等)  
  4.   CID(消費者 ID)的建立基於應用 (每一個應用若是須要建立一個CID 若須要(廣播和集羣)兩種消費模式則建立兩個CID  廣播方式後綴加_BROADCAST區分)
  5.   Consumer ID 和 Topic 的關係是 N:N。 同一個 Consumer ID 能夠訂閱多個 Topic,同一個 Topic 也能夠對應多個 Consumer ID。
  6.   消息訂閱一致性( 同一 CustomerID 的全部使用的模塊 訂閱的 topic tag 數量須要徹底一致 )
  7.   CID只消費本身受權訂閱的 topic. 

MQ 使用狀況總結: github

  1. 主帳號建立的CustomerID  以主帳號(或有最高權限的受權用戶)的Access/Secret 的身份的登陸  啓動實例   消費者在線 且能夠接收消息 而且能夠突破訂閱限制 訂閱誰能夠消費誰  (前提是訂閱關係一致性 同一 CustomerID 的全部使用的模塊 訂閱的 topic tag 數量須要徹底一致 )

  2. 主帳號建立的CustomerID 以子帳號dev(普通權限用戶)Access/Secret 的身份的登陸  啓動實例  會出現 topic 消費者 不在線狀態 (同當日線上狀態)  6月以前創建的topic 和 CID 因爲阿里雲有補償機制 仍舊能夠運行. (這也是致使上線失敗的緣由: 當時線下用的 dev 具備最高權限 ,線上 prod 是普通用戶權限)

  3. 子帳號dev 登陸阿里雲,建立不一樣CustomerID 後 ,以子帳號(普通權限用戶) devAccess/Secret 的身份的登陸  啓動實例    消費者在線  且能夠接收消息 而且能夠突破訂閱限制 訂閱誰能夠消費誰 但僅限於消費(子帳號)  被受權的 topic.  
    未受權CID 爲topic的消費者時 由於子帳號有訂閱消費權限 因此 子帳號建立的 CID 能夠訂閱和消費 topic 可是不影響其餘模塊(其餘 CID)消費



可行方案一web

    每一個模塊實例都使用同一個子帳號(Access/Secret相同)  不一樣模塊使用使用同一個 CID時, 須要作到 消息訂閱一致性( 同一 CustomerID 的全部使用的模塊 訂閱的 topic tag 數量須要徹底一致 )spring


可行方案二: (我的推薦方案)json

    每一個模塊實例都使用同一個子帳號(Access/Secret相同)  每一個模塊單獨分配本身的CID(同一子帳號dev 統一建立), 模塊之間數據隔離, 要求各個模塊自能用本身的 CID 且不要訂閱本身模塊不應訂閱的 topic 和 tag    (缺點:同一個子帳號dev [Access/Secret相同] 訂閱誰能夠消費誰(可是不響應其餘模塊) 只要子帳號被受權的 topic  每一個CID 均可以訂閱該topic)api

     1.主帳號登陸並建立topic服務器

     2.受權訂閱權限給子帳號(帳號不能訪問未受權的 topic)

     3.子帳號登陸 topic管理中建立本身帳號下的CID

     4.程序中使用 同一子帳號(Access/Secret) 可是本身模塊的 CID 消費消息 相互不影響


最強隔離方案:

  每一個模塊實例都使用不一樣子帳號(Access/Secret不一樣)  每一個模塊單獨分配本身子帳號建立的CID.這樣模塊之間能夠保障不能相互訂閱和消費.

     1.主帳號登陸並建立topic

     2.受權訂閱權限給子帳號(帳號不能訪問未受權的 topic)

     3.子帳號登陸 topic管理中建立本身帳號下的CID

     4.程序中使用 不一樣子帳號(Access/Secret不一樣)下本身模塊的 CID 消費消息 相互不影響



topic n↔n  CID 圖解:


https://pic2.zhimg.com/80/v2-b6ed65f370a766620718ad4227d5d4e5_hd.jpg

 奉上  官方文檔:  https://help.aliyun.com/document_detail/34411.html?spm=a2c4g.11186623.4.5.565f7b25vcsskW

          官方DEMO:  https://github.com/AliwareMQ/mq-demo?spm=a2c4g.11186623.2.14.578018aaaNZL17

          RocketMQ源碼分析輔助: https://www.processon.com/view/5a6eb653e4b05680c3e94fec


我的梳理有限:歡迎你們 豐富此文檔




測試用例:

import com.alibaba.fastjson.JSON;
import com.aliyun.openservices.ons.api.*;
import com.xxx.engine.ui.controller.pay.base.BaseTest;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.web.WebAppConfiguration;

import java.util.Properties;


/**
*
* topic:
*
* topic_dev_xxx_trip_tomas
* topic_dev_xxx_pay_tomas
*
* CID:
*
* CID_DEV_xxx_TRIP_TOMAS 做爲 topic: trip_tomas 和pay_tomas 消費者
* CID_DEV_xxx_COORD_TOMAS 做爲 topic: trip_tomas 和pay_tomas 消費者
* CID_DEV_xxx_TAKING_TOMAS 做爲 topic: trip_tomas 消費者
* CID_DEV_xxx_ORDER_TOMAS 做爲 topic: pay_tomas 消費者
* CID_DEV_xxx_COUPON_TOMAS 只創建不須要受權 topic
*/

@Slf4j
@SpringBootTest(classes={PayMQTest.class})
@ComponentScan(basePackages = {"com.xxx.engine"})
@TestPropertySource("classpath:engine_common.properties")
@WebAppConfiguration
public class PayMQTest extends BaseTest {


@Value("${xxx.mq.tag.pay.notice:pay_notice}")
private String MQ_TAG_PAY;

private void sendTestMQ(String topic, String tag) {
Properties properties = new Properties();
// 您在MQ控制檯建立的Producer ID
properties.put(PropertyKeyConst.ProducerId, "PID_xxx_DEV");
// 鑑權用AccessKey,在阿里雲服務器管理控制檯建立
properties.put(PropertyKeyConst.AccessKey,"LTAIBvzNg84oa7bM");
// 鑑權用SecretKey,在阿里雲服務器管理控制檯建立
properties.put(PropertyKeyConst.SecretKey, "5VynbpH5JmRXTshkSbTlLdsbMErHF7");
// 設置 TCP 接入域名(此處以公共雲的公網接入爲例)
properties.put(PropertyKeyConst.ONSAddr,"http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet");
Producer producer = ONSFactory.createProducer(properties);
// 在發送消息前,必須調用start方法來啓動Producer,只需調用一次便可
producer.start();
//循環發送消息
for (int i=10000;i<10005;i++){
try {
Thread.sleep(100L);
} catch (InterruptedException e) {
e.printStackTrace();
}
String message = topic.concat(tag).concat("消息內容:").concat(String.valueOf(i));
Message msg = new Message( //
// 在控制檯建立的Topic,即該消息所屬的Topic名稱
topic,
// Message Tag,
// 可理解爲Gmail中的標籤,對消息進行再歸類,方便Consumer指定過濾條件在MQ服務器過濾
tag,
// Message Body
// 任何二進制形式的數據, MQ不作任何干預,
// 須要Producer與Consumer協商好一致的序列化和反序列化方式
message.getBytes());
// 設置表明消息的業務關鍵屬性,請儘量全局惟一,以方便您在沒法正常收到消息狀況下,可經過MQ控制檯查詢消息並補發
// 發送消息,只要不拋異常就是成功
// 打印Message ID,以便用於消息發送狀態查詢
SendResult sendResult = producer.send(msg);
System.out.println("Send Message success. Message ID is: " + sendResult.getMessageId());
}
// 在應用退出前,能夠銷燬Producer對象
producer.shutdown();
}

/**
* 持續發送1w 條 MQ 消息
* @throws Exception
*/
@Test
public void sendMQ() throws Exception {
sendTestMQ("topic_dev_xxx_trip_tomas", MQ_TAG_PAY);
sendTestMQ("topic_dev_xxx_pay_tomas", MQ_TAG_PAY);
}
/**
* 子帳號dev 子帳號(普通權限用戶) dev Access/Secret 的身份的登陸
* CID 由子帳號建立 並訂閱topic (本例:topic_dev_xxx_trip_tomas和topic_dev_xxx_pay_tomas)
* 啓動實例 消費者在線 且能夠接收消息
* @throws Exception
*/
@Test
public void WithAuthCID() throws Exception {
Properties consumerProperties = new Properties();
consumerProperties.setProperty(PropertyKeyConst.ConsumerId, "CID_DEV_xxx_TRIP_TOMAS");
consumerProperties.setProperty(PropertyKeyConst.AccessKey, "xxx");
consumerProperties.setProperty(PropertyKeyConst.SecretKey, "xxx");
consumerProperties.setProperty(PropertyKeyConst.ONSAddr, "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet");
//consumerProperties.setProperty(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
Consumer consumer = ONSFactory.createConsumer(consumerProperties);
consumer.subscribe("topic_dev_xxx_trip_tomas", MQ_TAG_PAY, getMessageListener());
consumer.subscribe("topic_dev_xxx_pay_tomas", MQ_TAG_PAY, getMessageListener());
consumer.start();
System.out.println("reciveMQByDevWithAuthCID Started.");
Thread.sleep(1000000000000l);
}
/**
* 子帳號dev 子帳號(普通權限用戶) dev Access/Secret 的身份的登陸
* CID 由子帳號建立 並訂閱topic (本例:topic_dev_xxx_trip_tomas和topic_dev_xxx_pay_tomas)
* 啓動實例 消費者在線 且能夠接收消息
* PS. CID_DEV_xxx_TRIP_TOMAS 同一個 CID 能夠啓動多個實例 可是必須保證 每一個實例訂閱的 topic 和 tag 一致 否則會違反消息一致性原則 致使消息消費混亂
* @throws Exception
*/
@Test
public void WithAuthCIDSecond() throws Exception {
Properties consumerProperties = new Properties();
consumerProperties.setProperty(PropertyKeyConst.ConsumerId, "CID_DEV_xxx_TRIP_TOMAS");
consumerProperties.setProperty(PropertyKeyConst.AccessKey, "xxx");
consumerProperties.setProperty(PropertyKeyConst.SecretKey, "xxx");
consumerProperties.setProperty(PropertyKeyConst.ONSAddr, "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet");
//consumerProperties.setProperty(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
Consumer consumer = ONSFactory.createConsumer(consumerProperties);
//正確方式(和上個實例同樣) 消費成功
consumer.subscribe("topic_dev_xxx_trip_tomas", MQ_TAG_PAY, getMessageListener());
consumer.subscribe("topic_dev_xxx_pay_tomas", MQ_TAG_PAY, getMessageListener());

//錯誤方式(和上個實例不同) 消費失敗 違反訂閱關係一致性
consumer.subscribe("topic_dev_xxx_pay_tomas", MQ_TAG_PAY, getMessageListener());

consumer.start();
System.out.println("WithAuthCIDSecond Started.");
Thread.sleep(1000000000000l);
}

/**
* 子帳號dev 子帳號(普通權限用戶) dev Access/Secret 的身份的登陸
* CID 由子帳號建立 並訂閱topic (本例:topic_dev_xxx_trip_tomas)
* 啓動實例 消費者在線 且能夠接收消息
* PS. CID_DEV_xxx_TAKING_TOMAS 做爲topic_dev_xxx_trip_tomas的消費者 不影響其餘模塊CID 訂閱和消費任何topic
* @throws Exception
*/
@Test
public void WithAuthCID3() throws Exception {
Properties consumerProperties = new Properties();
consumerProperties.setProperty(PropertyKeyConst.ConsumerId, "CID_DEV_xxx_TAKING_TOMAS");
consumerProperties.setProperty(PropertyKeyConst.AccessKey, "xxx");
consumerProperties.setProperty(PropertyKeyConst.SecretKey, "xxx");
consumerProperties.setProperty(PropertyKeyConst.ONSAddr, "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet");
//consumerProperties.setProperty(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
Consumer consumer = ONSFactory.createConsumer(consumerProperties);
consumer.subscribe("topic_dev_xxx_trip_tomas", MQ_TAG_PAY, getMessageListener());
consumer.start();
System.out.println("reciveMQByDevWithOutAuthCID3 Started.");
Thread.sleep(1000000000000l);
}

/**
* 子帳號dev 以子帳號(普通權限用戶) devAccess/Secret 的身份的登陸
* 啓動實例 消費者在線 且能夠接收消息 而且能夠突破訂閱限制 訂閱誰能夠消費誰 但僅限於消費(子帳號) 被受權的 topic
*
* 子帳號CID不是 topic指定的消費者 強制做爲爲topic的消費者時 由於子帳號有訂閱消費權限 因此 子帳號建立的 CID 能夠訂閱和消費 topic 可是不影響其餘模塊(其餘 CID)
* @throws Exception
*/
@Test
public void WithOutAuthCID() throws Exception {
Properties consumerProperties = new Properties();
consumerProperties.setProperty(PropertyKeyConst.ConsumerId, "CID_DEV_xxx_COUPON_TOMAS");
consumerProperties.setProperty(PropertyKeyConst.AccessKey, "xxx");
consumerProperties.setProperty(PropertyKeyConst.SecretKey, "xxx");
consumerProperties.setProperty(PropertyKeyConst.ONSAddr, "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet");
//consumerProperties.setProperty(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
Consumer consumer = ONSFactory.createConsumer(consumerProperties);
consumer.subscribe("topic_dev_xxx_trip_tomas", MQ_TAG_PAY, getMessageListener());
consumer.start();
System.out.println("reciveMQByDevWithOutAuthCID Started.");
Thread.sleep(1000000000000l);
}

/**
* 子帳號CID不是 topic指定的消費者 強制做爲爲topic的消費者時 由於子帳號有訂閱消費權限 因此 子帳號建立的 CID 能夠訂閱和消費 topic 可是不影響其餘模塊(其餘 CID)
* @throws Exception
*/
@Test
public void WithOutAuthCID2() throws Exception {
Properties consumerProperties = new Properties();
consumerProperties.setProperty(PropertyKeyConst.ConsumerId, "CID_DEV_xxx_ORDER_TOMAS");
consumerProperties.setProperty(PropertyKeyConst.AccessKey, "xxx");
consumerProperties.setProperty(PropertyKeyConst.SecretKey, "xxx");
consumerProperties.setProperty(PropertyKeyConst.ONSAddr, "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet");
consumerProperties.setProperty(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
Consumer consumer = ONSFactory.createConsumer(consumerProperties);
consumer.subscribe("topic_dev_xxx_trip_tomas", MQ_TAG_PAY, getMessageListener());
consumer.start();
System.out.println("reciveMQByDevWithOutAuthCID2 Started.");
Thread.sleep(1000000000000l);
}


/**
* 強隔離方案:
* 每一個模塊實例都使用不一樣子帳號(Access/Secret不一樣) 每一個模塊單獨分配本身子帳號建立的CID.這樣模塊之間能夠保障不能相互訂閱和消費.
* 1.主帳號登陸並建立topic
* 2.受權訂閱權限給子帳號(帳號不能訪問未受權的 topic)
* 3.子帳號登陸 topic管理中建立本身帳號下的CID
* 4.程序中使用 不一樣子帳號(Access/Secret不一樣)下本身模塊的 CID 消費消息 相互不影響
*
* @throws Exception
*/
@Test
public void recivePayMQByCoodAccountWithAuthCID() throws Exception {
Properties consumerProperties = new Properties();
consumerProperties.setProperty(PropertyKeyConst.ConsumerId, "CID_DEV_xxx_TEST");
consumerProperties.setProperty(PropertyKeyConst.AccessKey, "xxx");
consumerProperties.setProperty(PropertyKeyConst.SecretKey, "xxx");
consumerProperties.setProperty(PropertyKeyConst.ONSAddr, "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet");
//consumerProperties.setProperty(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
Consumer consumer = ONSFactory.createConsumer(consumerProperties);
consumer.subscribe("topic_dev_xxx_trip_tomas", MQ_TAG_PAY, getMessageListener());
consumer.start();
System.out.println("recivePayMQByCoodAccountWithAuthCID Started.");
//Thread.sleep(1000000000000l);
}

private MessageListener getMessageListener(){
return new MessageListener() {
@Override
public Action consume(Message message, ConsumeContext consumeContext) {
try {
log.info("receive message ={} ConsumeContext={}:", JSON.toJSONString(message), JSON.toJSONString(consumeContext));
} catch (Exception e) {
log.error("{}", e);
return Action.ReconsumeLater;
}
return Action.CommitMessage;
}
}; }
相關文章
相關標籤/搜索