本文主要研究一下sentinel的FlowSlotjava
com/alibaba/csp/sentinel/slots/block/flow/FlowSlot.javanode
public class FlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> { @Override public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args) throws Throwable { FlowRuleManager.checkFlow(resourceWrapper, context, node, count); fireEntry(context, resourceWrapper, node, count, args); } @Override public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) { fireExit(context, resourceWrapper, count, args); } }
com/alibaba/csp/sentinel/slots/block/flow/FlowRuleManager.javagit
public static void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count) throws BlockException { List<FlowRule> rules = flowRules.get(resource.getName()); if (rules != null) { for (FlowRule rule : rules) { if (!rule.passCheck(context, node, count)) { throw new FlowException(rule.getLimitApp()); } } } }
com/alibaba/csp/sentinel/slots/block/flow/FlowRule.javagithub
/*** * <p> * Each flow rule is mainly composed of three factors: <strong>grade</strong>, * <strong>strategy</strong> and <strong>controlBehavior</strong>. * </p> * <ul> * <li>The {@link #grade} represents the threshold type of flow control (by QPS or thread count).</li> * <li>The {@link #strategy} represents the strategy based on invocation relation.</li> * <li>The {@link #controlBehavior} represents the QPS shaping behavior (actions on incoming request when QPS * exceeds the threshold).</li> * </ul> * * @author jialiang.linjl * @author Eric Zhao */ public class FlowRule extends AbstractRule { public static final String LIMIT_APP_DEFAULT = "default"; public static final String LIMIT_APP_OTHER = "other"; public FlowRule(){ super(); setLimitApp(LIMIT_APP_DEFAULT); } /** * The threshold type of flow control (0: thread count, 1: QPS). */ private int grade = RuleConstant.FLOW_GRADE_QPS; /** * Flow control threshold count. */ private double count; /** * Flow control strategy based on invocation chain. * * {@link RuleConstant#STRATEGY_DIRECT} for direct flow control (by origin); * {@link RuleConstant#STRATEGY_RELATE} for relevant flow control (with relevant resource); * {@link RuleConstant#STRATEGY_CHAIN} for chain flow control (by entrance resource). */ private int strategy = RuleConstant.STRATEGY_DIRECT; /** * Reference resource in flow control with relevant resource. */ private String refResource; /** * Rate limiter control behavior. * 0. default, 1. warm up, 2. rate limiter */ private int controlBehavior = RuleConstant.CONTROL_BEHAVIOR_DEFAULT; private int warmUpPeriodSec = 10; /** * Max queueing time in rate limiter behavior. */ private int maxQueueingTimeMs = 500; private Controller controller; //...... @Override public boolean passCheck(Context context, DefaultNode node, int acquireCount, Object... args) { String limitApp = this.getLimitApp(); if (limitApp == null) { return true; } String origin = context.getOrigin(); Node selectedNode = selectNodeByRequesterAndStrategy(origin, context, node); if (selectedNode == null) { return true; } return controller.canPass(selectedNode, acquireCount); } private Node selectNodeByRequesterAndStrategy(String origin, Context context, DefaultNode node) { // The limit app should not be empty. String limitApp = this.getLimitApp(); if (limitApp.equals(origin)) { if (strategy == RuleConstant.STRATEGY_DIRECT) { return context.getOriginNode(); } String refResource = this.getRefResource(); if (StringUtil.isEmpty(refResource)) { return null; } if (strategy == RuleConstant.STRATEGY_RELATE) { return ClusterBuilderSlot.getClusterNode(refResource); } if (strategy == RuleConstant.STRATEGY_CHAIN) { if (!refResource.equals(context.getName())) { return null; } return node; } } else if (LIMIT_APP_DEFAULT.equals(limitApp)) { if (strategy == RuleConstant.STRATEGY_DIRECT) { return node.getClusterNode(); } String refResource = this.getRefResource(); if (StringUtil.isEmpty(refResource)) { return null; } if (strategy == RuleConstant.STRATEGY_RELATE) { return ClusterBuilderSlot.getClusterNode(refResource); } if (strategy == RuleConstant.STRATEGY_CHAIN) { if (!refResource.equals(context.getName())) { return null; } return node; } } else if (LIMIT_APP_OTHER.equals(limitApp) && FlowRuleManager.isOtherOrigin(origin, getResource())) { if (strategy == RuleConstant.STRATEGY_DIRECT) { return context.getOriginNode(); } String refResource = this.getRefResource(); if (StringUtil.isEmpty(refResource)) { return null; } if (strategy == RuleConstant.STRATEGY_RELATE) { return ClusterBuilderSlot.getClusterNode(refResource); } if (strategy == RuleConstant.STRATEGY_CHAIN) { if (!refResource.equals(context.getName())) { return null; } if (node != null) { return node; } } } return null; } //...... }
com/alibaba/csp/sentinel/slots/block/flow/controller/DefaultController.javaapi
public class DefaultController implements Controller { double count = 0; int grade = 0; public DefaultController(double count, int grade) { super(); this.count = count; this.grade = grade; } @Override public boolean canPass(Node node, int acquireCount) { int curCount = avgUsedTokens(node); if (curCount + acquireCount > count) { return false; } return true; } private int avgUsedTokens(Node node) { if (node == null) { return -1; } return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)node.passQps(); } }