此次源碼學習的方法是帶着問題學習源碼實現,問題列表以下數據庫
Producer 同步消息怎麼發送?服務器
Producer 是與NameServer什麼交互?負載均衡
Producer 異步消息怎麼發送?dom
P2P,Pub/Sub 都支持嗎?異步
Producer 怎麼保證順序消息?socket
Producer 負載均衡?分佈式
Producer Group是什麼概念?ide
Producer 怎麼制定消息優先級?函數
Producer 的事務消息怎麼實現?工具
Producer 定時消息怎麼實現?
//主題 private String topic; //狀態 private int flag; //屬性 private Map<String, String> properties; //消息體 private byte[] body;
1. 查找topic中的發佈信息 2. 選擇topic中的某個MessageQueue 3. 使用Netty發送消息至MessageQueue
DefaultMQProducer.send ->DefaultMQProducerImple.send ->DefaultMQProducerImple.sendDefaultImpl
private SendResult sendDefaultImpl(// Message msg, // final CommunicationMode communicationMode, // final SendCallback sendCallback, // final long timeout// ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { this.makeSureStateOK(); Validators.checkMessage(msg, this.defaultMQProducer); final long invokeID = random.nextLong(); long beginTimestampFirst = System.currentTimeMillis(); long beginTimestampPrev = beginTimestampFirst; long endTimestamp = beginTimestampFirst; //獲取topic信息 TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); if (topicPublishInfo != null && topicPublishInfo.ok()) { MessageQueue mq = null; Exception exception = null; SendResult sendResult = null; int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; int times = 0; String[] brokersSent = new String[timesTotal]; for (; times < timesTotal; times++) { String lastBrokerName = null == mq ? null : mq.getBrokerName(); //選擇topic中的中的某個MessageQueue(至關於kafka的partition) MessageQueue tmpmq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); if (tmpmq != null) { mq = tmpmq; brokersSent[times] = mq.getBrokerName(); try { beginTimestampPrev = System.currentTimeMillis(); //真正的發送數據方法 sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout); endTimestamp = System.currentTimeMillis(); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); switch (communicationMode) { case ASYNC: return null; case ONEWAY: return null; case SYNC: if (sendResult.getSendStatus() != SendStatus.SEND_OK) { if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) { continue; } } return sendResult; default: break; } } } }
本次重點關注消息體的發送
DefaultMQProducerImple.sendDefaultImpl->DefaultMQProducerImpl.sendKernelImpl
private SendResult sendKernelImpl(final Message msg, // final MessageQueue mq, // final CommunicationMode communicationMode, // final SendCallback sendCallback, // final TopicPublishInfo topicPublishInfo, // final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); if (null == brokerAddr) { tryToFindTopicPublishInfo(mq.getTopic()); brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); } SendMessageContext context = null; if (brokerAddr != null) { brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr); byte[] prevBody = msg.getBody(); try { //for MessageBatch,ID has been set in the generating process if (!(msg instanceof MessageBatch)) { MessageClientIDSetter.setUniqID(msg); } int sysFlag = 0; if (this.tryToCompressMessage(msg)) { sysFlag |= MessageSysFlag.COMPRESSED_FLAG; } final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); if (tranMsg != null && Boolean.parseBoolean(tranMsg)) { sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE; } if (hasCheckForbiddenHook()) { CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext(); checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr()); checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup()); checkForbiddenContext.setCommunicationMode(communicationMode); checkForbiddenContext.setBrokerAddr(brokerAddr); checkForbiddenContext.setMessage(msg); checkForbiddenContext.setMq(mq); checkForbiddenContext.setUnitMode(this.isUnitMode()); this.executeCheckForbiddenHook(checkForbiddenContext); } if (this.hasSendMessageHook()) { context = new SendMessageContext(); context.setProducer(this); context.setProducerGroup(this.defaultMQProducer.getProducerGroup()); context.setCommunicationMode(communicationMode); context.setBornHost(this.defaultMQProducer.getClientIP()); context.setBrokerAddr(brokerAddr); context.setMessage(msg); context.setMq(mq); String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); if (isTrans != null && isTrans.equals("true")) { context.setMsgType(MessageType.Trans_Msg_Half); } if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) { context.setMsgType(MessageType.Delay_Msg); } this.executeSendMessageHookBefore(context); } //構建請求頭部 SendMessageRequestHeader requestHeader = new SendMessageRequestHeader(); requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup()); requestHeader.setTopic(msg.getTopic()); requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey()); requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums()); requestHeader.setQueueId(mq.getQueueId()); requestHeader.setSysFlag(sysFlag); requestHeader.setBornTimestamp(System.currentTimeMillis()); requestHeader.setFlag(msg.getFlag()); requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties())); requestHeader.setReconsumeTimes(0); requestHeader.setUnitMode(this.isUnitMode()); requestHeader.setBatch(msg instanceof MessageBatch); if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { String reconsumeTimes = MessageAccessor.getReconsumeTime(msg); if (reconsumeTimes != null) { requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes)); MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME); } String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg); if (maxReconsumeTimes != null) { requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes)); MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES); } } SendResult sendResult = null; switch (communicationMode) { //異步模式發送 case ASYNC: sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(// brokerAddr, // 1 mq.getBrokerName(), // 2 msg, // 3 requestHeader, // 4 timeout, // 5 communicationMode, // 6 sendCallback, // 7 topicPublishInfo, // 8 this.mQClientFactory, // 9 this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(), // 10 context, // this); break; case ONEWAY: //同步模式發送 case SYNC: sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage( brokerAddr, mq.getBrokerName(), msg, requestHeader, timeout, communicationMode, context, this); break; default: assert false; break; } if (this.hasSendMessageHook()) { context.setSendResult(sendResult); this.executeSendMessageHookAfter(context); } return sendResult; } catch (RemotingException e) { if (this.hasSendMessageHook()) { context.setException(e); this.executeSendMessageHookAfter(context); } throw e; } catch (MQBrokerException e) { if (this.hasSendMessageHook()) { context.setException(e); this.executeSendMessageHookAfter(context); } throw e; } catch (InterruptedException e) { if (this.hasSendMessageHook()) { context.setException(e); this.executeSendMessageHookAfter(context); } throw e; } finally { msg.setBody(prevBody); } } throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null); }
後面就是底層的消息發送工具,整個RocketMQ共用
DefaultMQProducerImpl.sendKernelImpl->MQClientAPIImpl.sendMessage
public SendResult sendMessage(// final String addr, // 1 final String brokerName, // 2 final Message msg, // 3 final SendMessageRequestHeader requestHeader, // 4 final long timeoutMillis, // 5 final CommunicationMode communicationMode, // 6 final SendCallback sendCallback, // 7 final TopicPublishInfo topicPublishInfo, // 8 final MQClientInstance instance, // 9 final int retryTimesWhenSendFailed, // 10 final SendMessageContext context, // 11 final DefaultMQProducerImpl producer // 12 ) throws RemotingException, MQBrokerException, InterruptedException { RemotingCommand request = null; if (sendSmartMsg || msg instanceof MessageBatch) { SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader); request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2); } else { request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader); } request.setBody(msg.getBody()); switch (communicationMode) { case ONEWAY: this.remotingClient.invokeOneway(addr, request, timeoutMillis); return null; case ASYNC: final AtomicInteger times = new AtomicInteger(); this.sendMessageAsync(addr, brokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, context, producer); return null; case SYNC: return this.sendMessageSync(addr, brokerName, msg, timeoutMillis, request); default: assert false; break; } return null; }
MQClientAPIImpl.sendMessage->MQClientAPIImpl.sendMessageSync
委託給Netty
private SendResult sendMessageSync(// final String addr, // final String brokerName, // final Message msg, // final long timeoutMillis, // final RemotingCommand request// ) throws RemotingException, MQBrokerException, InterruptedException { RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis); assert response != null; return this.processSendResponse(brokerName, msg, response); }
MQClientAPIImpl.sendMessageSync->NettyRomotingClient.invokeSync
在調用先後觸發hook,真正的還在後面
@Override public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis) throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException { final Channel channel = this.getAndCreateChannel(addr); if (channel != null && channel.isActive()) { try { if (this.rpcHook != null) { this.rpcHook.doBeforeRequest(addr, request); } RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis); if (this.rpcHook != null) { this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response); } return response; } catch (RemotingSendRequestException e) { log.warn("invokeSync: send request exception, so close the channel[{}]", addr); this.closeChannel(addr, channel); throw e; } catch (RemotingTimeoutException e) { if (nettyClientConfig.isClientCloseSocketIfTimeout()) { this.closeChannel(addr, channel); log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr); } log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr); throw e; } } else { this.closeChannel(addr, channel); throw new RemotingConnectException(addr); } }
NettyRomotingClient.invokeSync->NettyRomotingClient.invokeSyncImpl
終於到了Netty真正發數據的時候了
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis) throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException { final int opaque = request.getOpaque(); try { final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, null, null); this.responseTable.put(opaque, responseFuture); final SocketAddress addr = channel.remoteAddress(); channel.writeAndFlush(request).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) throws Exception { if (f.isSuccess()) { responseFuture.setSendRequestOK(true); return; } else { responseFuture.setSendRequestOK(false); } responseTable.remove(opaque); responseFuture.setCause(f.cause()); responseFuture.putResponse(null); PLOG.warn("send a request command to channel <" + addr + "> failed."); } }); RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis); if (null == responseCommand) { if (responseFuture.isSendRequestOK()) { throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis, responseFuture.getCause()); } else { throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause()); } } return responseCommand; } finally { this.responseTable.remove(opaque); } }
private boolean orderTopic = false; private boolean haveTopicRouterInfo = false; //topic中MessgeQueuee信息 private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>(); //負責均衡策略的索引 private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex(); //topic包含的路由信息 private TopicRouteData topicRouteData;
1. 發送消息時查找topic中的發佈信息 2. 若是沒找到,經過NameServer去尋找 3. 經過Netty請求NameServer
DefaultMQProducerImpl
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) { TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic); if (null == topicPublishInfo || !topicPublishInfo.ok()) { this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo()); //從NameServer更新 this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); topicPublishInfo = this.topicPublishInfoTable.get(topic); } if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) { return topicPublishInfo; } else { this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer); topicPublishInfo = this.topicPublishInfoTable.get(topic); return topicPublishInfo; } }
MQClientInstance
public boolean updateTopicRouteInfoFromNameServer(final String topic) { return updateTopicRouteInfoFromNameServer(topic, false, null); }
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault, DefaultMQProducer defaultMQProducer) { try { if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { try { TopicRouteData topicRouteData; if (isDefault && defaultMQProducer != null) { topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(), 1000 * 3); if (topicRouteData != null) { for (QueueData data : topicRouteData.getQueueDatas()) { int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums()); data.setReadQueueNums(queueNums); data.setWriteQueueNums(queueNums); } } } else { topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3); } }
public TopicRouteData getDefaultTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException { GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader(); requestHeader.setTopic(topic); //構建向NameServer的消息體 RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader); //Netty 請求 RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis); assert response != null; switch (response.getCode()) { case ResponseCode.TOPIC_NOT_EXIST: { // TODO LOG break; } case ResponseCode.SUCCESS: { byte[] body = response.getBody(); if (body != null) { return TopicRouteData.decode(body, TopicRouteData.class); } } default: break; } throw new MQClientException(response.getCode(), response.getRemark()); }
異步:
異步與同步的區別,是無需等待,後續邏輯透過回調.
異步發送方法參數多了SendCallback參數,且沒有返回值
public void send(Message msg, SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException { this.defaultMQProducerImpl.send(msg, sendCallback); }
SendCallback是發送成功和失敗後的回調函數
public interface SendCallback { public void onSuccess(final SendResult sendResult); public void onException(final Throwable e); }
前面的和同步相似都跳過,直接看
private void sendMessageAsync(// final String addr, // final String brokerName, // final Message msg, // final long timeoutMillis, // final RemotingCommand request, // final SendCallback sendCallback, // final TopicPublishInfo topicPublishInfo, // final MQClientInstance instance, // final int retryTimesWhenSendFailed, // final AtomicInteger times, // final SendMessageContext context, // final DefaultMQProducerImpl producer // ) throws InterruptedException, RemotingException { this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() { @Override public void operationComplete(ResponseFuture responseFuture) { RemotingCommand response = responseFuture.getResponseCommand(); if (null == sendCallback && response != null) { try { SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response); if (context != null && sendResult != null) { context.setSendResult(sendResult); context.getProducer().executeSendMessageHookAfter(context); } } catch (Throwable e) { // } producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false); return; } if (response != null) { try { SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response); assert sendResult != null; if (context != null) { context.setSendResult(sendResult); context.getProducer().executeSendMessageHookAfter(context); } // 上面重複了,sendCallBack的判斷應該放在這 try { // 異步成功回調點 sendCallback.onSuccess(sendResult); } catch (Throwable e) { } producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false); } catch (Exception e) { producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true); //異常回調點 onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, e, context, false, producer); } } else { producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true); if (!responseFuture.isSendRequestOK()) { MQClientException ex = new MQClientException("send request failed", responseFuture.getCause()); onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, ex, context, true, producer); } else if (responseFuture.isTimeout()) { MQClientException ex = new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms", responseFuture.getCause()); onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, ex, context, true, producer); } else { MQClientException ex = new MQClientException("unknow reseaon", responseFuture.getCause()); onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, ex, context, true, producer); } } } }); }
@Override public void invokeAsync(String addr, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback) throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { final Channel channel = this.getAndCreateChannel(addr); if (channel != null && channel.isActive()) { try { if (this.rpcHook != null) { this.rpcHook.doBeforeRequest(addr, request); } this.invokeAsyncImpl(channel, request, timeoutMillis, invokeCallback); } catch (RemotingSendRequestException e) { log.warn("invokeAsync: send request exception, so close the channel[{}]", addr); this.closeChannel(addr, channel); throw e; } } else { this.closeChannel(addr, channel); throw new RemotingConnectException(addr); } }
public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis, final InvokeCallback invokeCallback) throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { final int opaque = request.getOpaque(); boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS); if (acquired) { final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync); final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, invokeCallback, once); this.responseTable.put(opaque, responseFuture); try { channel.writeAndFlush(request).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) throws Exception { if (f.isSuccess()) { responseFuture.setSendRequestOK(true); return; } else { responseFuture.setSendRequestOK(false); } responseFuture.putResponse(null); responseTable.remove(opaque); try { //執行回調 executeInvokeCallback(responseFuture); } catch (Throwable e) { PLOG.warn("excute callback in writeAndFlush addListener, and callback throw", e); } finally { responseFuture.release(); } PLOG.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel)); } }); } catch (Exception e) { responseFuture.release(); PLOG.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e); throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e); } } else { if (timeoutMillis <= 0) { throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast"); } else { String info = String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d", // timeoutMillis, // this.semaphoreAsync.getQueueLength(), // this.semaphoreAsync.availablePermits()// ); PLOG.warn(info); throw new RemotingTimeoutException(info); } } }
JMS系統有P2P和Pub/Sub兩種模式,一條消息只能被消費一次時即爲P2P,RocketMQ的廣播模式即爲了Pub/Sub模式.
順序消息:
消費消息的順序要同發送消息的順序一致,在 RocketMQ 中,主要指的是局部順序,即一類消息爲知足順序性,必須 Producer 單線程順序發送,且發送到同一個隊列,這樣 Consumer 就能夠按照 Producer 發送的順序去消費消息。
普通順序消息:
順序消息的一種,正常狀況下能夠保證徹底的順序消息,可是一旦發生通訊異常,Broker 重啓,因爲隊列總數發生變化,哈希取模後定位的隊列會變化,產生短暫的消息順序不一致。 若是業務能容忍在集羣異常狀況(如某個 Broker 宕機或者重啓)下,消息短暫的亂序,使用普通順序方式比較合適。
嚴格順序消息:
順序消息的一種,不管正常異常狀況都能保證順序,可是犧牲了分佈式 Failover 特性,即 Broker 集羣中只 要有一臺機器不可用,則整個集羣都不可用,服務可用性大大下降。 若是服務器部署爲同步雙寫模式,此缺陷可經過備機自動切換爲主避免,不過仍然會存在幾分鐘的服務不可用。(依賴同步雙寫,主備自動切換,自動切換功能目前還未實現)目前已知的應用只有數據庫binlog同步強依賴嚴格順序消息,其餘應用絕大部分均可以容忍短暫亂序,推薦使用普通的順序消息。
要實現嚴格的順序消息,簡單且可行的辦法就是:
保證生產者 - MQServer - 消費者是一對一對一的關係
關於順序消息建議閱讀分佈式開放消息系統(RocketMQ)的原理與實踐
放在Producer就是要保證須要嚴格順序消息就是把同類型的消息發送到同一MessageQueue,經過實現特定的MessageQueueSelector,能夠定製跟消息體相關的MessageQueue
public SendResult send(Message msg, MessageQueueSelector selector, Object arg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return send(msg, selector, arg, this.defaultMQProducer.getSendMsgTimeout()); }
public interface MessageQueueSelector { MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg); }
同步發送消息中,有這樣一個方法來選定MessageQueue
MessageQueue tmpmq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName); }
保證低延遲
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { //若是要保證最低發送延遲走此邏輯 //建議:這裏改爲策略模式 if (this.sendLatencyFaultEnable) { try { int index = tpInfo.getSendWhichQueue().getAndIncrement(); //輪詢看能不能有低延遲的MessageQueue for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) { int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size(); if (pos < 0) pos = 0; MessageQueue mq = tpInfo.getMessageQueueList().get(pos); if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) { if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName)) return mq; } } //若是沒找到就隨機找一個 final String notBestBroker = latencyFaultTolerance.pickOneAtLeast(); int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker); if (writeQueueNums > 0) { final MessageQueue mq = tpInfo.selectOneMessageQueue(); if (notBestBroker != null) { mq.setBrokerName(notBestBroker); mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums); } return mq; } else { latencyFaultTolerance.remove(notBestBroker); } } catch (Exception e) { log.error("Error occurred when selecting message queue", e); } return tpInfo.selectOneMessageQueue(); } //否則走此邏輯,輪詢模式 return tpInfo.selectOneMessageQueue(lastBrokerName); }
選最低延遲的broker,根據FaultItem的可用性和開始時間選擇
@Override public String pickOneAtLeast() { final Enumeration<FaultItem> elements = this.faultItemTable.elements(); List<FaultItem> tmpList = new LinkedList<FaultItem>(); while (elements.hasMoreElements()) { final FaultItem faultItem = elements.nextElement(); tmpList.add(faultItem); } if (!tmpList.isEmpty()) { @Override public String pickOneAtLeast() { final Enumeration<FaultItem> elements = this.faultItemTable.elements(); List<FaultItem> tmpList = new LinkedList<FaultItem>(); while (elements.hasMoreElements()) { final FaultItem faultItem = elements.nextElement(); tmpList.add(faultItem); } if (!tmpList.isEmpty()) { Collections.shuffle(tmpList); //按延遲排序 Collections.sort(tmpList); final int half = tmpList.size() / 2; if (half <= 0) { return tmpList.get(0).getName(); } else { //從最爛的後一個開始選 final int i = this.whichItemWorst.getAndIncrement() % half; return tmpList.get(i).getName(); } } return null; }
FaultItem排序策略
@Override public int compareTo(final FaultItem other) { if (this.isAvailable() != other.isAvailable()) { if (this.isAvailable()) return -1; if (other.isAvailable()) return 1; } if (this.currentLatency < other.currentLatency) return -1; else if (this.currentLatency > other.currentLatency) { return 1; } if (this.startTimestamp < other.startTimestamp) return -1; else if (this.startTimestamp > other.startTimestamp) { return 1; } return 0; }
輪詢模式:
輪詢取不等於lastBrokerName的一個MessageQueue
public MessageQueue selectOneMessageQueue(final String lastBrokerName) { if (lastBrokerName == null) { return selectOneMessageQueue(); } else { int index = this.sendWhichQueue.getAndIncrement(); for (int i = 0; i < this.messageQueueList.size(); i++) { int pos = Math.abs(index++) % this.messageQueueList.size(); if (pos < 0) pos = 0; MessageQueue mq = this.messageQueueList.get(pos); if (!mq.getBrokerName().equals(lastBrokerName)) { return mq; } } return selectOneMessageQueue(); } }
沒有lastBrokerName,就輪詢取吧
public MessageQueue selectOneMessageQueue() { int index = this.sendWhichQueue.getAndIncrement(); int pos = Math.abs(index) % this.messageQueueList.size(); if (pos < 0) pos = 0; return this.messageQueueList.get(pos); }
一類 Producer 的集合名稱,這類Producer一般發送一類消息,且發送邏輯一致。
優先級:
每條消息都有不一樣的優先級,通常用整數來描述,優先級高的消 息先投遞,若是消息徹底在一個內存隊列中,那麼在投遞前能夠按照優先級排序,令優先級高的先投遞。因爲RocketMQ全部消息都是持久化的,因此若是按照優先級來排序,開銷會很是大,所以RocketMQ沒有特地支持消息優先級,可是能夠經過變通的方式實現相似功能,即單獨配置一個優先級高的隊列,和一個普通優先級 的隊列, 將不一樣優先級發送到不一樣隊列便可。
對於優先級問題,能夠概括爲 2 類
只要達到優先級目的便可,不是嚴格意義上的優先級,一般將優先級劃分爲高、中、低,或者再多幾個級別。每一個優先級能夠用不一樣的topic表示,發消息時,指定不一樣的topic來表示優先級,這種方式能夠解決絕大部分的優先級問題,可是對業務的優先級精確性作了妥協。
嚴格的優先級,優先級用整數表示,例如0~65535,這種優先級問題通常使用不一樣 topic 解決就很是不合適。若是要讓 MQ 解決此問題,會對 MQ 的性能形成很是大的影響。這裏要確保一點,業務上是否確實須要這種嚴格的優先級,若是將優先級壓縮成幾個,對業務的影響有多大?
邏輯:
1. 發送prepare消息 2. 執行生產者業務邏輯(業務邏輯代碼需實現LocalTransactionExecuter) 3. 結束事務消息(commit,rollback,notType)
TransactionMQProducer.sendMessageInTransaction->DefaultMQProducerImpl.sendMessageInTransaction
public TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException { if (null == tranExecuter) { throw new MQClientException("tranExecutor is null", null); } Validators.checkMessage(msg, this.defaultMQProducer); SendResult sendResult = null; // 設置事務消息屬性 MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true"); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup()); try { 1. 發送消息,同普通消息同樣 sendResult = this.send(msg); } catch (Exception e) { throw new MQClientException("send message Exception", e); } LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW; Throwable localException = null; switch (sendResult.getSendStatus()) { case SEND_OK: { try { if (sendResult.getTransactionId() != null) { msg.putUserProperty("__transactionId__", sendResult.getTransactionId()); } //執行生產者業務邏輯 localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg); if (null == localTransactionState) { localTransactionState = LocalTransactionState.UNKNOW; } if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) { log.info("executeLocalTransactionBranch return {}", localTransactionState); log.info(msg.toString()); } } catch (Throwable e) { log.info("executeLocalTransactionBranch exception", e); log.info(msg.toString()); localException = e; } } break; case FLUSH_DISK_TIMEOUT: case FLUSH_SLAVE_TIMEOUT: //若是1就發送失敗,就回滾 case SLAVE_NOT_AVAILABLE: localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE; break; default: break; } try { //結束事務狀態,by本地業務邏輯執行狀況 this.endTransaction(sendResult, localTransactionState, localException); } catch (Exception e) { log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e); } TransactionSendResult transactionSendResult = new TransactionSendResult(); transactionSendResult.setSendStatus(sendResult.getSendStatus()); transactionSendResult.setMessageQueue(sendResult.getMessageQueue()); transactionSendResult.setMsgId(sendResult.getMsgId()); transactionSendResult.setQueueOffset(sendResult.getQueueOffset()); transactionSendResult.setTransactionId(sendResult.getTransactionId()); transactionSendResult.setLocalTransactionState(localTransactionState); return transactionSendResult; }
public void endTransaction(// final SendResult sendResult, // final LocalTransactionState localTransactionState, // final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException { final MessageId id; if (sendResult.getOffsetMsgId() != null) { id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId()); } else { id = MessageDecoder.decodeMessageId(sendResult.getMsgId()); } String transactionId = sendResult.getTransactionId(); final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName()); EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader(); requestHeader.setTransactionId(transactionId); requestHeader.setCommitLogOffset(id.getOffset()); switch (localTransactionState) { //若是本地執行成功就提交 case COMMIT_MESSAGE: requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE); break; //若是本地失敗,就回滾 case ROLLBACK_MESSAGE: requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE); break; //若是本地未知,那就打死未知 case UNKNOW: requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE); break; default: break; } requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup()); requestHeader.setTranStateTableOffset(sendResult.getQueueOffset()); requestHeader.setMsgId(sendResult.getMsgId()); String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null; this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark, this.defaultMQProducer.getSendMsgTimeout()); }
特殊狀況:若是事務結束消息發送失敗,該怎麼辦?
broker會檢查若是是prepared狀態,則會向Producer發起CheckTransaction請求.
定時消息:
指消息發到 Broker 後,不能馬上被 Consumer 消費,要到特定的時間點或者等待特定的時間後才能 被消費。
若是要支持任意的時間精度,在Broker層面,必需要作消息排序,若是再涉及到持久化,那麼消息排序要不 可避免的產生巨大性能開銷。
RocketMQ 支持定時消息,可是不支持任意時間精度,支持特定的 level,例如定時 5s,10s,1m 等。
實現:
在消息體中設置Message延遲級別,剩下的就交給Broker實現吧
public void setDelayTimeLevel(int level) { this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level)); }