RocketMQ入門篇
RocketMQ生產者流程篇
RocketMQ生產者消息篇git
生產者向消息隊列裏面寫入消息,不一樣的業務場景會採用不一樣的寫入策略,好比:同步發送,異步發送,延遲發送,事務消息等;本文首先從分析生產者發送消息的流程開始,而後再來介紹各類發送消息的策略。github
生產者首先須要設置namesrv,或者指定其餘方式更新namesrv;而後從namesrv獲取topic的路由信息,路由信息包括broker以及Message Queue等信息,同時將路由信息保存在本地內存中,方便下次使用;最後從Message Queue列表中選擇合適的Queue發送消息,實現負載均衡;apache
DefaultMQProducer實例化提供了兩個參數分別是:生產者組名稱以及RPCHook,RPCHook是一個接口,具體實現交由業務端實現,兩個方法分別是:doBeforeRequest和doAfterResponse,表示在執行請求以前和接收返回以後分別執行相關邏輯;
接下來就是調用DefaultMQProducer的start方法,相關的初始化操做都在裏面進行,內部其實調用的是DefaultMQProducerImpl的start方法,具體代碼以下:segmentfault
public void start(final boolean startFactory) throws MQClientException { switch (this.serviceState) { case CREATE_JUST: this.serviceState = ServiceState.START_FAILED; this.checkConfig(); if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) { this.defaultMQProducer.changeInstanceNameToPID(); } this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook); boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this); if (!registerOK) { this.serviceState = ServiceState.CREATE_JUST; throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup() + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null); } this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo()); if (startFactory) { mQClientFactory.start(); } log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(), this.defaultMQProducer.isSendMessageWithVIPChannel()); this.serviceState = ServiceState.RUNNING; break; case RUNNING: case START_FAILED: case SHUTDOWN_ALREADY: throw new MQClientException("The producer service state not OK, maybe started once, " + this.serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null); default: break; } this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); }
默認serviceState的狀態爲CREATE_JUST,剛進入設置爲START_FAILED,初始化完成以後設置爲RUNNING,再次初始化時會直接報錯,下面看一下具體初始化了哪些信息;數組
這裏的檢查其實就是對producerGroup進行合法性校驗;負載均衡
若是producerGroup不等於默認的"CLIENT_INNER_PRODUCER",則設置DefaultMQProducer的instanceName爲進程的pid;框架
首先檢查 ConcurrentMap<String/ clientId /, MQClientInstance> factoryTable中是否已經存在已clientId爲key的MQClientInstance,若是存在則取出,不存在則實例化;clientId是已ip地址,instanceName以及unitName組成的,例如:10.13.83.7@12500dom
將DefaultMQProducerImpl註冊到MQClientInstance中,已producerGroup做爲key,註冊到ConcurrentMap<String/ group /, MQProducerInner> producerTable中,若是已經存在此Group,則拋出異常;異步
已topic名稱爲"TBW102"爲key,實例化TopicPublishInfo做爲value,存放在ConcurrentMap<String/ topic /, TopicPublishInfo> topicPublishInfoTable中,TopicPublishInfo用來存放topic的路由信息;函數
MQClientInstance啓動會啓動不少相關服務,具體能夠看以下代碼:
public void start() throws MQClientException { synchronized (this) { switch (this.serviceState) { case CREATE_JUST: this.serviceState = ServiceState.START_FAILED; // If not specified,looking address from name server if (null == this.clientConfig.getNamesrvAddr()) { this.mQClientAPIImpl.fetchNameServerAddr(); } // Start request-response channel this.mQClientAPIImpl.start(); // Start various schedule tasks this.startScheduledTask(); // Start pull service this.pullMessageService.start(); // Start rebalance service this.rebalanceService.start(); // Start push service this.defaultMQProducer.getDefaultMQProducerImpl().start(false); log.info("the client factory [{}] start OK", this.clientId); this.serviceState = ServiceState.RUNNING; break; case RUNNING: break; case SHUTDOWN_ALREADY: break; case START_FAILED: throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null); default: break; } } }
默認serviceState的狀態爲CREATE_JUST,剛進入設置爲START_FAILED,初始化完成以後設置爲RUNNING,防止重複初始化;
首先判斷DefaultMQProducer是否配置了NameServerAddr,若是沒有配置會到一個地址下獲取,地址默認爲:http://jmenv.tbsite.net:8080/rocketmq/nsaddr,相關的邏輯在MixAll類中,代碼以下:
public static String getWSAddr() { String wsDomainName = System.getProperty("rocketmq.namesrv.domain", DEFAULT_NAMESRV_ADDR_LOOKUP); String wsDomainSubgroup = System.getProperty("rocketmq.namesrv.domain.subgroup", "nsaddr"); String wsAddr = "http://" + wsDomainName + ":8080/rocketmq/" + wsDomainSubgroup; if (wsDomainName.indexOf(":") > 0) { wsAddr = "http://" + wsDomainName + "/rocketmq/" + wsDomainSubgroup; } return wsAddr; }
正常狀況下咱們須要設置本身的地址,能夠經過以下方式設置:
System.setProperty("rocketmq.namesrv.domain", "localhost");
這種狀況下就能夠不用手動設置NameServerAddr;
RemotingClient是一個接口類,底層使用的通信框架是Netty,提供了實現類NettyRemotingClient,RemotingClient在初始化的時候實例化Bootstrap,方便後續用來建立Channel;
總共啓動了5個定時器任務,分別是:定時更新NameServerAddr信息,定時更新topic的路由信息,定時清理下線的broker,定時持久化Consumer的Offset信息,定時調整線程池;
pullMessageService和rebalanceService被用在消費端的兩個服務類,分別是:從broker拉取消息的服務和均衡消息隊列服務,負責分配消費者可消費的消息隊列;
相關發送消息的代碼在DefaultMQProducerImpl的sendDefaultImpl方法中,部分代碼以下所示:
private SendResult sendDefaultImpl( Message msg, final CommunicationMode communicationMode, final SendCallback sendCallback, final long timeout ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { this.makeSureStateOK(); Validators.checkMessage(msg, this.defaultMQProducer); final long invokeID = random.nextLong(); long beginTimestampFirst = System.currentTimeMillis(); long beginTimestampPrev = beginTimestampFirst; long endTimestamp = beginTimestampFirst; TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); if (topicPublishInfo != null && topicPublishInfo.ok()) { boolean callTimeout = false; MessageQueue mq = null; Exception exception = null; SendResult sendResult = null; int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; int times = 0; String[] brokersSent = new String[timesTotal]; for (; times < timesTotal; times++) { String lastBrokerName = null == mq ? null : mq.getBrokerName(); MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); if (mqSelected != null) { mq = mqSelected; brokersSent[times] = mq.getBrokerName(); try { beginTimestampPrev = System.currentTimeMillis(); long costTime = beginTimestampPrev - beginTimestampFirst; if (timeout < costTime) { callTimeout = true; break; } sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime); endTimestamp = System.currentTimeMillis(); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); switch (communicationMode) { case ASYNC: return null; case ONEWAY: return null; case SYNC: if (sendResult.getSendStatus() != SendStatus.SEND_OK) { if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) { continue; } } return sendResult; default: break; } ...如下代碼省略...
此方法的四個參數分別是:msg爲要發送的消息,communicationMode要使用的發送方式包括同步、異步、單向,sendCallback在異步狀況下的回調函數,timeout發送消息的超時時間;
經過msg中設置的topic獲取路由信息,具體代碼以下:
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) { TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic); if (null == topicPublishInfo || !topicPublishInfo.ok()) { this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo()); this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); topicPublishInfo = this.topicPublishInfoTable.get(topic); } if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) { return topicPublishInfo; } else { this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer); topicPublishInfo = this.topicPublishInfoTable.get(topic); return topicPublishInfo; } }
首先從變量ConcurrentMap<String/ topic /, TopicPublishInfo> topicPublishInfoTable中獲取是否存在指定topic的路由信息,若是獲取不到則使用topic去nameServer獲取路由信息,若是仍是獲取不到則使用默認的topic名稱爲"TBW102"去獲取路由信息,須要使用默認名稱去獲取的狀況是沒有建立topic,須要broker自動建立topic的狀況;獲取路由信息使用的是MQClientInstance中的updateTopicRouteInfoFromNameServer方法,此方法根據topic獲取路由信息,具體鏈接哪臺nameServer,會從列表中順序的選擇nameServer,實現負載均衡;
注:名稱爲"TBW102"的topic是系統自動建立的;
成功獲取到路由信息以後,須要從MessageQueue列表中選擇一個,在這以前獲取了默認發送消息失敗的重試次數,默認爲3次(只有發送模式是同步的狀況下),代碼以下:
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { if (this.sendLatencyFaultEnable) { try { int index = tpInfo.getSendWhichQueue().getAndIncrement(); for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) { int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size(); if (pos < 0) pos = 0; MessageQueue mq = tpInfo.getMessageQueueList().get(pos); if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) { if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName)) return mq; } } final String notBestBroker = latencyFaultTolerance.pickOneAtLeast(); int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker); if (writeQueueNums > 0) { final MessageQueue mq = tpInfo.selectOneMessageQueue(); if (notBestBroker != null) { mq.setBrokerName(notBestBroker); mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums); } return mq; } else { latencyFaultTolerance.remove(notBestBroker); } } catch (Exception e) { log.error("Error occurred when selecting message queue", e); } return tpInfo.selectOneMessageQueue(); } return tpInfo.selectOneMessageQueue(lastBrokerName); }
以上代碼在MQFaultStrategy,此類提供了選擇MessageQueue的策略,相關策略能夠看類變量:
private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl(); private boolean sendLatencyFaultEnable = false; private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L}; private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
latencyFaultTolerance:延遲容錯對象,維護brokers的延遲信息;
sendLatencyFaultEnable:延遲容錯開關,默認不開啓;
latencyMax:延遲級別數組;
notAvailableDuration :根據延遲級別,對應broker不可用的時長;
這樣上面的這段代碼就好理解了,首先斷定是否開啓開關,若是開啓首先獲取index,index初始爲一個隨機值,後面每次+1,這樣在後面與MessageQueue長度取模的時候每一個MessageQueue能夠按順序獲取;獲取
MessageQueue以後須要斷定其對應的Broker是否可用,同時也須要和當前指定的brokerName進行匹配;若是沒有獲取到就選擇一個延遲相對小的,pickOneAtLeast會作排序處理;若是都不行就直接獲取一個MessageQueue,無論其餘條件了;
在發送以前會作超時檢測,若是前面兩步已經超時了,則直接報超時,默認超時時間是3秒;部分代碼以下:
private SendResult sendKernelImpl(final Message msg, final MessageQueue mq, final CommunicationMode communicationMode, final SendCallback sendCallback, final TopicPublishInfo topicPublishInfo, final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { long beginStartTime = System.currentTimeMillis(); String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); if (null == brokerAddr) { tryToFindTopicPublishInfo(mq.getTopic()); brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); } SendMessageContext context = null; if (brokerAddr != null) { brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr); byte[] prevBody = msg.getBody(); try { //for MessageBatch,ID has been set in the generating process if (!(msg instanceof MessageBatch)) { MessageClientIDSetter.setUniqID(msg); } int sysFlag = 0; boolean msgBodyCompressed = false; if (this.tryToCompressMessage(msg)) { sysFlag |= MessageSysFlag.COMPRESSED_FLAG; msgBodyCompressed = true; } final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); if (tranMsg != null && Boolean.parseBoolean(tranMsg)) { sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE; } if (hasCheckForbiddenHook()) { CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext(); checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr()); checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup()); checkForbiddenContext.setCommunicationMode(communicationMode); checkForbiddenContext.setBrokerAddr(brokerAddr); checkForbiddenContext.setMessage(msg); checkForbiddenContext.setMq(mq); checkForbiddenContext.setUnitMode(this.isUnitMode()); this.executeCheckForbiddenHook(checkForbiddenContext); } if (this.hasSendMessageHook()) { context = new SendMessageContext(); context.setProducer(this); context.setProducerGroup(this.defaultMQProducer.getProducerGroup()); context.setCommunicationMode(communicationMode); context.setBornHost(this.defaultMQProducer.getClientIP()); context.setBrokerAddr(brokerAddr); context.setMessage(msg); context.setMq(mq); String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); if (isTrans != null && isTrans.equals("true")) { context.setMsgType(MessageType.Trans_Msg_Half); } if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) { context.setMsgType(MessageType.Delay_Msg); } this.executeSendMessageHookBefore(context); } SendMessageRequestHeader requestHeader = new SendMessageRequestHeader(); requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup()); requestHeader.setTopic(msg.getTopic()); requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey()); requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums()); requestHeader.setQueueId(mq.getQueueId()); requestHeader.setSysFlag(sysFlag); requestHeader.setBornTimestamp(System.currentTimeMillis()); requestHeader.setFlag(msg.getFlag()); requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties())); requestHeader.setReconsumeTimes(0); requestHeader.setUnitMode(this.isUnitMode()); requestHeader.setBatch(msg instanceof MessageBatch); if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { String reconsumeTimes = MessageAccessor.getReconsumeTime(msg); if (reconsumeTimes != null) { requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes)); MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME); } String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg); if (maxReconsumeTimes != null) { requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes)); MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES); } } SendResult sendResult = null; switch (communicationMode) { case ASYNC: Message tmpMessage = msg; if (msgBodyCompressed) { //If msg body was compressed, msgbody should be reset using prevBody. //Clone new message using commpressed message body and recover origin massage. //Fix bug:https://github.com/apache/rocketmq-externals/issues/66 tmpMessage = MessageAccessor.cloneMessage(msg); msg.setBody(prevBody); } long costTimeAsync = System.currentTimeMillis() - beginStartTime; if (timeout < costTimeAsync) { throw new RemotingTooMuchRequestException("sendKernelImpl call timeout"); } sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage( brokerAddr, mq.getBrokerName(), tmpMessage, requestHeader, timeout - costTimeAsync, communicationMode, sendCallback, topicPublishInfo, this.mQClientFactory, this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(), context, this); break; case ONEWAY: case SYNC: long costTimeSync = System.currentTimeMillis() - beginStartTime; if (timeout < costTimeSync) { throw new RemotingTooMuchRequestException("sendKernelImpl call timeout"); } sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage( brokerAddr, mq.getBrokerName(), msg, requestHeader, timeout - costTimeSync, communicationMode, context, this); break; default: assert false; break; } if (this.hasSendMessageHook()) { context.setSendResult(sendResult); this.executeSendMessageHookAfter(context); } return sendResult;
此處的6個參數分別是:msg消息自己,mq要發送到的隊列,communicationMode發送策略,sendCallback異步回調函數,topicPublishInfo路由信息,timeout發送超時時間(3秒-前2步消耗的時間);
首先須要獲取指定broker的地址,這要才能建立channel與broker鏈接;而後就是一些hock處理;接下來就是準備發送的消息頭SendMessageRequestHeader,最後根據不一樣的發送策略執行發送消息,此處就不在進入更加深刻的分析;
本文重點介紹了發送者的啓動,以及發送消息的大概流程;關於消息的發送策略,以及消息的類型包括:順序消息,事務消息等,將在後面的文章介紹。