本文較長,代碼後面給了方法簡圖,但願給你幫助git
同步發送github
異步發送apache
普通消息bootstrap
順序消息數組
事務消息緩存
爲了防止讀者朋友嫌煩,能夠看下時序圖,後面我也會給出方法的簡圖框架
dom
異步
socket
public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); producer.setNamesrvAddr("127.0.0.1:9876"); producer.setInstanceName("producer1"); producer.start(); for (int i = 0; i < 10; i++) try { { Message msg = new Message("TopicTest", "TagA", "OrderID188", ("Hello world" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); // send方法 SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } } catch (Exception e) { e.printStackTrace(); } producer.shutdown(); } }
private SendResult sendDefaultImpl( Message msg, final CommunicationMode communicationMode, final SendCallback sendCallback, final long timeout // + 用戶來處理異常 ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { // + 確保服務狀態是RUNNING this.makeSureStateOK(); // + 傳參判空 Validators.checkMessage(msg, this.defaultMQProducer); final long invokeID = random.nextLong(); long beginTimestampFirst = System.currentTimeMillis(); long beginTimestampPrev = beginTimestampFirst; long endTimestamp = beginTimestampFirst; /** * 根據topic的name,從本地獲取tocip信息,若是本地沒有就從nameserver中取,同時緩存到本地 * 包括MessageQueueList, brokeName, topic_name */ TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); if (topicPublishInfo != null && topicPublishInfo.ok()) { boolean callTimeout = false; MessageQueue mq = null; Exception exception = null; SendResult sendResult = null; /** * 生產端的重試:異步方式最大執行次數總共3次,同步1次, * 重試針對的是brokeException\MQClientException\RemotingException\返回值失敗 */ int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; int times = 0; String[] brokersSent = new String[timesTotal]; for (; times < timesTotal; times++) { String lastBrokerName = null == mq ? null : mq.getBrokerName(); /** * 根據topic和broke選擇1個隊列 * 選擇策略,產生一個隨機數,hash % broke中隊列數,而後hash+1 * 這個隨機數:是線程私有的 */ MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); if (mqSelected != null) { mq = mqSelected; // 在重試數組中放入broke_name brokersSent[times] = mq.getBrokerName(); try { beginTimestampPrev = System.currentTimeMillis(); if (times > 0) { // Reset topic with namespace during resend. // 重置topic msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic())); } // 超時就break,拋出call timeout異常,這時尚未,經過socket重試 long costTime = beginTimestampPrev - beginTimestampFirst; if (timeout < costTime) { callTimeout = true; break; } // 【核心,以下】調用sendKernelImpl方法,想選中的messageQueu中投遞消息 sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime); // 獲取當前時間,控制全部步驟的時間,不超過用戶設置的超時時間,或默認超時時間 endTimestamp = System.currentTimeMillis(); // 把這個操做時間記錄到map中,key=brokerName, value=對象[包括:brokeName, currentLatency當前操做花費時間,startTimestamp開始的時間] this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); switch (communicationMode) { case ASYNC: return null; case ONEWAY: return null; case SYNC: // 當開啓了重試另一個broke時,纔會失敗重試 if (sendResult.getSendStatus() != SendStatus.SEND_OK) { if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) { continue; } } return sendResult; default: break; } } catch (RemotingException e) { // 遠程調用時異常,會重試 endTimestamp = System.currentTimeMillis(); // 記錄操做時間 this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); log.warn(msg.toString()); exception = e; continue; } catch (MQClientException e) { endTimestamp = System.currentTimeMillis(); // 記錄操做時間 this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); log.warn(msg.toString()); exception = e; continue; } catch (MQBrokerException e) { endTimestamp = System.currentTimeMillis(); // 記錄操做時間 this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); log.warn(msg.toString()); exception = e; switch (e.getResponseCode()) { case ResponseCode.TOPIC_NOT_EXIST: // topic不存在 case ResponseCode.SERVICE_NOT_AVAILABLE: // 服務不可用 case ResponseCode.SYSTEM_ERROR: // 系統錯誤 case ResponseCode.NO_PERMISSION: // 無權限 case ResponseCode.NO_BUYER_ID: // case ResponseCode.NOT_IN_CURRENT_UNIT: // 不在集羣中 continue; default: if (sendResult != null) { return sendResult; } throw e; } } catch (InterruptedException e) { endTimestamp = System.currentTimeMillis(); // 記錄操做時間 this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); log.warn(msg.toString()); log.warn("sendKernelImpl exception", e); log.warn(msg.toString()); throw e; } } else { break; } } if (sendResult != null) { return sendResult; } String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s", times, System.currentTimeMillis() - beginTimestampFirst, msg.getTopic(), Arrays.toString(brokersSent)); info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED); MQClientException mqClientException = new MQClientException(info, exception); if (callTimeout) { throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout"); } if (exception instanceof MQBrokerException) { mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode()); } else if (exception instanceof RemotingConnectException) { mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION); } else if (exception instanceof RemotingTimeoutException) { mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT); } else if (exception instanceof MQClientException) { mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION); } throw mqClientException; } List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList(); if (null == nsList || nsList.isEmpty()) { throw new MQClientException( "No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION); } throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO), null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION); }
private SendResult sendKernelImpl(final Message msg, final MessageQueue mq, final CommunicationMode communicationMode, final SendCallback sendCallback, final TopicPublishInfo topicPublishInfo, final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { long beginStartTime = System.currentTimeMillis(); // 獲取broker的IP地址,獲取到的是主broker String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); if (null == brokerAddr) { // 使用topic的name去獲取topic,若是本地沒有,則在從nameserver中獲取,同時也更新broker的信息 tryToFindTopicPublishInfo(mq.getTopic()); brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); } SendMessageContext context = null; if (brokerAddr != null) { // 若開啓了vipchannel,broke的端口減2 brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr); byte[] prevBody = msg.getBody(); try { //for MessageBatch,ID has been set in the generating process if (!(msg instanceof MessageBatch)) { // 設置UNIQ_KEY MessageClientIDSetter.setUniqID(msg); } boolean topicWithNamespace = false; if (null != this.mQClientFactory.getClientConfig().getNamespace()) { msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace()); topicWithNamespace = true; } // 嘗試壓縮消息,有必定的條件,不是MessageBatch,消息超過4k int sysFlag = 0; boolean msgBodyCompressed = false; if (this.tryToCompressMessage(msg)) { sysFlag |= MessageSysFlag.COMPRESSED_FLAG; msgBodyCompressed = true; } // 判斷是不是事務消息 final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); if (tranMsg != null && Boolean.parseBoolean(tranMsg)) { sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE; } // 禁用鉤子 todo 疑問待解=> if (hasCheckForbiddenHook()) { CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext(); checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr()); checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup()); checkForbiddenContext.setCommunicationMode(communicationMode); checkForbiddenContext.setBrokerAddr(brokerAddr); checkForbiddenContext.setMessage(msg); checkForbiddenContext.setMq(mq); checkForbiddenContext.setUnitMode(this.isUnitMode()); this.executeCheckForbiddenHook(checkForbiddenContext); } // 發送消息鉤子 if (this.hasSendMessageHook()) { context = new SendMessageContext(); context.setProducer(this); context.setProducerGroup(this.defaultMQProducer.getProducerGroup()); context.setCommunicationMode(communicationMode); context.setBornHost(this.defaultMQProducer.getClientIP()); context.setBrokerAddr(brokerAddr); context.setMessage(msg); context.setMq(mq); context.setNamespace(this.defaultMQProducer.getNamespace()); // 事務消息消息類型是半消息 String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); if (isTrans != null && isTrans.equals("true")) { context.setMsgType(MessageType.Trans_Msg_Half); } if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) { context.setMsgType(MessageType.Delay_Msg); } // 記錄消息軌跡 this.executeSendMessageHookBefore(context); } // 構建請求頭 SendMessageRequestHeader requestHeader = new SendMessageRequestHeader(); requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup()); requestHeader.setTopic(msg.getTopic()); requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey()); requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums()); requestHeader.setQueueId(mq.getQueueId()); requestHeader.setSysFlag(sysFlag); requestHeader.setBornTimestamp(System.currentTimeMillis()); requestHeader.setFlag(msg.getFlag()); requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties())); requestHeader.setReconsumeTimes(0); requestHeader.setUnitMode(this.isUnitMode()); requestHeader.setBatch(msg instanceof MessageBatch); // 更新 %RETRY%重試topic裏消息的消費時間和 最大消費次數 if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { String reconsumeTimes = MessageAccessor.getReconsumeTime(msg); if (reconsumeTimes != null) { requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes)); MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME); } String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg); if (maxReconsumeTimes != null) { requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes)); MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES); } } SendResult sendResult = null; switch (communicationMode) { // 異步 case ASYNC: Message tmpMessage = msg; boolean messageCloned = false; if (msgBodyCompressed) { //If msg body was compressed, msgbody should be reset using prevBody. 若是消息體被壓縮了,應該用prevBody重置msgBody //Clone new message using commpressed message body and recover origin massage. //Fix bug:https://github.com/apache/rocketmq-externals/issues/66 tmpMessage = MessageAccessor.cloneMessage(msg); messageCloned = true; msg.setBody(prevBody); } if (topicWithNamespace) { if (!messageCloned) { tmpMessage = MessageAccessor.cloneMessage(msg); messageCloned = true; } msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace())); } // 防止發送以前的處理超時 long costTimeAsync = System.currentTimeMillis() - beginStartTime; if (timeout < costTimeAsync) { throw new RemotingTooMuchRequestException("sendKernelImpl call timeout"); } // 下面仔細說明這個方法 sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage( brokerAddr, mq.getBrokerName(), tmpMessage, requestHeader, timeout - costTimeAsync, communicationMode, sendCallback, topicPublishInfo, this.mQClientFactory, this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(), context, this); break; // 直接發送,不關心發送結果/同步消息 case ONEWAY: case SYNC: // 防止發送以前的處理超時 long costTimeSync = System.currentTimeMillis() - beginStartTime; if (timeout < costTimeSync) { throw new RemotingTooMuchRequestException("sendKernelImpl call timeout"); } // 下面仔細說明這個方法 sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage( brokerAddr, mq.getBrokerName(), msg, requestHeader, timeout - costTimeSync, communicationMode, context, this); break; default: assert false; break; } if (this.hasSendMessageHook()) { context.setSendResult(sendResult); this.executeSendMessageHookAfter(context); } return sendResult; } catch (RemotingException e) { if (this.hasSendMessageHook()) { context.setException(e); this.executeSendMessageHookAfter(context); } throw e; } catch (MQBrokerException e) { if (this.hasSendMessageHook()) { context.setException(e); this.executeSendMessageHookAfter(context); } throw e; } catch (InterruptedException e) { if (this.hasSendMessageHook()) { context.setException(e); this.executeSendMessageHookAfter(context); } throw e; } finally { msg.setBody(prevBody); msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace())); } } throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null); }
this.mQClientFactory.getMQClientAPIImpl().sendMessage()
內部調用
異步消息調用的是 NettyRemotingClient.invokeAsync方法
同步消息調用的是 NettyRemotingClient.invokeSync方法
下面跟着代碼查看invokeSync方法
@Override public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis) throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException { long beginStartTime = System.currentTimeMillis(); // 建立channal,若是channel爲空,使用 this.bootstrap.connect(RemotingHelper.string2SocketAddress(addr));來建立channel final Channel channel = this.getAndCreateChannel(addr); // 活躍的channel才能發消息 if (channel != null && channel.isActive()) { try { // rpc鑑權 doBeforeRpcHooks(addr, request); long costTime = System.currentTimeMillis() - beginStartTime; if (timeoutMillis < costTime) { throw new RemotingTimeoutException("invokeSync call timeout"); } // RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime); // doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response); return response; } catch (RemotingSendRequestException e) { log.warn("invokeSync: send request exception, so close the channel[{}]", addr); this.closeChannel(addr, channel); throw e; } catch (RemotingTimeoutException e) { if (nettyClientConfig.isClientCloseSocketIfTimeout()) { this.closeChannel(addr, channel); log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr); } log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr); throw e; } } else { this.closeChannel(addr, channel); throw new RemotingConnectException(addr); } }
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis) throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException { final int opaque = request.getOpaque(); try { final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null); this.responseTable.put(opaque, responseFuture); final SocketAddress addr = channel.remoteAddress(); // 使用writeAndFlush發請求,創建返回值的監聽,這是netty-client發送消息,還有netty-server收消息 channel.writeAndFlush(request).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) throws Exception { if (f.isSuccess()) { responseFuture.setSendRequestOK(true); // 執行成功後return, 跳出這個監聽 return; } else { responseFuture.setSendRequestOK(false); } responseTable.remove(opaque); responseFuture.setCause(f.cause()); responseFuture.putResponse(null); log.warn("send a request command to channel <" + addr + "> failed."); } }); // 使用countDownLatch掛起線程,等待收到netty的返回值 RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis); // 在clinet的channelRead0將responseCommand賦值,若爲空,說明超過必定時間還未獲取返回值,這時拋出異常,交由用戶處理 // 可能netty-server收到消息,也有可能沒有收到消息,不肯定 if (null == responseCommand) { if (responseFuture.isSendRequestOK()) { throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis, responseFuture.getCause()); } else { throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause()); } } return responseCommand; } finally { this.responseTable.remove(opaque); } }
剛開始開發送的代碼,以爲太長了,不想看了,如今想一想他的邏輯挺清晰的,可以幫助你看清別人的開源框架整個構建構成
====== 【多學一點,for Better】======