本文主要研究一下rocketmq的LatencyFaultTolerancejava
rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/latency/LatencyFaultTolerance.javagit
public interface LatencyFaultTolerance<T> { void updateFaultItem(final T name, final long currentLatency, final long notAvailableDuration); boolean isAvailable(final T name); void remove(final T name); T pickOneAtLeast(); }
rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.javagithub
public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String> { private final ConcurrentHashMap<String, FaultItem> faultItemTable = new ConcurrentHashMap<String, FaultItem>(16); private final ThreadLocalIndex whichItemWorst = new ThreadLocalIndex(); @Override public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) { FaultItem old = this.faultItemTable.get(name); if (null == old) { final FaultItem faultItem = new FaultItem(name); faultItem.setCurrentLatency(currentLatency); faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); old = this.faultItemTable.putIfAbsent(name, faultItem); if (old != null) { old.setCurrentLatency(currentLatency); old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); } } else { old.setCurrentLatency(currentLatency); old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); } } @Override public boolean isAvailable(final String name) { final FaultItem faultItem = this.faultItemTable.get(name); if (faultItem != null) { return faultItem.isAvailable(); } return true; } @Override public void remove(final String name) { this.faultItemTable.remove(name); } @Override public String pickOneAtLeast() { final Enumeration<FaultItem> elements = this.faultItemTable.elements(); List<FaultItem> tmpList = new LinkedList<FaultItem>(); while (elements.hasMoreElements()) { final FaultItem faultItem = elements.nextElement(); tmpList.add(faultItem); } if (!tmpList.isEmpty()) { Collections.shuffle(tmpList); Collections.sort(tmpList); final int half = tmpList.size() / 2; if (half <= 0) { return tmpList.get(0).getName(); } else { final int i = this.whichItemWorst.getAndIncrement() % half; return tmpList.get(i).getName(); } } return null; } @Override public String toString() { return "LatencyFaultToleranceImpl{" + "faultItemTable=" + faultItemTable + ", whichItemWorst=" + whichItemWorst + '}'; } //...... }
tmpList.size() / 2
),若half小於等於0則返回tmpList.get(0).getName(),不然取tmpList.get(i).getName(),其中i由whichItemWorst.getAndIncrement() % half
計算而來rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.javaapache
class FaultItem implements Comparable<FaultItem> { private final String name; private volatile long currentLatency; private volatile long startTimestamp; public FaultItem(final String name) { this.name = name; } @Override public int compareTo(final FaultItem other) { if (this.isAvailable() != other.isAvailable()) { if (this.isAvailable()) return -1; if (other.isAvailable()) return 1; } if (this.currentLatency < other.currentLatency) return -1; else if (this.currentLatency > other.currentLatency) { return 1; } if (this.startTimestamp < other.startTimestamp) return -1; else if (this.startTimestamp > other.startTimestamp) { return 1; } return 0; } public boolean isAvailable() { return (System.currentTimeMillis() - startTimestamp) >= 0; } @Override public int hashCode() { int result = getName() != null ? getName().hashCode() : 0; result = 31 * result + (int) (getCurrentLatency() ^ (getCurrentLatency() >>> 32)); result = 31 * result + (int) (getStartTimestamp() ^ (getStartTimestamp() >>> 32)); return result; } @Override public boolean equals(final Object o) { if (this == o) return true; if (!(o instanceof FaultItem)) return false; final FaultItem faultItem = (FaultItem) o; if (getCurrentLatency() != faultItem.getCurrentLatency()) return false; if (getStartTimestamp() != faultItem.getStartTimestamp()) return false; return getName() != null ? getName().equals(faultItem.getName()) : faultItem.getName() == null; } @Override public String toString() { return "FaultItem{" + "name='" + name + '\'' + ", currentLatency=" + currentLatency + ", startTimestamp=" + startTimestamp + '}'; } public String getName() { return name; } public long getCurrentLatency() { return currentLatency; } public void setCurrentLatency(final long currentLatency) { this.currentLatency = currentLatency; } public long getStartTimestamp() { return startTimestamp; } public void setStartTimestamp(final long startTimestamp) { this.startTimestamp = startTimestamp; } }
(System.currentTimeMillis() - startTimestamp) >= 0
;其compareTo方法依次根據isAvailable()、currentLatency、startTimestamp來排序rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/common/ThreadLocalIndex.javadom
public class ThreadLocalIndex { private final ThreadLocal<Integer> threadLocalIndex = new ThreadLocal<Integer>(); private final Random random = new Random(); public int getAndIncrement() { Integer index = this.threadLocalIndex.get(); if (null == index) { index = Math.abs(random.nextInt()); if (index < 0) index = 0; this.threadLocalIndex.set(index); } index = Math.abs(index + 1); if (index < 0) index = 0; this.threadLocalIndex.set(index); return index; } @Override public String toString() { return "ThreadLocalIndex{" + "threadLocalIndex=" + threadLocalIndex.get() + '}'; } }
tmpList.size() / 2
),若half小於等於0則返回tmpList.get(0).getName(),不然取tmpList.get(i).getName(),其中i由whichItemWorst.getAndIncrement() % half
計算而來