本文主要研究一下rocketmq的MQFaultStrategy
rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/latency/MQFaultStrategy.java
public class MQFaultStrategy { private final static InternalLogger log = ClientLogger.getLog(); private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl(); private boolean sendLatencyFaultEnable = false; private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L}; private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L}; public long[] getNotAvailableDuration() { return notAvailableDuration; } public void setNotAvailableDuration(final long[] notAvailableDuration) { this.notAvailableDuration = notAvailableDuration; } public long[] getLatencyMax() { return latencyMax; } public void setLatencyMax(final long[] latencyMax) { this.latencyMax = latencyMax; } public boolean isSendLatencyFaultEnable() { return sendLatencyFaultEnable; } public void setSendLatencyFaultEnable(final boolean sendLatencyFaultEnable) { this.sendLatencyFaultEnable = sendLatencyFaultEnable; } public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { if (this.sendLatencyFaultEnable) { try { int index = tpInfo.getSendWhichQueue().getAndIncrement(); for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) { int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size(); if (pos < 0) pos = 0; MessageQueue mq = tpInfo.getMessageQueueList().get(pos); if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) { if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName)) return mq; } } final String notBestBroker = latencyFaultTolerance.pickOneAtLeast(); int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker); if (writeQueueNums > 0) { final MessageQueue mq = tpInfo.selectOneMessageQueue(); if (notBestBroker != null) { mq.setBrokerName(notBestBroker); mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums); } return mq; } else { latencyFaultTolerance.remove(notBestBroker); } } catch (Exception e) { log.error("Error occurred when 
selecting message queue", e); } return tpInfo.selectOneMessageQueue(); } return tpInfo.selectOneMessageQueue(lastBrokerName); } public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) { if (this.sendLatencyFaultEnable) { long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency); this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration); } } private long computeNotAvailableDuration(final long currentLatency) { for (int i = latencyMax.length - 1; i >= 0; i--) { if (currentLatency >= latencyMax[i]) return this.notAvailableDuration[i]; } return 0; } }
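selectOneMessageQueue方法在sendLatencyFaultEnable爲false時直接返回tpInfo.selectOneMessageQueue(lastBrokerName);在sendLatencyFaultEnable爲true時,會遍歷tpInfo.getMessageQueueList(),計算pos(Math.abs(index++) % tpInfo.getMessageQueueList().size()),若小於0則重置爲0;而後使用latencyFaultTolerance.isAvailable來判斷是否可用,若可用且null == lastBrokerName或者mq.getBrokerName().equals(lastBrokerName)則返回該MessageQueue
若遍歷後沒有返回,則使用latencyFaultTolerance.pickOneAtLeast()選出notBestBroker,並使用tpInfo.getQueueIdByBroker(notBestBroker)取得writeQueueNums;若writeQueueNums大於0則使用tpInfo.selectOneMessageQueue()選出mq,在notBestBroker不爲null時設置其brokerName爲notBestBroker、queueId爲(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);若writeQueueNums小於等於0則執行latencyFaultTolerance.remove(notBestBroker);若是前面沒有選出MessageQueue則最後使用tpInfo.selectOneMessageQueue()
updateFaultItem方法在sendLatencyFaultEnable爲true時,使用computeNotAvailableDuration計算duration,而後執行latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);computeNotAvailableDuration方法從latencyMax數組的末尾往前遍歷,當currentLatency >= latencyMax[i]時返回notAvailableDuration[i],不然最後返回0
rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java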
public class DefaultMQProducerImpl implements MQProducerInner { private final InternalLogger log = ClientLogger.getLog(); private final Random random = new Random(); private final DefaultMQProducer defaultMQProducer; private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable = new ConcurrentHashMap<String, TopicPublishInfo>(); private final ArrayList<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>(); private final RPCHook rpcHook; private final BlockingQueue<Runnable> asyncSenderThreadPoolQueue; private final ExecutorService defaultAsyncSenderExecutor; private final Timer timer = new Timer("RequestHouseKeepingService", true); protected BlockingQueue<Runnable> checkRequestQueue; protected ExecutorService checkExecutor; private ServiceState serviceState = ServiceState.CREATE_JUST; private MQClientInstance mQClientFactory; private ArrayList<CheckForbiddenHook> checkForbiddenHookList = new ArrayList<CheckForbiddenHook>(); private int zipCompressLevel = Integer.parseInt(System.getProperty(MixAll.MESSAGE_COMPRESS_LEVEL, "5")); private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy(); private ExecutorService asyncSenderExecutor; //...... public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName); } public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) { this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation); } //...... }
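DefaultMQProducerImpl的selectOneMessageQueue及updateFaultItem方法均委託給mqFaultStrategy
selectOneMessageQueue方法在sendLatencyFaultEnable爲false時直接返回tpInfo.selectOneMessageQueue(lastBrokerName);在sendLatencyFaultEnable爲true時,會遍歷tpInfo.getMessageQueueList(),計算pos(Math.abs(index++) % tpInfo.getMessageQueueList().size()),若小於0則重置爲0;而後使用latencyFaultTolerance.isAvailable來判斷是否可用,若可用且null == lastBrokerName或者mq.getBrokerName().equals(lastBrokerName)則返回該MessageQueue
若遍歷後沒有返回,則使用latencyFaultTolerance.pickOneAtLeast()選出notBestBroker,並使用tpInfo.getQueueIdByBroker(notBestBroker)取得writeQueueNums;若writeQueueNums大於0則使用tpInfo.selectOneMessageQueue()選出mq,在notBestBroker不爲null時設置其brokerName爲notBestBroker、queueId爲(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);若writeQueueNums小於等於0則執行latencyFaultTolerance.remove(notBestBroker);若是前面沒有選出MessageQueue則最後使用tpInfo.selectOneMessageQueue()
updateFaultItem方法在sendLatencyFaultEnable爲true時,使用computeNotAvailableDuration計算duration,而後執行latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);computeNotAvailableDuration方法從latencyMax數組的末尾往前遍歷,當currentLatency >= latencyMax[i]時返回notAvailableDuration[i],不然最後返回0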