本文主要研究一下rocketmq的sendMsgTimeoutjava
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/producer/DefaultMQProducer.javagit
public class DefaultMQProducer extends ClientConfig implements MQProducer { private final InternalLogger log = ClientLogger.getLog(); //...... /** * Timeout for sending messages. */ private int sendMsgTimeout = 3000; public int getSendMsgTimeout() { return sendMsgTimeout; } public void setSendMsgTimeout(int sendMsgTimeout) { this.sendMsgTimeout = sendMsgTimeout; } //...... }
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.javagithub
public class DefaultMQProducerImpl implements MQProducerInner { private final InternalLogger log = ClientLogger.getLog(); private final Random random = new Random(); private final DefaultMQProducer defaultMQProducer; private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable = new ConcurrentHashMap<String, TopicPublishInfo>(); private final ArrayList<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>(); private final RPCHook rpcHook; protected BlockingQueue<Runnable> checkRequestQueue; protected ExecutorService checkExecutor; private ServiceState serviceState = ServiceState.CREATE_JUST; private MQClientInstance mQClientFactory; private ArrayList<CheckForbiddenHook> checkForbiddenHookList = new ArrayList<CheckForbiddenHook>(); private int zipCompressLevel = Integer.parseInt(System.getProperty(MixAll.MESSAGE_COMPRESS_LEVEL, "5")); private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy(); private final BlockingQueue<Runnable> asyncSenderThreadPoolQueue; private final ExecutorService defaultAsyncSenderExecutor; private ExecutorService asyncSenderExecutor; //...... private SendResult sendDefaultImpl( Message msg, final CommunicationMode communicationMode, final SendCallback sendCallback, final long timeout ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { this.makeSureStateOK(); Validators.checkMessage(msg, this.defaultMQProducer); final long invokeID = random.nextLong(); long beginTimestampFirst = System.currentTimeMillis(); long beginTimestampPrev = beginTimestampFirst; long endTimestamp = beginTimestampFirst; TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); if (topicPublishInfo != null && topicPublishInfo.ok()) { boolean callTimeout = false; MessageQueue mq = null; Exception exception = null; SendResult sendResult = null; int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; int times = 0; String[] brokersSent = new String[timesTotal]; for (; times < timesTotal; times++) { String lastBrokerName = null == mq ? null : mq.getBrokerName(); MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); if (mqSelected != null) { mq = mqSelected; brokersSent[times] = mq.getBrokerName(); try { beginTimestampPrev = System.currentTimeMillis(); if (times > 0) { //Reset topic with namespace during resend. msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic())); } long costTime = beginTimestampPrev - beginTimestampFirst; if (timeout < costTime) { callTimeout = true; break; } sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime); endTimestamp = System.currentTimeMillis(); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); switch (communicationMode) { case ASYNC: return null; case ONEWAY: return null; case SYNC: if (sendResult.getSendStatus() != SendStatus.SEND_OK) { if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) { continue; } } return sendResult; default: break; } } catch (RemotingException e) { endTimestamp = System.currentTimeMillis(); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); log.warn(msg.toString()); exception = e; continue; } catch (MQClientException e) { endTimestamp = System.currentTimeMillis(); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); log.warn(msg.toString()); exception = e; continue; } catch (MQBrokerException e) { endTimestamp = System.currentTimeMillis(); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); log.warn(msg.toString()); exception = e; switch (e.getResponseCode()) { case ResponseCode.TOPIC_NOT_EXIST: case ResponseCode.SERVICE_NOT_AVAILABLE: case ResponseCode.SYSTEM_ERROR: case ResponseCode.NO_PERMISSION: case ResponseCode.NO_BUYER_ID: case ResponseCode.NOT_IN_CURRENT_UNIT: continue; default: if (sendResult != null) { return sendResult; } throw e; } } catch (InterruptedException e) { endTimestamp = System.currentTimeMillis(); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); log.warn(msg.toString()); log.warn("sendKernelImpl exception", e); log.warn(msg.toString()); throw e; } } else { break; } } if (sendResult != null) { return sendResult; } String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s", times, System.currentTimeMillis() - beginTimestampFirst, msg.getTopic(), Arrays.toString(brokersSent)); info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED); MQClientException mqClientException = new MQClientException(info, exception); if (callTimeout) { throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout"); } if (exception instanceof MQBrokerException) { mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode()); } else if (exception instanceof RemotingConnectException) { mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION); } else if (exception instanceof RemotingTimeoutException) { mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT); } else if (exception instanceof MQClientException) { mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION); } throw mqClientException; } List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList(); if (null == nsList || nsList.isEmpty()) { throw new MQClientException( "No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION); } throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO), null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION); } //...... }
beginTimestampPrev - beginTimestampFirst
),若是是則設置timeout爲true,跳出循環,不是則把timeout - costTime做爲新的timeout參數調用sendKernelImpl方法rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.javaapache
public class DefaultMQProducerImpl implements MQProducerInner { private final InternalLogger log = ClientLogger.getLog(); private final Random random = new Random(); private final DefaultMQProducer defaultMQProducer; private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable = new ConcurrentHashMap<String, TopicPublishInfo>(); private final ArrayList<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>(); private final RPCHook rpcHook; protected BlockingQueue<Runnable> checkRequestQueue; protected ExecutorService checkExecutor; private ServiceState serviceState = ServiceState.CREATE_JUST; private MQClientInstance mQClientFactory; private ArrayList<CheckForbiddenHook> checkForbiddenHookList = new ArrayList<CheckForbiddenHook>(); private int zipCompressLevel = Integer.parseInt(System.getProperty(MixAll.MESSAGE_COMPRESS_LEVEL, "5")); private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy(); private final BlockingQueue<Runnable> asyncSenderThreadPoolQueue; private final ExecutorService defaultAsyncSenderExecutor; private ExecutorService asyncSenderExecutor; //...... private SendResult sendKernelImpl(final Message msg, final MessageQueue mq, final CommunicationMode communicationMode, final SendCallback sendCallback, final TopicPublishInfo topicPublishInfo, final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { long beginStartTime = System.currentTimeMillis(); String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); if (null == brokerAddr) { tryToFindTopicPublishInfo(mq.getTopic()); brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); } SendMessageContext context = null; if (brokerAddr != null) { brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr); byte[] prevBody = msg.getBody(); try { //for MessageBatch,ID has been set in the generating process if (!(msg instanceof MessageBatch)) { MessageClientIDSetter.setUniqID(msg); } boolean topicWithNamespace = false; if (null != this.mQClientFactory.getClientConfig().getNamespace()) { msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace()); topicWithNamespace = true; } int sysFlag = 0; boolean msgBodyCompressed = false; if (this.tryToCompressMessage(msg)) { sysFlag |= MessageSysFlag.COMPRESSED_FLAG; msgBodyCompressed = true; } final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); if (tranMsg != null && Boolean.parseBoolean(tranMsg)) { sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE; } if (hasCheckForbiddenHook()) { CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext(); checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr()); checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup()); checkForbiddenContext.setCommunicationMode(communicationMode); checkForbiddenContext.setBrokerAddr(brokerAddr); checkForbiddenContext.setMessage(msg); checkForbiddenContext.setMq(mq); checkForbiddenContext.setUnitMode(this.isUnitMode()); this.executeCheckForbiddenHook(checkForbiddenContext); } if (this.hasSendMessageHook()) { context = new SendMessageContext(); context.setProducer(this); context.setProducerGroup(this.defaultMQProducer.getProducerGroup()); context.setCommunicationMode(communicationMode); context.setBornHost(this.defaultMQProducer.getClientIP()); context.setBrokerAddr(brokerAddr); context.setMessage(msg); context.setMq(mq); context.setNamespace(this.defaultMQProducer.getNamespace()); String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); if (isTrans != null && isTrans.equals("true")) { context.setMsgType(MessageType.Trans_Msg_Half); } if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) { context.setMsgType(MessageType.Delay_Msg); } this.executeSendMessageHookBefore(context); } SendMessageRequestHeader requestHeader = new SendMessageRequestHeader(); requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup()); requestHeader.setTopic(msg.getTopic()); requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey()); requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums()); requestHeader.setQueueId(mq.getQueueId()); requestHeader.setSysFlag(sysFlag); requestHeader.setBornTimestamp(System.currentTimeMillis()); requestHeader.setFlag(msg.getFlag()); requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties())); requestHeader.setReconsumeTimes(0); requestHeader.setUnitMode(this.isUnitMode()); requestHeader.setBatch(msg instanceof MessageBatch); if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { String reconsumeTimes = MessageAccessor.getReconsumeTime(msg); if (reconsumeTimes != null) { requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes)); MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME); } String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg); if (maxReconsumeTimes != null) { requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes)); MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES); } } SendResult sendResult = null; switch (communicationMode) { case ASYNC: Message tmpMessage = msg; boolean messageCloned = false; if (msgBodyCompressed) { //If msg body was compressed, msgbody should be reset using prevBody. //Clone new message using commpressed message body and recover origin massage. //Fix bug:https://github.com/apache/rocketmq-externals/issues/66 tmpMessage = MessageAccessor.cloneMessage(msg); messageCloned = true; msg.setBody(prevBody); } if (topicWithNamespace) { if (!messageCloned) { tmpMessage = MessageAccessor.cloneMessage(msg); messageCloned = true; } msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace())); } long costTimeAsync = System.currentTimeMillis() - beginStartTime; if (timeout < costTimeAsync) { throw new RemotingTooMuchRequestException("sendKernelImpl call timeout"); } sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage( brokerAddr, mq.getBrokerName(), tmpMessage, requestHeader, timeout - costTimeAsync, communicationMode, sendCallback, topicPublishInfo, this.mQClientFactory, this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(), context, this); break; case ONEWAY: case SYNC: long costTimeSync = System.currentTimeMillis() - beginStartTime; if (timeout < costTimeSync) { throw new RemotingTooMuchRequestException("sendKernelImpl call timeout"); } sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage( brokerAddr, mq.getBrokerName(), msg, requestHeader, timeout - costTimeSync, communicationMode, context, this); break; default: assert false; break; } if (this.hasSendMessageHook()) { context.setSendResult(sendResult); this.executeSendMessageHookAfter(context); } return sendResult; } catch (RemotingException e) { if (this.hasSendMessageHook()) { context.setException(e); this.executeSendMessageHookAfter(context); } throw e; } catch (MQBrokerException e) { if (this.hasSendMessageHook()) { context.setException(e); this.executeSendMessageHookAfter(context); } throw e; } catch (InterruptedException e) { if (this.hasSendMessageHook()) { context.setException(e); this.executeSendMessageHookAfter(context); } throw e; } finally { msg.setBody(prevBody); msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace())); } } throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null); } //...... }
System.currentTimeMillis() - beginStartTime
),若是是則拋出RemotingTooMuchRequestException異常,不然使用timeout - costTimeAsync做爲新的timeout參數調用mQClientFactory.getMQClientAPIImpl().sendMessage方法rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/MQClientAPIImpl.javadom
public class MQClientAPIImpl { private final static InternalLogger log = ClientLogger.getLog(); private static boolean sendSmartMsg = Boolean.parseBoolean(System.getProperty("org.apache.rocketmq.client.sendSmartMsg", "true")); static { System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION)); } private final RemotingClient remotingClient; private final TopAddressing topAddressing; private final ClientRemotingProcessor clientRemotingProcessor; private String nameSrvAddr = null; private ClientConfig clientConfig; //...... public SendResult sendMessage( final String addr, final String brokerName, final Message msg, final SendMessageRequestHeader requestHeader, final long timeoutMillis, final CommunicationMode communicationMode, final SendCallback sendCallback, final TopicPublishInfo topicPublishInfo, final MQClientInstance instance, final int retryTimesWhenSendFailed, final SendMessageContext context, final DefaultMQProducerImpl producer ) throws RemotingException, MQBrokerException, InterruptedException { long beginStartTime = System.currentTimeMillis(); RemotingCommand request = null; if (sendSmartMsg || msg instanceof MessageBatch) { SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader); request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2); } else { request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader); } request.setBody(msg.getBody()); switch (communicationMode) { case ONEWAY: this.remotingClient.invokeOneway(addr, request, timeoutMillis); return null; case ASYNC: final AtomicInteger times = new AtomicInteger(); long costTimeAsync = System.currentTimeMillis() - beginStartTime; if (timeoutMillis < costTimeAsync) { throw new RemotingTooMuchRequestException("sendMessage call timeout"); } this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, context, producer); return null; case SYNC: long costTimeSync = System.currentTimeMillis() - beginStartTime; if (timeoutMillis < costTimeSync) { throw new RemotingTooMuchRequestException("sendMessage call timeout"); } return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request); default: assert false; break; } return null; } //...... }
rocketmq-remoting-4.5.2-sources.jar!/org/apache/rocketmq/remoting/netty/NettyRemotingClient.javaasync
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis) throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException { final int opaque = request.getOpaque(); try { final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null); this.responseTable.put(opaque, responseFuture); final SocketAddress addr = channel.remoteAddress(); channel.writeAndFlush(request).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) throws Exception { if (f.isSuccess()) { responseFuture.setSendRequestOK(true); return; } else { responseFuture.setSendRequestOK(false); } responseTable.remove(opaque); responseFuture.setCause(f.cause()); responseFuture.putResponse(null); log.warn("send a request command to channel <" + addr + "> failed."); } }); RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis); if (null == responseCommand) { if (responseFuture.isSendRequestOK()) { throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis, responseFuture.getCause()); } else { throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause()); } } return responseCommand; } finally { this.responseTable.remove(opaque); } } public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis, final InvokeCallback invokeCallback) throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { long beginStartTime = System.currentTimeMillis(); final int opaque = request.getOpaque(); boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS); if (acquired) { final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync); long costTime = System.currentTimeMillis() - beginStartTime; if (timeoutMillis < costTime) { once.release(); throw new RemotingTimeoutException("invokeAsyncImpl call timeout"); } final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once); this.responseTable.put(opaque, responseFuture); try { channel.writeAndFlush(request).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) throws Exception { if (f.isSuccess()) { responseFuture.setSendRequestOK(true); return; } requestFail(opaque); log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel)); } }); } catch (Exception e) { responseFuture.release(); log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e); throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e); } } else { if (timeoutMillis <= 0) { throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast"); } else { String info = String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d", timeoutMillis, this.semaphoreAsync.getQueueLength(), this.semaphoreAsync.availablePermits() ); log.warn(info); throw new RemotingTimeoutException(info); } } } //...... }
sendMessage方法最後經過sendMessageSync、sendMessageAsync等方法最後調用remotingClient.invokeSync或remotingClient.invokeAsync,並傳遞timeoutMillis;以invokeSyncImpl爲例,這裏是經過responseFuture.waitResponse(timeoutMillis)方法來實現的timeoutide