匠心零度 轉載請註明原創出處,謝謝!java
rocketmq的幾個核心的模塊,而對於每一個模塊都是單獨的jvm進程,咱們看到上面的架構圖的時候,那些箭頭就是rocketmq的rpc調用,下面咱們來看看rocketmq的rpc是若是進行封裝實現的。服務器
說明: rocketmq系列都將會以rocketmq-4.1.0-incubating進行介紹。網絡
先排除Master、Slave直接經過原生的nio進行調用,其餘通信都是基於netty-all-4.0.36.Final以及RocketMQ自定義協議進行通信的。架構
咱們來看看header data裏面的數據定義:異步
code對於Request來講就是RequestCode類裏面的常量信息:jvm
說明:公衆號【匠心零度】回覆:rocketmq,可得到基於rocketmq4.1.0加詳細中文代碼註釋 。tcp
public class RequestCode { // Broker 發送消息 public static final int SEND_MESSAGE = 10; // Broker 訂閱消息 public static final int PULL_MESSAGE = 11; // Broker 查詢消息 public static final int QUERY_MESSAGE = 12; // Broker 查詢Broker Offset public static final int QUERY_BROKER_OFFSET = 13; // Broker 查詢Consumer Offset public static final int QUERY_CONSUMER_OFFSET = 14; // Broker 更新Consumer Offset public static final int UPDATE_CONSUMER_OFFSET = 15; // Broker 更新或者增長一個Topic public static final int UPDATE_AND_CREATE_TOPIC = 17; // Broker 獲取全部Topic的配置(Slave和Namesrv都會向Master請求此配置) public static final int GET_ALL_TOPIC_CONFIG = 21; // Broker 獲取全部Topic配置(Slave和Namesrv都會向Master請求此配置 public static final int GET_TOPIC_CONFIG_LIST = 22; // Broker 獲取全部Topic名稱列表 public static final int GET_TOPIC_NAME_LIST = 23; // Broker 更新Broker上的配置 public static final int UPDATE_BROKER_CONFIG = 25; // Broker 獲取Broker上的配置 public static final int GET_BROKER_CONFIG = 26; // Broker 觸發Broker刪除文件 public static final int TRIGGER_DELETE_FILES = 27; // Broker 獲取Broker運行時信息 public static final int GET_BROKER_RUNTIME_INFO = 28; // Broker 根據時間查詢隊列的Offset public static final int SEARCH_OFFSET_BY_TIMESTAMP = 29; // Broker 查詢隊列最大Offset public static final int GET_MAX_OFFSET = 30; // Broker 查詢隊列最小Offset public static final int GET_MIN_OFFSET = 31; // Broker 查詢隊列最先消息對應時間 public static final int GET_EARLIEST_MSG_STORETIME = 32; // Broker 根據消息ID來查詢消息 public static final int VIEW_MESSAGE_BY_ID = 33; // Broker Client向Client發送心跳,並註冊自身 public static final int HEART_BEAT = 34; // Broker Client註銷 public static final int UNREGISTER_CLIENT = 35; // Broker Consumer將處理不了的消息發回服務器 public static final int CONSUMER_SEND_MSG_BACK = 36; // Broker Commit或者Rollback事務 public static final int END_TRANSACTION = 37; // Broker 獲取ConsumerId列表經過GroupName public static final int GET_CONSUMER_LIST_BY_GROUP = 38; // Broker 主動向Producer回查事務狀態 public static final int CHECK_TRANSACTION_STATE = 39; // Broker Broker通知Consumer列表變化 public static final int NOTIFY_CONSUMER_IDS_CHANGED = 40; // Broker Consumer向Master鎖定隊列 public static final int LOCK_BATCH_MQ = 41; // Broker Consumer向Master解鎖隊列 public static final int UNLOCK_BATCH_MQ = 42; // Broker 獲取全部Consumer Offset public static final int GET_ALL_CONSUMER_OFFSET = 43; // Broker 獲取全部定時進度 public static final int GET_ALL_DELAY_OFFSET = 45; public static final int CHECK_CLIENT_CONFIG = 46; // Namesrv 向Namesrv追加KV配置 public static final int PUT_KV_CONFIG = 100; // Namesrv 從Namesrv獲取KV配置 public static final int GET_KV_CONFIG = 101; // Namesrv 從Namesrv獲取KV配置 public static final int DELETE_KV_CONFIG = 102; // Namesrv 註冊一個Broker,數據都是持久化的,若是存在則覆蓋配置 public static final int REGISTER_BROKER = 103; // Namesrv 卸載一個Broker,數據都是持久化的 public static final int UNREGISTER_BROKER = 104; // Namesrv 根據Topic獲取Broker Name、隊列數(包含讀隊列與寫隊列) public static final int GET_ROUTEINTO_BY_TOPIC = 105; // Namesrv 獲取註冊到Name Server的全部Broker集羣信息 public static final int GET_BROKER_CLUSTER_INFO = 106; public static final int UPDATE_AND_CREATE_SUBSCRIPTIONGROUP = 200; public static final int GET_ALL_SUBSCRIPTIONGROUP_CONFIG = 201; public static final int GET_TOPIC_STATS_INFO = 202; public static final int GET_CONSUMER_CONNECTION_LIST = 203; public static final int GET_PRODUCER_CONNECTION_LIST = 204; public static final int WIPE_WRITE_PERM_OF_BROKER = 205; // 從Name Server獲取完整Topic列表 public static final int GET_ALL_TOPIC_LIST_FROM_NAMESERVER = 206; // 從Broker刪除訂閱組 public static final int DELETE_SUBSCRIPTIONGROUP = 207; // 從Broker獲取消費狀態(進度) public static final int GET_CONSUME_STATS = 208; // Suspend Consumer消費過程 public static final int SUSPEND_CONSUMER = 209; // Resume Consumer消費過程 public static final int RESUME_CONSUMER = 210; // 重置Consumer Offset public static final int RESET_CONSUMER_OFFSET_IN_CONSUMER = 211; // 重置Consumer Offset public static final int RESET_CONSUMER_OFFSET_IN_BROKER = 212; // 調整Consumer線程池數量 public static final int ADJUST_CONSUMER_THREAD_POOL = 213; // 查詢消息被哪些消費組消費 public static final int WHO_CONSUME_THE_MESSAGE = 214; // 從Broker刪除Topic配置 public static final int DELETE_TOPIC_IN_BROKER = 215; // 從Namesrv刪除Topic配置 public static final int DELETE_TOPIC_IN_NAMESRV = 216; // 經過NameSpace獲取全部的KV List public static final int GET_KVLIST_BY_NAMESPACE = 219; // offset 重置 public static final int RESET_CONSUMER_CLIENT_OFFSET = 220; // 客戶端訂閱消息 public static final int GET_CONSUMER_STATUS_FROM_CLIENT = 221; // 通知 broker 調用 offset 重置處理 public static final int INVOKE_BROKER_TO_RESET_OFFSET = 222; // 通知 broker 調用客戶端訂閱消息處理 public static final int INVOKE_BROKER_TO_GET_CONSUMER_STATUS = 223; // Broker 查詢topic被誰消費 public static final int QUERY_TOPIC_CONSUME_BY_WHO = 300; // 獲取指定集羣下的全部 topic public static final int GET_TOPICS_BY_CLUSTER = 224; // 向Broker註冊Filter Server public static final int REGISTER_FILTER_SERVER = 301; // 向Filter Server註冊Class public static final int REGISTER_MESSAGE_FILTER_CLASS = 302; // 根據 topic 和 group 獲取消息的時間跨度 public static final int QUERY_CONSUME_TIME_SPAN = 303; // 獲取全部系統內置 Topic 列表 public static final int GET_SYSTEM_TOPIC_LIST_FROM_NS = 304; public static final int GET_SYSTEM_TOPIC_LIST_FROM_BROKER = 305; // 清理失效隊列 public static final int CLEAN_EXPIRED_CONSUMEQUEUE = 306; // 經過Broker查詢Consumer內存數據 public static final int GET_CONSUMER_RUNNING_INFO = 307; // 查找被修正 offset (轉發組件) public static final int QUERY_CORRECTION_OFFSET = 308; // 經過Broker直接向某個Consumer發送一條消息,並馬上消費,返回結果給broker,再返回給調用方 public static final int CONSUME_MESSAGE_DIRECTLY = 309; // Broker 發送消息,優化網絡數據包 public static final int SEND_MESSAGE_V2 = 310; // 單元化相關 topic public static final int GET_UNIT_TOPIC_LIST = 311; // 獲取含有單元化訂閱組的 Topic 列表 public static final int GET_HAS_UNIT_SUB_TOPIC_LIST = 312; // 獲取含有單元化訂閱組的非單元化 Topic 列表 public static final int GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST = 313; // 克隆某一個組的消費進度到新的組 public static final int CLONE_GROUP_OFFSET = 314; // 查看Broker上的各類統計信息 public static final int VIEW_BROKER_STATS_DATA = 315; public static final int CLEAN_UNUSED_TOPIC = 316; public static final int GET_BROKER_CONSUME_STATS = 317; /** * update the config of name server */ public static final int UPDATE_NAMESRV_CONFIG = 318; /** * get config from name server */ public static final int GET_NAMESRV_CONFIG = 319; public static final int SEND_BATCH_MESSAGE = 320; public static final int QUERY_CONSUME_QUEUE = 321; }
code對於Response來講就是ResponseCode類裏面的常量信息:源碼分析
public class ResponseCode extends RemotingSysResponseCode { public static final int FLUSH_DISK_TIMEOUT = 10; public static final int SLAVE_NOT_AVAILABLE = 11; public static final int FLUSH_SLAVE_TIMEOUT = 12; public static final int MESSAGE_ILLEGAL = 13; public static final int SERVICE_NOT_AVAILABLE = 14; public static final int VERSION_NOT_SUPPORTED = 15; public static final int NO_PERMISSION = 16; public static final int TOPIC_NOT_EXIST = 17; public static final int TOPIC_EXIST_ALREADY = 18; public static final int PULL_NOT_FOUND = 19; public static final int PULL_RETRY_IMMEDIATELY = 20; public static final int PULL_OFFSET_MOVED = 21; public static final int QUERY_NOT_FOUND = 22; public static final int SUBSCRIPTION_PARSE_FAILED = 23; public static final int SUBSCRIPTION_NOT_EXIST = 24; public static final int SUBSCRIPTION_NOT_LATEST = 25; public static final int SUBSCRIPTION_GROUP_NOT_EXIST = 26; public static final int FILTER_DATA_NOT_EXIST = 27; public static final int FILTER_DATA_NOT_LATEST = 28; public static final int TRANSACTION_SHOULD_COMMIT = 200; public static final int TRANSACTION_SHOULD_ROLLBACK = 201; public static final int TRANSACTION_STATE_UNKNOW = 202; public static final int TRANSACTION_STATE_GROUP_WRONG = 203; public static final int NO_BUYER_ID = 204; public static final int NOT_IN_CURRENT_UNIT = 205; public static final int CONSUMER_NOT_ONLINE = 206; public static final int CONSUME_MSG_TIMEOUT = 207; public static final int NO_MESSAGE = 208; }
flag字段進行說明,其餘後續分析到具體的具體分析。學習
flag = 0表示是request,flag = 1表示是response。優化
private static final int RPC_TYPE = 0; // 0, REQUEST_COMMAND public RemotingCommandType getType() { if (this.isResponseType()) {//flag=1爲true return RemotingCommandType.RESPONSE_COMMAND; } return RemotingCommandType.REQUEST_COMMAND; } public boolean isResponseType() { int bits = 1 << RPC_TYPE; return (this.flag & bits) == bits; }
flag爲二、3(二進制表示十、11)爲oneway請求。
private static final int RPC_ONEWAY = 1; // Oneway bit public void markOnewayRPC() { int bits = 1 << RPC_ONEWAY; this.flag |= bits; } public boolean isOnewayRPC() { int bits = 1 << RPC_ONEWAY; return (this.flag & bits) == bits; }
code=310很快咱們就明白什麼意思了:
對於下面相似a、b、c等能夠簡單查看下類SendMessageRequestHeaderV2(後續繼續講解)基本就是相似js壓縮效果,能夠借鑑學習下。
public static SendMessageRequestHeaderV2 createSendMessageRequestHeaderV2(final SendMessageRequestHeader v1) { SendMessageRequestHeaderV2 v2 = new SendMessageRequestHeaderV2(); /** * 進行轉換,這樣網絡傳輸數據就比較小了,學習下 */ v2.a = v1.getProducerGroup(); v2.b = v1.getTopic(); v2.c = v1.getDefaultTopic(); v2.d = v1.getDefaultTopicQueueNums(); v2.e = v1.getQueueId(); v2.f = v1.getSysFlag(); v2.g = v1.getBornTimestamp(); v2.h = v1.getFlag(); v2.i = v1.getProperties(); v2.j = v1.getReconsumeTimes(); v2.k = v1.isUnitMode(); v2.l = v1.getMaxReconsumeTimes(); v2.m = v1.isBatch(); return v2; }
備註: RemotingCommand類包含了傳輸過程當中全部數據的封裝,還包括了編解碼等操做(很是棒!!!解讀爲何這樣,從面向對象角度,誰擁有數據誰就對外提供操做這些數據的方法,這句話應該是大學的時候學習面向對象的時候看張孝祥老師說的,一直記憶猶新,的確應該這麼設計,rocketmq就這麼作的,再次學習)。
上面的圖已經作到很是清晰了,RemotingClient接口定義了client應該具有那些功能,RemotingSever相似,主要有:registerProcessor、invokeSync(同步調用)、invokeAsync(異步調用)、invokeOneway(單向調用)等等,而RemotingClient與RemotingSever在三種調用的區別就是參數有所區別。
NettyRemotingAbstract是Server與Client公用處理的抽象。
BrokerOuterAPI、MQClientImpl:都封裝了NettyRemotingClient(後續介紹)。
不論是client仍是server經過RemotingService咱們明白,啓動都是在start裏面,咱們看看裏面核心netty代碼,以server裏面代碼爲例:
備註:此處netty相關內容不進行深刻展開,只會把涉及的的簡單說明,後續另開系列進行說明。
在進行tcp傳輸的時候常常會面臨黏包/拆包問題,netty自帶了不少通用的TCP黏包/拆包解決方案,下面咱們看看rocketmq如何藉助netty來實現編解碼:NettyEncoder編碼、NettyDecoder解碼,rocketmq相關的網絡協議上面內容已經說明過了。
NettyEncoder編碼
NettyDecoder解碼
netty中針對這四種場景均有對應的解碼器做爲解決方案,好比:
rocketmq中使用的就是基於LengthFieldBasedFrameDecoder自定義長度解碼器的。
IdleStateHandler:Netty自帶的心跳檢測。
NettyConnetManageHandle:主要就是連接管理,新鏈接、鏈接斷開、異常、Idle等事件,每一個事件過來存入NettyEventExecuter的隊列裏面。
NettyEventExecutor的run方法會不斷的從隊列裏面取事件進行相應的處理:
NettyServerHandler:具體業務處理(後續會說到)。
invokeSync(同步調用)進行說明:
opaque就至關與標識的這個請求,雖然rpc調用請求發送結束了,可是響應回來的時候仍是會帶有該信息就能夠判斷出是原來那個請求,好比響應回來以後執行原來給定的回調等。
經過countDownLatch來控制等待網絡通訊時間 :
invokeAsync(異步調用)進行說明:
與invokeSync(同步調用)基本相似,boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);//控制異步請求的個數以及超時和使用使用布爾原子變量,信號量保證只釋放一次,對於異步invokeCallback不爲空,須要進行調用。invokeOneway(單向調用)比較簡單略過。
下面看看消息接收處理:
備註:這裏判斷是request仍是response都是經過header裏面的flag標記來判斷的,上面已經說明。
processResponseCommand在介紹上面三種發送的時候說過了,下面重點看看processRequestCommand:
備註:這裏須要作流控,要求線程池對應的隊列必須是有大小限制的,是經過線程池進行限流的。
RocketMQ原理介紹V3.1.1
netty源碼分析之LengthFieldBasedFrameDecoder
若是讀完以爲有收穫的話,歡迎點贊、關注、加公衆號【匠心零度】,查閱更多精彩歷史!!!
加入知識星球,一塊兒探討!