本文主要研究一下sentinel的StatisticSlotjava
com/alibaba/csp/sentinel/slots/statistic/StatisticSlot.javanode
public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> { @Override public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args) throws Throwable { try { fireEntry(context, resourceWrapper, node, count, args); node.increaseThreadNum(); node.addPassRequest(); if (context.getCurEntry().getOriginNode() != null) { context.getCurEntry().getOriginNode().increaseThreadNum(); context.getCurEntry().getOriginNode().addPassRequest(); } if (resourceWrapper.getType() == EntryType.IN) { Constants.ENTRY_NODE.increaseThreadNum(); Constants.ENTRY_NODE.addPassRequest(); } } catch (BlockException e) { context.getCurEntry().setError(e); // Add block count. node.increaseBlockedQps(); if (context.getCurEntry().getOriginNode() != null) { context.getCurEntry().getOriginNode().increaseBlockedQps(); } if (resourceWrapper.getType() == EntryType.IN) { Constants.ENTRY_NODE.increaseBlockedQps(); } throw e; } catch (Throwable e) { context.getCurEntry().setError(e); // Should not happen node.increaseExceptionQps(); if (context.getCurEntry().getOriginNode() != null) { context.getCurEntry().getOriginNode().increaseExceptionQps(); } if (resourceWrapper.getType() == EntryType.IN) { Constants.ENTRY_NODE.increaseExceptionQps(); } throw e; } } @Override public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) { DefaultNode node = (DefaultNode)context.getCurNode(); if (context.getCurEntry().getError() == null) { long rt = TimeUtil.currentTimeMillis() - context.getCurEntry().getCreateTime(); if (rt > Constants.TIME_DROP_VALVE) { rt = Constants.TIME_DROP_VALVE; } node.rt(rt); if (context.getCurEntry().getOriginNode() != null) { context.getCurEntry().getOriginNode().rt(rt); } node.decreaseThreadNum(); if (context.getCurEntry().getOriginNode() != null) { context.getCurEntry().getOriginNode().decreaseThreadNum(); } if (resourceWrapper.getType() == EntryType.IN) { Constants.ENTRY_NODE.rt(rt); Constants.ENTRY_NODE.decreaseThreadNum(); } } else { // error may happen // node.rt(-2); } fireExit(context, resourceWrapper, count); } }
com/alibaba/csp/sentinel/node/StatisticNode.javagit
public class StatisticNode implements Node { private transient Metric rollingCounterInSecond = new ArrayMetric(1000 / SampleCountProperty.sampleCount, IntervalProperty.INTERVAL); private transient Metric rollingCounterInMinute = new ArrayMetric(1000, 2 * 60); private AtomicInteger curThreadNum = new AtomicInteger(0); private long lastFetchTime = -1; //...... @Override public int curThreadNum() { return curThreadNum.get(); } @Override public void addPassRequest() { rollingCounterInSecond.addPass(); rollingCounterInMinute.addPass(); } @Override public void increaseBlockedQps() { rollingCounterInSecond.addBlock(); rollingCounterInMinute.addBlock(); } @Override public void increaseExceptionQps() { rollingCounterInSecond.addException(); rollingCounterInMinute.addException(); } @Override public void increaseThreadNum() { curThreadNum.incrementAndGet(); } @Override public void decreaseThreadNum() { curThreadNum.decrementAndGet(); } }
com/alibaba/csp/sentinel/slots/block/flow/controller/DefaultController.javagithub
public class DefaultController implements Controller { double count = 0; int grade = 0; public DefaultController(double count, int grade) { super(); this.count = count; this.grade = grade; } @Override public boolean canPass(Node node, int acquireCount) { int curCount = avgUsedTokens(node); if (curCount + acquireCount > count) { return false; } return true; } private int avgUsedTokens(Node node) { if (node == null) { return -1; } return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)node.passQps(); } }
com/alibaba/csp/sentinel/slots/DefaultSlotsChainBuilder.javaapp
public class DefaultSlotsChainBuilder implements SlotsChainBuilder { @Override public ProcessorSlotChain build() { ProcessorSlotChain chain = new DefaultProcessorSlotChain(); chain.addLast(new NodeSelectorSlot()); chain.addLast(new ClusterBuilderSlot()); chain.addLast(new LogSlot()); chain.addLast(new StatisticSlot()); chain.addLast(new SystemSlot()); chain.addLast(new AuthoritySlot()); chain.addLast(new FlowSlot()); chain.addLast(new DegradeSlot()); return chain; } }
StatisticSlot雖然在SystemSlot、SystemSlot、AuthoritySlot、FlowSlot前面,可是其遞增統計數則是fireEntry動做以後進行的,至關於aop攔截的after,也能夠將Node的curThreadNum理解爲經過限流條件以後的請求線程數ide