Sentinel核心流程源码解读
Sentinel整体架构如下
图里从下往上可以看到,核心的部分包含 规则(rules)
、 处理插槽(slot)
、 调用链路(invocation tree)
、 集群节点(cluster node)
、 滑动窗口(slading winodw)
5部分。
Sentinel代码使用 tag 1.8.6 ,对应github链接 https://github.com/alibaba/Sentinel/tree/1.8.6
(1) 核心流程源码解读
核心源码包含以下几个部分
1.规则(rules)-限流规则/熔断规则
2.构建功能插槽(solt)责任链
3.调用链路(invocation tree)
4.集群节点(cluster node)
5.滑动窗口(slading winodw)
public class SimpleDemo {
public static void main(String[] args) {
// 配置规则.
initFlowRules();
//while (true) {
// 1.5.0 版本开始可以直接利用 try-with-resources 特性
try (Entry entry = SphU.entry("HelloWorld")) {
// 被保护的逻辑
System.out.println("hello world");
} catch (BlockException ex) {
// 处理被流控的逻辑
System.out.println("blocked!");
}
//}
}
private static void initFlowRules() {
List<FlowRule> rules = new ArrayList<>();
FlowRule rule = new FlowRule();
rule.setResource("HelloWorld");
rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
// Set limit QPS to 2.
rule.setCount(2);
rules.add(rule);
//
FlowRuleManager.loadRules(rules);
}
}
(2) 加载限流规则
资源对应限流规则
List<FlowRule> rules = new ArrayList<>();
FlowRule rule = new FlowRule();
rule.setResource("HelloWorld");
rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
// Set limit QPS to 2.
rule.setCount(2);
rules.add(rule);
//
FlowRuleManager.loadRules(rules);
(3) 构建功能插槽(ProcessSolt)责任链
这块对应的代码是 SlotChainProvider.newSlotChain();
在构建功能插槽的时候使用责任链设计模式和使用SPI提高扩展性
构建完的责任链类似这种
整体上的代码如下
package com.alibaba.csp.sentinel.slotchain;
/**
* A provider for creating slot chains via resolved slot chain builder SPI.
*/
public final class SlotChainProvider {
private static volatile SlotChainBuilder slotChainBuilder = null;
/**
* The load and pick process is not thread-safe, but it's okay since the method should be only invoked
* via {@code lookProcessChain} in {@link com.alibaba.csp.sentinel.CtSph} under lock.
*
* @return new created slot chain
*/
public static ProcessorSlotChain newSlotChain() {
if (slotChainBuilder != null) {
return slotChainBuilder.build();
}
// 使用SPI构建插槽实例
// 这块通过SPI读取配置文件加载类
// 读取的配置文件 META-INF/services/com.alibaba.csp.sentinel.slotchain.SlotChainBuilder
// slotChainBuilder=com.alibaba.csp.sentinel.slots.DefaultSlotChainBuilder
// Resolve the slot chain builder SPI.
slotChainBuilder = SpiLoader.of(SlotChainBuilder.class).loadFirstInstanceOrDefault();
if (slotChainBuilder == null) {
// 不应该走到这儿 走到这儿肯定有问题
RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default");
slotChainBuilder = new DefaultSlotChainBuilder();
} else {
RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: {}",
slotChainBuilder.getClass().getCanonicalName());
}
// 构建功能插槽责任链
return slotChainBuilder.build();
}
}
这儿比较重要的方法有两个,一个是 SpiLoader.of(SlotChainBuilder.class).loadFirstInstanceOrDefault()
,另一个是 slotChainBuilder.build()
(3.1) 构造默认的slotChainBuilder
package com.alibaba.csp.sentinel.spi;
public final class SpiLoader<S> {
/**
* Load the first-found Provider instance,if not found, return default Provider instance
*
* @return Provider instance
*/
public S loadFirstInstanceOrDefault() {
// 使用SPI读取配置文件获取全限定类目,并通过ClassLoader加载class
load();
for (Class<? extends S> clazz : classList) {
if (defaultClass == null || clazz != defaultClass) {
return createInstance(clazz);
}
}
// 实例化 newInstance
return loadDefaultInstance();
}
}
(3.2) 构造ProcessorSlot责任链
package com.alibaba.csp.sentinel.slots;
/**
* 默认 ProcessorSlotChain 的构建器
*
*
* Builder for a default {@link ProcessorSlotChain}.
*
*/
@Spi(isDefault = true)
public class DefaultSlotChainBuilder implements SlotChainBuilder {
@Override
public ProcessorSlotChain build() {
// 创建默认的 DefaultProcessorSlotChain
ProcessorSlotChain chain = new DefaultProcessorSlotChain();
// 使用SPI读取配置,并通过ClassLoader加载class
// 读取的配置文件 META-INF/services/com.alibaba.csp.sentinel.slotchain.ProcessorSlot
// 这儿会读取到8个 ProcessorSlot
List<ProcessorSlot> sortedSlotList = SpiLoader.of(ProcessorSlot.class).loadInstanceListSorted();
for (ProcessorSlot slot : sortedSlotList) {
if (!(slot instanceof AbstractLinkedProcessorSlot)) {
RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");
continue;
}
// 按照字典序设置ProcessorSlot责任链
chain.addLast((AbstractLinkedProcessorSlot<?>) slot);
}
return chain;
}
}
META-INF/services/com.alibaba.csp.sentinel.slotchain.ProcessorSlot
文件内容如下
# Sentinel default ProcessorSlots
com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot
com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot
com.alibaba.csp.sentinel.slots.logger.LogSlot
com.alibaba.csp.sentinel.slots.statistic.StatisticSlot
com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot
com.alibaba.csp.sentinel.slots.system.SystemSlot
com.alibaba.csp.sentinel.slots.block.flow.FlowSlot
com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot
可以仔细看一下,文件里的内容已经按照字典序
排序了
构建完成的功能插槽责任链类似下面这样:
对应的代码Debug截图:
(3.2.1) 默认功能插槽责任链-DefaultProcessorSlotChain
package com.alibaba.csp.sentinel.slotchain;
public class DefaultProcessorSlotChain extends ProcessorSlotChain {
// 头结点 first
AbstractLinkedProcessorSlot<?> first = new AbstractLinkedProcessorSlot<Object>() {
// 省略部分代码
};
// 尾结点 end
AbstractLinkedProcessorSlot<?> end = first;
/** 添加头节点 */
@Override
public void addFirst(AbstractLinkedProcessorSlot<?> protocolProcessor) {
protocolProcessor.setNext(first.getNext());
first.setNext(protocolProcessor);
if (end == first) {
end = protocolProcessor;
}
}
/** 添加尾结点 */
@Override
public void addLast(AbstractLinkedProcessorSlot<?> protocolProcessor) {
end.setNext(protocolProcessor);
end = protocolProcessor;
}
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
throws Throwable {
first.transformEntry(context, resourceWrapper, t, count, prioritized, args);
}
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
first.exit(context, resourceWrapper, count, args);
}
}
(3.3) 功能插槽责任链-ProcessorSlot
执行各个功能插槽的入口是chain.entry()
这里的chain是DefaultProcessorSlotChain
,chain.entry()
会不断调用对应的entry()
方法
先来了解一下 有哪些功能插槽 及 主要作用
(3.3.1) Sentinel里的功能插槽
处理插槽 | 作用 | 备注 | 全限定类名 |
---|---|---|---|
NodeSelectorSlot | 负责收集资源的路径,并将这些资源的调用路径,以树状结构存储起来 | 用于根据调用路径来限流降级 | com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot |
ClusterBuilderSlot | 负责维护资源运行统计信息(响应时间、qps、线程数、异常),以及调用者列表 | 这些信息将用作为多维度限流,降级的依据 | com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot |
LogSlot | 记录(BlockException)异常日志 (限流、熔断) | com.alibaba.csp.sentinel.slots.logger.LogSlot | |
StatisticSlot | 用于记录、统计不同纬度的 runtime 指标监控信息; | com.alibaba.csp.sentinel.slots.statistic.StatisticSlot | |
AuthoritySlot | 根据配置的黑白名单和调用来源信息,来做黑白名单控制; | com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot | |
SystemSlot | 通过系统的状态,例如 load1 等,来控制总的入口流量; | com.alibaba.csp.sentinel.slots.system.SystemSlot | |
FlowSlot | 用于根据预设的限流规则以及前面 slot 统计的状态,来进行流量控制; | com.alibaba.csp.sentinel.slots.block.flow.FlowSlot | |
DegradeSlot | 通过统计信息以及预设的规则,来做熔断降级; | com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot |
(3.3.2) 功能插槽-ProcessorSlot
package com.alibaba.csp.sentinel.slotchain;
/**
* 一些处理的容器 和 处理完成时的通知方式。
*/
public interface ProcessorSlot<T> {
/**
* 插槽入口
*
* @param context 当前上下文
* @param resourceWrapper 当前资源
* @param param 泛型参数 {@link com.alibaba.csp.sentinel.node.Node}
* @param count 需要的令牌个数
* @param prioritized entry优先级
* @param args 原始调用的参数
* @throws Throwable blocked exception or unexpected error
*/
void entry(Context context, ResourceWrapper resourceWrapper, T param, int count, boolean prioritized,
Object... args) throws Throwable;
/**
* 表示entry()方法结束
*
* @param context 当前上下文
* @param resourceWrapper 当前资源
* @param obj 相关对象 (e.g. Node)
* @param count 需要的令牌个数
* @param prioritized entry优先级
* @param args 原始调用的参数
* @throws Throwable blocked exception or unexpected error
*/
void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized,
Object... args) throws Throwable;
/**
* 退出插槽
*
* @param context 当前上下文
* @param resourceWrapper 当前资源
* @param count 需要的令牌个数
* @param args 原始调用的参数
*/
void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args);
/**
* 表示exit结束
*
* @param context 当前上下文
* @param resourceWrapper 当前资源
* @param count 需要的令牌个数
* @param args 原始调用的参数
*/
void fireExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args);
}
调用功能插槽时都是从entry方法进入
(3.4) 默认的8个功能插槽
(3.4.1) 调用链路(资源路径)插槽-NodeSelectorSlot
NodeSelectorSlot主要作用是负责收集资源的路径,并将这些资源的调用路径,以树状结构存储起来,用于根据调用路径来限流降级;
package com.alibaba.csp.sentinel.slots.nodeselector;
/**
* </p>
* This class will try to build the calling traces via
* <ol>
* <li>adding a new {@link DefaultNode} if needed as the last child in the context.
* The context's last node is the current node or the parent node of the context. </li>
* <li>setting itself to the context current node.</li>
* </ol>
* </p>
*
* <p>It works as follow:</p>
* <pre>
* ContextUtil.enter("entrance1", "appA");
* Entry nodeA = SphU.entry("nodeA");
* if (nodeA != null) {
* nodeA.exit();
* }
* ContextUtil.exit();
* </pre>
*
* Above code will generate the following invocation structure in memory:
*
* <pre>
*
* machine-root
* /
* /
* EntranceNode1
* /
* /
* DefaultNode(nodeA)- - - - - -> ClusterNode(nodeA);
* </pre>
*
* <p>
* Here the {@link EntranceNode} represents "entrance1" given by
* {@code ContextUtil.enter("entrance1", "appA")}.
* </p>
* <p>
* Both DefaultNode(nodeA) and ClusterNode(nodeA) holds statistics of "nodeA", which is given
* by {@code SphU.entry("nodeA")}
* </p>
* <p>
* The {@link ClusterNode} is uniquely identified by the ResourceId; the {@link DefaultNode}
* is identified by both the resource id and {@link Context}. In other words, one resource
* id will generate multiple {@link DefaultNode} for each distinct context, but only one
* {@link ClusterNode}.
* </p>
* <p>
* the following code shows one resource id in two different context:
* </p>
*
* <pre>
* ContextUtil.enter("entrance1", "appA");
* Entry nodeA = SphU.entry("nodeA");
* if (nodeA != null) {
* nodeA.exit();
* }
* ContextUtil.exit();
*
* ContextUtil.enter("entrance2", "appA");
* nodeA = SphU.entry("nodeA");
* if (nodeA != null) {
* nodeA.exit();
* }
* ContextUtil.exit();
* </pre>
*
* Above code will generate the following invocation structure in memory:
*
* <pre>
*
* machine-root
* / \
* / \
* EntranceNode1 EntranceNode2
* / \
* / \
* DefaultNode(nodeA) DefaultNode(nodeA)
* | |
* +- - - - - - - - - - +- - - - - - -> ClusterNode(nodeA);
* </pre>
*
* <p>
* As we can see, two {@link DefaultNode} are created for "nodeA" in two context, but only one
* {@link ClusterNode} is created.
* </p>
*
* <p>
* We can also check this structure by calling: <br/>
* {@code curl http://localhost:8719/tree?type=root}
* </p>
*
* @author jialiang.linjl
* @see EntranceNode
* @see ContextUtil
*/
@Spi(isSingleton = false, order = Constants.ORDER_NODE_SELECTOR_SLOT)
public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<Object> {
/**
* {@link DefaultNode}s of the same resource in different context.
*/
private volatile Map<String, DefaultNode> map = new HashMap<String, DefaultNode>(10);
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
throws Throwable {
/*
* It's interesting that we use context name rather resource name as the map key.
*
* Remember that same resource({@link ResourceWrapper#equals(Object)}) will share
* the same {@link ProcessorSlotChain} globally, no matter in which context. So if
* code goes into {@link #entry(Context, ResourceWrapper, DefaultNode, int, Object...)},
* the resource name must be same but context name may not.
*
* If we use {@link com.alibaba.csp.sentinel.SphU#entry(String resource)} to
* enter same resource in different context, using context name as map key can
* distinguish the same resource. In this case, multiple {@link DefaultNode}s will be created
* of the same resource name, for every distinct context (different context name) each.
*
* Consider another question. One resource may have multiple {@link DefaultNode},
* so what is the fastest way to get total statistics of the same resource?
* The answer is all {@link DefaultNode}s with same resource name share one
* {@link ClusterNode}. See {@link ClusterBuilderSlot} for detail.
*/
// 根据上下文名称获取节点
DefaultNode node = map.get(context.getName());
// 单例模式-DCL
if (node == null) {
synchronized (this) {
node = map.get(context.getName());
if (node == null) {
node = new DefaultNode(resourceWrapper, null);
HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
cacheMap.putAll(map);
cacheMap.put(context.getName(), node);
map = cacheMap;
// 构建调用树
((DefaultNode) context.getLastNode()).addChild(node);
}
}
}
context.setCurNode(node);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
}
package com.alibaba.csp.sentinel.node;
public class DefaultNode extends StatisticNode {
/**
* 孩子节点集合
*/
private volatile Set<Node> childList = new HashSet<>();
/**
* 添加孩子节点
*
* @param node valid child node
*/
public void addChild(Node node) {
if (node == null) {
RecordLog.warn("Trying to add null child to node <{}>, ignored", id.getName());
return;
}
// DCL
if (!childList.contains(node)) {
synchronized (this) {
if (!childList.contains(node)) {
// 添加孩子节点
Set<Node> newSet = new HashSet<>(childList.size() + 1);
newSet.addAll(childList);
newSet.add(node);
childList = newSet;
}
}
RecordLog.info("Add child <{}> to node <{}>", ((DefaultNode)node).id.getName(), id.getName());
}
}
}
/**
* <pre>
*
* machine-root
* / \
* / \
* EntranceNode1 EntranceNode2
* / \
* / \
* DefaultNode(nodeA) DefaultNode(nodeA)
* | |
* +- - - - - - - - - - +- - - - - - -> ClusterNode(nodeA);
* </pre>
*/
(3.4.2) 节点统计信息插槽-ClusterBuilderSlot
ClusterBuilderSlot
负责维护资源运行统计信息(响应时间、qps、线程数、异常),以及调用者列表
package com.alibaba.csp.sentinel.slots.clusterbuilder;
/**
* 该槽维护资源运行统计信息(响应时间、qps、线程数、异常),以及调用者列表,由 ContextUtil#enter(String origin)标记
* 一个资源只有一个集群节点,而一个资源可以有多个默认节点。
*/
@Spi(isSingleton = false, order = Constants.ORDER_CLUSTER_BUILDER_SLOT)
public class ClusterBuilderSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
/**
* 相同的资源 ResourceWrapper#equals(Object) 将在全局范围内共享相同的ProcessorSlotChain,无论在哪个上下文中。
* 因此,如果代码进入 entry(),资源名称必须相同,但上下文名称可能不同。
*
* 为了获取同一资源在不同上下文中的总统计信息,同一资源在全局共享相同的ClusterNode。所有ClusterNode都缓存在此映射中。
*
* 应用程序运行的时间越长,这个映射就会变得越稳定。所以我们不是并发映射而是锁。因为这个锁只发生在最开始,而并发映射将一直持有锁。
*
*/
private static volatile Map<ResourceWrapper, ClusterNode> clusterNodeMap = new HashMap<>();
private static final Object lock = new Object();
private volatile ClusterNode clusterNode = null;
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args)
throws Throwable {
// 单例-DCL
if (clusterNode == null) {
synchronized (lock) {
if (clusterNode == null) {
// 创建集群节点。
clusterNode = new ClusterNode(resourceWrapper.getName(), resourceWrapper.getResourceType());
HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<>(Math.max(clusterNodeMap.size(), 16));
newMap.putAll(clusterNodeMap);
// 添加到缓存里
newMap.put(node.getId(), clusterNode);
// 更新缓存
clusterNodeMap = newMap;
}
}
}
node.setClusterNode(clusterNode);
/*
* 如果设置了原始上下文,我们应该获取 或 创建原始直接的Node。
*/
if (!"".equals(context.getOrigin())) {
Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin());
context.getCurEntry().setOriginNode(originNode);
}
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
}
(3.4.3) 日志插槽-LogSlot
LogSlot主要负责记录限流异常的响应,以提供用于故障排除的具体日志。
记录的日志存储在 sentinel-block.log
文件里
package com.alibaba.csp.sentinel.slots.logger;
/**
* 一个处理插槽,它是对记录限流异常的响应,以提供用于故障排除的具体日志。
*/
@Spi(order = Constants.ORDER_LOG_SLOT)
public class LogSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode obj, int count, boolean prioritized, Object... args)
throws Throwable {
try {
fireEntry(context, resourceWrapper, obj, count, prioritized, args);
} catch (BlockException e) {
// 记录异常日志(包括限流和降级)
// 日志会记录在 sentinel-block.log 文件里
EagleEyeLogUtil.log(resourceWrapper.getName(), e.getClass().getSimpleName(), e.getRuleLimitApp(),
context.getOrigin(), e.getRule().getId(), count);
throw e;
} catch (Throwable e) {
RecordLog.warn("Unexpected entry exception", e);
}
}
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
try {
fireExit(context, resourceWrapper, count, args);
} catch (Throwable e) {
RecordLog.warn("Unexpected entry exit exception", e);
}
}
}
sentinel-block.log
文件内容如下:
2020-10-30 14:39:29|1|HelloWorld,FlowException,default,|46944,0
2020-10-30 14:39:30|1|HelloWorld,FlowException,default,|138183,0
2020-10-30 14:39:31|1|HelloWorld,FlowException,default,|188067,0
2020-11-25 20:12:24|1|sentinelTest,FlowException,default,|400,0
2020-11-25 20:12:25|1|sentinelTest,FlowException,default,|540,0
2020-11-25 21:00:57|1|sentinelTest,FlowException,default,|9,0
2020-11-25 21:00:58|1|sentinelTest,FlowException,default,|1,0
2020-11-25 21:00:59|1|sentinelTest,FlowException,default,|62,0
2020-11-25 21:01:00|1|sentinelTest,FlowException,default,|838,0
2020-12-10 10:39:24|1|SentinelResourceMethod1,FlowException,app1,app1|1,0
2020-12-10 10:39:25|1|SentinelResourceMethod1,FlowException,app1,app1|1,0
(3.4.4) 实时统计插槽-StatisticSlot
package com.alibaba.csp.sentinel.slots.statistic;
/**
* 专用于实时统计的处理器插槽。
* 在进入这个槽的时候,我们需要单独统计以下信息:
* ClusterNode:资源ID的集群节点的总统计。
* 源节点:来自不同调用者/源的集群节点的统计信息。
* DefaultNode:特定上下文中特定资源名称的统计信息。
* 最后是所有入口的总和统计。
*/
@Spi(order = Constants.ORDER_STATISTIC_SLOT)
public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
try {
// Do some checking.
fireEntry(context, resourceWrapper, node, count, prioritized, args);
// Request passed, add thread count and pass count.
node.increaseThreadNum();
node.addPassRequest(count);
if (context.getCurEntry().getOriginNode() != null) {
// Add count for origin node.
context.getCurEntry().getOriginNode().increaseThreadNum();
context.getCurEntry().getOriginNode().addPassRequest(count);
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseThreadNum();
Constants.ENTRY_NODE.addPassRequest(count);
}
// Handle pass event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (PriorityWaitException ex) {
node.increaseThreadNum();
if (context.getCurEntry().getOriginNode() != null) {
// Add count for origin node.
context.getCurEntry().getOriginNode().increaseThreadNum();
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseThreadNum();
}
// Handle pass event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (BlockException e) {
// Blocked, set block exception to current entry.
context.getCurEntry().setBlockError(e);
// Add block count.
node.increaseBlockQps(count);
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().increaseBlockQps(count);
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseBlockQps(count);
}
// Handle block event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onBlocked(e, context, resourceWrapper, node, count, args);
}
throw e;
} catch (Throwable e) {
// Unexpected internal error, set error to current entry.
context.getCurEntry().setError(e);
throw e;
}
}
}
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
Node node = context.getCurNode();
if (context.getCurEntry().getBlockError() == null) {
// Calculate response time (use completeStatTime as the time of completion).
long completeStatTime = TimeUtil.currentTimeMillis();
context.getCurEntry().setCompleteTimestamp(completeStatTime);
long rt = completeStatTime - context.getCurEntry().getCreateTimestamp();
Throwable error = context.getCurEntry().getError();
// Record response time and success count.
recordCompleteFor(node, count, rt, error);
recordCompleteFor(context.getCurEntry().getOriginNode(), count, rt, error);
if (resourceWrapper.getEntryType() == EntryType.IN) {
recordCompleteFor(Constants.ENTRY_NODE, count, rt, error);
}
}
// Handle exit event with registered exit callback handlers.
Collection<ProcessorSlotExitCallback> exitCallbacks = StatisticSlotCallbackRegistry.getExitCallbacks();
for (ProcessorSlotExitCallback handler : exitCallbacks) {
handler.onExit(context, resourceWrapper, count, args);
}
// fix bug https://github.com/alibaba/Sentinel/issues/2374
fireExit(context, resourceWrapper, count, args);
}
(3.4.5) 权限规则检查插槽-AuthoritySlot
AuthoritySlot
的主要作用是 权限规则检查。
package com.alibaba.csp.sentinel.slots.block.authority;
/**
* 致力于权限规则检查的一个处理插槽。
*/
@Spi(order = Constants.ORDER_AUTHORITY_SLOT)
public class AuthoritySlot extends AbstractLinkedProcessorSlot<DefaultNode> {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args)
throws Throwable {
// 权限校验
checkBlackWhiteAuthority(resourceWrapper, context);
//
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
/** */
void checkBlackWhiteAuthority(ResourceWrapper resource, Context context) throws AuthorityException {
// 获取所有资源的权限规则
Map<String, Set<AuthorityRule>> authorityRules = AuthorityRuleManager.getAuthorityRules();
if (authorityRules == null) {
return;
}
// 获取当前资源的权限规则
Set<AuthorityRule> rules = authorityRules.get(resource.getName());
if (rules == null) {
return;
}
for (AuthorityRule rule : rules) {
// 检查权限
if (!AuthorityRuleChecker.passCheck(rule, context)) {
// 权限校验不通过抛AuthorityException异常
throw new AuthorityException(context.getOrigin(), rule);
}
}
}
}
(3.4.6) 系统规则检查插槽-SystemSlot
SystemSlot主要作用是系统规则检查。
package com.alibaba.csp.sentinel.slots.system;
/**
* 致力于系统规则检查的一个处理插槽。
*/
@Spi(order = Constants.ORDER_SYSTEM_SLOT)
public class SystemSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
// 系统规则检查
SystemRuleManager.checkSystem(resourceWrapper, count);
//
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
fireExit(context, resourceWrapper, count, args);
}
}
package com.alibaba.csp.sentinel.slots.system;
/**
* Sentinel System Rule 使入站流量和容量满足。 它考虑了传入请求的平均 rt、qps、线程数。
* 它还提供了系统负载的度量,但仅在 Linux 上可用。
*
* rt、qps、线程数很好理解。 如果传入请求的 rt、qps、线程数超过其阈值,请求将被拒绝。
* 但是,我们使用不同的方法来计算负载。
*
* 将系统视为管道,约束之间的转换导致三个不同区域(交通限制、容量限制和危险区域)具有不同性质的行为。
* 当运行中没有足够的请求来填充管道时,RTprop 会确定行为; 否则,系统容量占主导地位。
* 约束线在飞行中相交 = Capacity × RTprop。
* 由于管道已满,超过这一点,机载容量过剩产生了一个队列,导致RTT对机载流量的线性依赖和系统负载的增加。
* 在危险区域,系统将停止响应。
*
* 参考 BBR 算法了解更多。
*
* 注意,SystemRule仅对入站请求有效,出站流量不受SystemRule限制
*/
public final class SystemRuleManager {
/**
* 将系统规则应用于资源。 只会检查入站流量。
*
* @param 资源
* @throws BlockException 当超过任何系统规则的阈值时。
*/
public static void checkSystem(ResourceWrapper resourceWrapper, int count) throws BlockException {
if (resourceWrapper == null) {
return;
}
// 确保检查开关打开。
if (!checkSystemStatus.get()) {
return;
}
// 仅用于入站流量
if (resourceWrapper.getEntryType() != EntryType.IN) {
return;
}
// Constants.ENTRY_NODE 是 入站流量的全局统计节点。
// total qps
double currentQps = Constants.ENTRY_NODE.passQps();
if (currentQps + count > qps) {
throw new SystemBlockException(resourceWrapper.getName(), "qps");
}
// total thread
int currentThread = Constants.ENTRY_NODE.curThreadNum();
if (currentThread > maxThread) {
throw new SystemBlockException(resourceWrapper.getName(), "thread");
}
// 延时
double rt = Constants.ENTRY_NODE.avgRt();
if (rt > maxRt) {
throw new SystemBlockException(resourceWrapper.getName(), "rt");
}
// 负载 BBR algorithm.
if (highestSystemLoadIsSet && getCurrentSystemAvgLoad() > highestSystemLoad) {
if (!checkBbr(currentThread)) {
throw new SystemBlockException(resourceWrapper.getName(), "load");
}
}
// cpu usage
if (highestCpuUsageIsSet && getCurrentCpuUsage() > highestCpuUsage) {
throw new SystemBlockException(resourceWrapper.getName(), "cpu");
}
}
}
(3.4.7) 插槽-FlowSlot
package com.alibaba.csp.sentinel.slots.block.flow;
/**
* 结合从前面的插槽(NodeSelectorSlot、ClusterNodeBuilderSlot 和 StatisticSlot)收集的
* 运行时统计信息,FlowSlot 将使用预设规则来决定是否应阻止传入请求。
*
* 如果触发任何规则,SphU.entry(resourceName) 将抛出 FlowException。
* 用户可以通过捕获 FlowException 来自定义自己的逻辑。
*
* 一个资源可以有多个流规则。 FlowSlot 遍历这些规则,直到其中一条被触发或者所有规则都被遍历。
*
* 每个FlowRule主要由这些因素组成:等级、策略、路径。 我们可以结合这些因素来达到不同的效果。
*
* 等级由 FlowRule 中的 grade 字段定义。
* 此处,0 用于线程隔离,1 用于请求计数整形 (QPS)。
* 线程计数和请求计数都是在真实运行时收集的,
* 我们可以通过以下命令查看这些统计信息:
* <pre>
* curl http://localhost:8719/tree
*
* idx id thread pass blocked success total aRt 1m-pass 1m-block 1m-all exception
* 2 abc647 0 460 46 46 1 27 630 276 897 0
* </pre>
*
* thread 当前正在处理资源的线程数
* pass 一秒内传入请求数
* blocked 一秒内被阻塞的请求数
* success 一秒内被Sentinel成功处理的请求数
* RT 请求在一秒内的平均响应时间
* total 一秒内传入请求和阻塞请求的总和
* 1m-pass 为一分钟内传入请求数
* 1m-block 为一分钟内被阻塞的请求数
* 1m-all 是一分钟内传入和阻止的请求总数
* exception 为一秒内业务(自定义)异常的计数
*
* 这个阶段通常用于保护资源不被占用。 (达到限流阈值,服务快撑不住了)
* 如果资源需要很长时间才能完成,线程将开始占用。 响应时间越长,占用的线程越多。
*
* 除了计数器之外,线程池或信号量也可以用来实现这一点。
* - 线程池:分配一个线程池来处理这些资源。 当池中不再有空闲线程时,拒绝请求而不影响其他资源。
* - 信号量:使用信号量来控制该资源中线程的并发数。
*
* 使用线程池的好处是,超时可以优雅的走开。 但它也给我们带来了上下文切换和额外线程的成本。
* 如果传入请求已经在单独的线程中提供服务,例如 Servlet HTTP 请求,那么如果使用线程池,线程数几乎会翻倍。
*
*
* 流量整形
* 当QPS超过阈值时,Sentinel会采取动作控制传入的请求,由流规则中的 controlBehavior 字段配置。
* 1.立即拒绝(Immediately reject):默认行为。 超出的请求立即被拒绝 并抛出 FlowException
* 2.热启动(Warmup) : 如果一段时间以来系统的负载很低,大量的请求来了,系统可能无法一次处理所有这些请求。
* 然而,如果我们稳定地增加传入请求,系统可以预热并最终能够处理所有请求。
* 可以通过在流规则中设置字段 warmUpPeriodSec 来配置此预热期。
* 3.统一速率限制 此策略严格控制请求之间的间隔。换句话说,它允许请求以稳定、统一的速率传递。
* https://raw.githubusercontent.com/wiki/alibaba/Sentinel/image/uniform-speed-queue.png
* 该策略是漏桶算法的实现 https://en.wikipedia.org/wiki/Leaky_bucket
* 它用于以稳定的速率处理请求,通常用于突发业务(例如消息处理)。
* 当大量超出系统容量的请求同时到达时,使用此策略的系统将处理请求及其固定速率,直到所有请求都已处理或超时。
*/
@Spi(order = Constants.ORDER_FLOW_SLOT)
public class FlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
private final FlowRuleChecker checker;
public FlowSlot() {
this(new FlowRuleChecker());
}
/**
* Package-private for test.
*
* @param checker flow rule checker
* @since 1.6.1
*/
FlowSlot(FlowRuleChecker checker) {
AssertUtil.notNull(checker, "flow checker should not be null");
this.checker = checker;
}
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
// 校验
checkFlow(resourceWrapper, context, node, count, prioritized);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)
throws BlockException {
//
checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
}
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
fireExit(context, resourceWrapper, count, args);
}
private final Function<String, Collection<FlowRule>> ruleProvider = new Function<String, Collection<FlowRule>>() {
@Override
public Collection<FlowRule> apply(String resource) {
// Flow rule map should not be null.
Map<String, List<FlowRule>> flowRules = FlowRuleManager.getFlowRuleMap();
return flowRules.get(resource);
}
};
}
(3.4.8) 熔断降级插槽-DegradeSlot
package com.alibaba.csp.sentinel.slots.block.degrade;
/**
* 专门用于熔断/电路断路 的处理插槽
*/
@Spi(order = Constants.ORDER_DEGRADE_SLOT)
public class DegradeSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
//
performChecking(context, resourceWrapper);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
}
/** */
void performChecking(Context context, ResourceWrapper r) throws BlockException {
// 获取资源对应的熔断器/断路器
List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
if (circuitBreakers == null || circuitBreakers.isEmpty()) {
return;
}
for (CircuitBreaker cb : circuitBreakers) {
// 判断是否可以通过
if (!cb.tryPass(context)) {
// 抛出熔断异常 DegradeException
throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule());
}
}
}
3.5 调用堆栈
参考资料
[1] Sentinel介绍
[2] Sentinel工作主流程