深入理解Sentinel系列-2.Sentinel原理及核心源码分析

  • 👏作者简介:大家好,我是爱吃芝士的土豆倪,24届校招生Java选手,很高兴认识大家
  • 📕系列专栏:Spring源码、JUC源码、Kafka原理、分布式技术原理
  • 🔥如果感觉博主的文章还不错的话,请👍三连支持👍一下博主哦
  • 🍂博主正在努力完成2023计划中:源码溯源,一探究竟
  • 📝联系方式:nhs19990716,加我进群,大家一起学习,一起进步,一起对抗互联网寒冬👀

文章目录

  • 流量控制
    • 限流核心因素
    • 并发线程数
    • QPS
      • 直接拒绝
      • warm up(冷启动)
      • 匀速排队
    • 基于调用关系来做流量控制
    • Sentinel控制台
    • 动态限流规则
      • Nacos DataSource(接口)
      • 基于配置文件的动态限流
    • 集群限流
    • 熔断降级
      • 熔断指标
      • 熔断规则
      • 熔断时间窗口
      • 熔断实践
    • sentinel的实现
    • 分析sentinel源码实现
      • 树的构建 NodeSelectorSlot
      • 统计监控 StatisticSlot
      • 流量控制 FlowSlot

流量控制

sentinel中有几个比较重要的东西:

  • 资源 - 针对哪一个方法,可以在接口层面 ,也可以在类层面
  • 规则 - 根据资源配置限流的规则
  • Entry -> 请求

限流核心因素

  • resource 资源
  • count 阈值
  • grade 限流的类型

并发线程数

我们所有的请求它会去基于业务线程也好,容器分配的线程也好,其都是通过线程的分配来处理。(像dubbo就有默认200的线程大小,其实也就是服务端接收到线程以后,会分配一个业务线程去处理这个请求,其最大的线程数设置的是200。)在sentinel中,关于并发线程数的策略指的是对于并发请求的处理策略。这包括了对于同时处理的请求数量的限制、超时处理、以及资源的保护等方面的策略。这些策略旨在保证系统在高并发情况下能够正常运行,同时避免资源被过度消耗或者出现系统崩溃的情况。通过设置合适的并发线程数策略,可以有效地管理系统的资源和保证系统的稳定性。

QPS

在Sentinel中,关于QPS的策略指的是对于每秒查询次数的限制策略。这些策略可以用来控制系统的并发请求量,防止系统被过度压力而导致性能下降或系统崩溃。通过设置QPS的限制策略,可以限制系统在单位时间内能够处理的请求次数,从而保护系统免受过多请求的影响,确保系统在承受合理负载的情况下能够正常运行。这些策略可以根据系统的实际情况和需求来进行调整,以保证系统的稳定性和可靠性。


当触发了上述的两种限流,就会产生一种行为,也就是所谓流量控制的策略

  • 直接拒绝
  • warm up(冷启动)
  • 匀速排队

这个策略主要是通过ControlBehavior属性来控制策略的

直接拒绝

flowRule.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_DEFAULT);

warm up(冷启动)

在这里插入图片描述

所谓冷启动,就是我们的系统在启动的时候长期处于低水位(整个系统的吞吐量非常低,并发量很低),当突然出现这种瞬时流量,系统有一个启动的过程,并不是一下子将所有的流量都放进来,而是逐步的拉伸整个系统的水位直到通过一个时间维度达到最高峰的阈值。(因为一下子将所有流量放进来,系统很可能在一瞬间被压垮,所以匀速提升整个系统的预热,可以在一定情况下逐渐增加处理上限)

匀速排队

在这里插入图片描述

总体来说,在限流里面,可分为比较重要的就是 资源规则 ,我们可以通过配置不同的资源去设置不同的规则,从而控制整个请求流量的行为。

基于调用关系来做流量控制

在整个调用链路里面,针对不同请求来源去做(FlowRule.limitApp)

  • 根据资源

基于资源的流量控制是指根据被调用的具体资源(比如接口、方法等)来进行流量控制。在Sentinel中,可以通过配置规则来限制不同资源的访问流量,比如可以设置某个接口的最大访问次数或者并发数。这种方式适用于对特定资源进行限流的场景,可以保护资源不被过度访问。

  • 根据来源

基于来源的流量控制是指根据调用方的来源(比如IP地址、用户ID等)来进行流量控制。在Sentinel中,可以通过配置规则来限制不同来源的访问流量,比如可以设置某个IP地址的最大访问次数或者限制某个用户的访问频率。这种方式适用于对不同来源的调用方进行个性化的限流控制。

  • 根据参数

基于参数的流量控制是指根据调用方传递的参数来进行流量控制。在Sentinel中,可以通过配置规则来限制不同参数组合的访问流量,比如可以设置某个接口某个参数的最大访问次数。这种方式适用于对特定参数组合进行限流的场景,可以根据业务需求对不同参数进行限流控制。

Sentinel控制台

java -Dserver.port=7777 -Dcsp.sentinel.dashboard.server=localhost:7777 -
Dproject.name=sentinel-dashboard-1.8.0 -jar sentinel-dashboard-1.8.0.jar

在这里插入图片描述

在这里插入图片描述

上图可以清晰的展示到sentinel可以提供哪些方向上的规则配置。

需要注意的一点就是,单机情况下,如果配置阈值的话,可以通过单机压测 压测出来。

动态限流规则

在这里插入图片描述

实际上就是动态的规则感知进行 (pull、push)

Nacos DataSource(接口)

对于整个Sentinel的动态数据源来说,必须要去做扩展。

这个接口提供了动态配置感知的能力 和 动态配置存储的能力。

// sentinel 提供的spi扩展
/*
SPI(Service Provider Interface)扩展是Java中一种用于扩展框架的机制。在SPI中,框架定义了一组接口(接口或抽象类),并允许外部的实现者按照这些接口的规范来提供具体的实现。这种机制允许系统的扩展功能可以通过在类路径下放置实现者提供的JAR包来实现,而无需修改框架的源代码。

SPI扩展机制的优点是能够实现框架和扩展的解耦,框架可以在不修改代码的情况下,通过加载外部的实现类来扩展功能。同时,SPI扩展也提供了一种简单的插件机制,允许开发者通过编写实现类来扩展框架的功能。
*/

// 扩展之后必须在 resource/META-INF/services/com.alibaba.csp.sentinel.init.initFunc 里面写入com.gupaoedu.example.springbootsentinel.FlowRuleInitFunc 对应的路径

// 扩展之后,sentinel在初始化的时候,就会去加载这个方法

public class FlowRuleInitFunc implements InitFunc{
    
    @Override
    public void init() throws Exception {
        List<FlowRule> rules=new ArrayList<>();
        FlowRule flowRule=new FlowRule();
        flowRule.setResource("read"); //针对那个资源设置规则
        flowRule.setGrade(RuleConstant.FLOW_GRADE_QPS);//QPS或者并发数
        flowRule.setCount(5); //QPS=5
        //直接拒绝:
        flowRule.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_DEFAULT);
        rules.add(flowRule);

        FlowRuleManager.loadRules(rules);
    }
   
}
# Sentinel 控制台地址 将当前的应用接入到控制台
spring.cloud.sentinel.transport.dashboard=192.168.216.128:7777
# 取消Sentinel控制台懒加载
# 默认情况下 Sentinel 会在客户端首次调用的时候进行初始化,开始向控制台发送心跳包
# 配置 sentinel.eager=true 时,取消Sentinel控制台懒加载功能
spring.cloud.sentinel.eager=true
@RestController
public class SentinelController {

    @Autowired
    TestService testService;

    @GetMapping("/hello/{name}")
    public String sayHello(@PathVariable("name") String name){
        return testService.doTest(name);
    }
}

@Service
public class TestService {

    @SentinelResource(value = "doTest") //声明限流的资源
    public String doTest(String name){
        return "hello , "+name;
    }

}

此时我们已经将 sentinel 接入到 sentinel控制台了,然后我们使用jmeter来进行测试

配置http请求 和 线程数

在这里插入图片描述

此时就可以看到 在sentinel控制台中对应接口的 监控情况

在这里插入图片描述

在前面起到了,只要依赖了 sentinel这个包,那么项目就会自动的去集成流量的一个过滤,比如/hello/{name},因为没有设置规则,所以都能通过。

接下来做一个动态规则。

		<dependency>
            <groupId>com.alibaba.csp</groupId>
            <artifactId>sentinel-datasource-nacos</artifactId>
            <version>1.8.0</version>
        </dependency>
public class FlowRuleInitFunc implements InitFunc{
    private final String nacosAddress="192.168.216.128:8848";
    private final String groupId="SENTINEL_GROUP";
    private final String dataId="-flow-rules";

    private final String appName="App-Test";

    @Override
    public void init() throws Exception {
        registerFlowRule();
    }
    

    private void registerFlowRule(){
        // 从远程服务器上加载规则
        ReadableDataSource<String,List<FlowRule>> flowRuleDs=
                new NacosDataSource<List<FlowRule>>(nacosAddress,groupId,appName+dataId,
                                                         // 当数据发生变化后,会回调这个方法
                        source-> JSON.parseObject(source,new TypeReference<List<FlowRule>>(){}));
   	// 这行代码将上面创建的数据源注册到中。是Sentinel中管理流控规则的组件,方法用于将提供的(属性)注册到。这样,每当Nacos中的配置发生变化时,会获取最新的规则,并通知更新内存中的流控规则。
        FlowRuleManager.register2Property(flowRuleDs.getProperty());
    }
}

此时在nacos中修改对应的配置,就能够监听到,然后更新到内存中的流控规则中

在这里插入图片描述

  • 通过访问测试,即可看到被限流的效果。
  • 也可以在 ${用户}/logs/csp/sentinel-record.log.2020-09-22 文件中看到sentinel启动过程中动态数据源的加载过程。

基于配置文件的动态限流

spring.cloud.sentinel.transport.clientIp=192.168.216.128:7777
spring.cloud.sentinel.datasource.nacos.nacos.serverAddr=192.168.216.128:8848
spring.cloud.sentinel.datasource.nacos.nacos.dataId=com.gupaoedu.sentinel.demo.flow.rule
spring.cloud.sentinel.datasource.nacos.nacos.groupId=SENTINEL_GROUP
spring.cloud.sentinel.datasource.nacos.nacos.dataType=json
spring.cloud.sentinel.datasource.nacos.nacos.ruleType=flow
spring.cloud.sentinel.datasource.nacos.nacos.username=nacos
spring.cloud.sentinel.datasource.nacos.nacos.password=nacos

在这里的配置会出现一个问题,就是修改 sentinel 控制台配置,和nacos上的配置不一致,这样的话最简单的就是修改sentinel的源码,当sentinel控制台变化的时候,动态去修改nacos上的配置即可。

集群限流

token-client代表我们的应用,当一个请求过来以后,经过token-client会进行一个动态的判断,token-server里面采用的是一个动态数据源,此时就能够实现集群限流的思想了,当来一个请求的时候,token-client先去token-server中去判断是否能够通过,如果通过才能执行,否则限流。

当token-server挂了之后,能够直接连接nacos配置中心的规则进行一个处理

在这里插入图片描述

token -server

public static void main(String[] args) throws Exception {
        ClusterTokenServer tokenServer=new SentinelDefaultTokenServer();
        //手动载入namespace和serverTransportConfig的配置到ClusterServerConfigManager
        //集群限流服务端通信相关配置
        ClusterServerConfigManager.loadGlobalTransportConfig(
                new ServerTransportConfig().setIdleSeconds(600).setPort(9999));
        //加载namespace集合列表() , namespace也可以放在配置中心
        ClusterServerConfigManager.loadServerNamespaceSet(Collections.singleton("App-Test"));
        tokenServer.start();
        //Token-client会上报自己的project.name到token-server。Token-server会根据namespace来统计连接数
    }


// 主要作用是从 Nacos 配置中心获取流控规则,并将其应用到 Sentinel 中,以实现动态的流控规则管理。
public class FlowRuleInitFunc implements InitFunc{
    private final String nacosAddress="192.168.216.128:8848";
    private final String groupId="SENTINEL_GROUP";
    private String dataId="-flow-rules";

    @Override
    public void init() throws Exception {
        ClusterFlowRuleManager.setPropertySupplier(namespace->{
            ReadableDataSource<String,List<FlowRule>> flowRuleDs=
                    new NacosDataSource<List<FlowRule>>(nacosAddress,groupId,namespace+dataId,
                            source-> JSON.parseObject(source,new TypeReference<List<FlowRule>>(){}));
            return flowRuleDs.getProperty();
        });
    }
}

在这里插入图片描述

token-client

public class FlowRuleInitFunc implements InitFunc{
    private final String nacosAddress="192.168.216.128:8848";
    private final String groupId="SENTINEL_GROUP";
    private final String dataId="-flow-rules";

    private final String clusterServerHost="localhost";
    private final int clusterServerPort=9999;
    private final int requestTimeOut=20000;
    private final String appName="App-Test";

    @Override
    public void init() throws Exception {
        loadClusterConfig();
        registerFlowRule();
    }
    private void loadClusterConfig(){
        ClusterClientAssignConfig assignConfig=new ClusterClientAssignConfig();
        assignConfig.setServerHost(clusterServerHost); //放到配置中心
        assignConfig.setServerPort(clusterServerPort);
        ClusterClientConfigManager.applyNewAssignConfig(assignConfig);
        ClusterClientConfig clientConfig=new ClusterClientConfig();
        clientConfig.setRequestTimeout(requestTimeOut);  //放到配置中心
        ClusterClientConfigManager.applyNewConfig(clientConfig);
    }

    private void registerFlowRule(){
        ReadableDataSource<String,List<FlowRule>> flowRuleDs=
                new NacosDataSource<List<FlowRule>>(nacosAddress,groupId,appName+dataId,
                        source-> JSON.parseObject(source,new TypeReference<List<FlowRule>>(){}));
        FlowRuleManager.register2Property(flowRuleDs.getProperty());
    }
}

以上就是一个集群流控的核心实现了。

在这里插入图片描述

总结:集群限流配置一个总的qps,多个节点可以配置权重,或者均摊的方式来起到限流的作用。

熔断降级

Sentinel的熔断降级是一种服务保护机制,用于在系统出现异常或超载时,自动限制对受影响服务的访问,防止其继续受到压力而导致系统崩溃。熔断降级可以通过设置阈值和规则来实现,一旦达到设定的条件,系统将自动停止对服务的访问,直到服务恢复正常或超载情况解除。这样可以保护系统免受过载和异常情况的影响,确保系统的稳定性和可靠性。

熔断指标

怎么判断这个服务是一个不稳定的状态?

异常数:

平均响应时间:

异常比例数:10s内,20次请求里面,有百分之五十及以上的错误率,就认为其要触发熔断,而hystrix只有这一种

熔断规则

规则,也就是要设计对应的指标,指标设计为多少才会熔断!

比如说:

  • 1min内,异常数量超过50%,触发熔断
  • 1s 5个请求,平均响应时间超过一个阈值(1000ms)
  • 1min内,超过了多少个异常数量

熔断时间窗口

当处于熔断状态的时候,多长时间内,请求都不会发送到服务端,熔断窗口也是一个需要设置的阈值

熔断实践

public class DataSourceInitFunc implements InitFunc{
    public void init() throws Exception{
        List<DegradeRule> rules = new ArrayList<>();
        DegradeRule rule = new DegradeRule();
        rule.setResource("com.gupaodu.springcloud.dubbo.ISayHelloService");// 表示针对那个服务或者方法的熔断
        // 针对这个类下面所有的接口进行资源的判断,当这个类下面任何一个方法触发了熔断,那么调用这个类下面的任何一个接口都会自动触发降级。
        // 设置模式
        rule.setGrade(RuleConstant.Degrade.DEGRADE_CRADE_EXCEPTION_COUNT)// 错误数 超时时间 错误率 指标
        rule.setCount(3);// 阈值
        rule.setTimeWindow(100)// 时间窗口 100s
        rules.add(rule);
        DegradeRuleManager.loadRules(rules);
    }
}


public interface ISayHelloService{
    
    String sayHello(String msg);
    
	String exceptionTest();
}

@DubboService
public class SayHelloServiceImpl implement ISayHelloService{
    public String sayHello(String msg){
        return "";
    }
    
    // 用来触发测试
    public String exceptionTest(){
        throw new RuntimeException("biz exception");
    }
}

@RestController
public class SentinelController {
    @DubboReference(mock="......SayHelloServiceMock")
    ISayHelloService sayHelloService;
    
    @GetMapping("/say")
    public String say(){
        return sayHelloService.sayHello("");
    }
    
    @GetMapping("/exception")
    public String exception(){
        return sayHelloService.exceptionTest();
    }
}

public class SayHelloServiceMock implements ISayHelloService{
    public String sayHello(String msg){
        return "触发了降级,返回默认数据";
    }
    
    // 用来触发测试
    public String exceptionTest(){
        return "触发了降级,返回默认数据"
    }
}

其实本质就是 当触发熔断之后,该接口下其他方法在时间窗口内 也不能访问,只能被降级。

sentinel的实现

构建一个资源

设置一个规则

指标范围:

  • 当前的qps
  • 当前的总的并发数线程数
  • 当前失败请求数是多少?

在这里插入图片描述

此图为官方提供的图,一个请求过来,它会在 调用链路构建处 构建一个 Invocation Tree结构,也就是将我们请求的资源构建成一个节点,以树形的方式去构建。然后就是针对某个节点进行监控统计,主要是以环形数组的方式哦存,其实也就是每一个node维护了一个滑动窗口,其统计了针对这个资源总的信息去进行指标统计,然后通过一个判断责任链去鉴别触发了那些规则。

分析sentinel源码实现

public static Entry entry(String name, int resourceType, EntryType trafficType) throws BlockException {
        return Env.sph.entryWithType(name, resourceType, trafficType, 1, OBJECTS0);
    }

public Entry entryWithType(String name, int resourceType, EntryType entryType, int count, Object[] args) throws BlockException {
        return this.entryWithType(name, resourceType, entryType, count, false, args);
    }

public Entry entryWithType(String name, int resourceType, EntryType entryType, int count, boolean prioritized, Object[] args) throws BlockException {
        StringResourceWrapper resource = new StringResourceWrapper(name, entryType, resourceType);
        return this.entryWithPriority(resource, count, prioritized, args);
    }

private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args) throws BlockException {
    // 获取线程上下文
        Context context = ContextUtil.getContext();
        if (context instanceof NullContext) {
            return new CtEntry(resourceWrapper, (ProcessorSlot)null, context);
        } else {
            if (context == null) {
                context = CtSph.InternalContextUtil.internalEnter("sentinel_default_context");
            }

            // 是否开启或关闭流控
            if (!Constants.ON) {
                return new CtEntry(resourceWrapper, (ProcessorSlot)null, context);
            } else {
                // 构建了一个调用链
                ProcessorSlot<Object> chain = this.lookProcessChain(resourceWrapper);
------------------------------------------------------------------------------
ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
        ProcessorSlotChain chain = (ProcessorSlotChain)chainMap.get(resourceWrapper);
        if (chain == null) {
            synchronized(LOCK) {
                chain = (ProcessorSlotChain)chainMap.get(resourceWrapper);
                if (chain == null) {
                    // 超过阈值就直接返回null
                    if (chainMap.size() >= 6000) {
                        return null;
                    }
					// 单例模式创建调用链
                    chain = SlotChainProvider.newSlotChain();
                    Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap(chainMap.size() + 1);
                    newMap.putAll(chainMap);
                    newMap.put(resourceWrapper, chain);
                    chainMap = newMap;
                    
                    // 如果直接添加进去的话,涉及到扩容,所以采用创建之后赋值的方法
                }
            }
        }

        return chain;
    }
    
public static ProcessorSlotChain newSlotChain() {
        if (slotChainBuilder != null) {
            return slotChainBuilder.build();
        } else {
            slotChainBuilder = (SlotChainBuilder)SpiLoader.of(SlotChainBuilder.class).loadFirstInstanceOrDefault();
            if (slotChainBuilder == null) {
                RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default", new Object[0]);
                slotChainBuilder = new DefaultSlotChainBuilder();
            } else {
                RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: {}", new Object[]{slotChainBuilder.getClass().getCanonicalName()});
            }

            return slotChainBuilder.build();
        }
    }
                
 public ProcessorSlotChain build() {
        ProcessorSlotChain chain = new DefaultProcessorSlotChain();
        List<ProcessorSlot> sortedSlotList = SpiLoader.of(ProcessorSlot.class).loadInstanceListSorted();
        Iterator var3 = sortedSlotList.iterator();
		// 然后在这里进行一个遍历
        while(var3.hasNext()) {
            ProcessorSlot slot = (ProcessorSlot)var3.next();
            if (!(slot instanceof AbstractLinkedProcessorSlot)) {
                RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain", new Object[0]);
            } else {
                chain.addLast((AbstractLinkedProcessorSlot)slot);
            }
        }

        return chain;
    }
                
// 在这里面涉及到了SpiLoader
// 其实也就是在这个链路中,可以进入到我们自己的 自定义的slot 中

在这里插入图片描述

在这里插入图片描述

if (chain == null) {
                    return new CtEntry(resourceWrapper, (ProcessorSlot)null, context);
                } else {
                    CtEntry e = new CtEntry(resourceWrapper, chain, context);
					
                    try {
                        // 构建好以后然后通过链式去处理
                        chain.entry(context, resourceWrapper, (Object)null, count, prioritized, args);
                    } catch (BlockException var9) {
                        e.exit(count, args);
                        throw var9;
                    } catch (Throwable var10) {
                        RecordLog.info("Sentinel unexpected exception", var10);
                    }

                    return e;
                }
            }
        }
    }

在这里插入图片描述

这两个就是前面说的滑动窗口要去统计的东西

前面传进来的资源包装成一个 resourceWrapper对象

在这里插入图片描述

当我们打算进入 chain.entry 的时候,发现其实就是一系列的slot

在这里插入图片描述

树的构建 NodeSelectorSlot

public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) throws Throwable {
        DefaultNode node = (DefaultNode)this.map.get(context.getName());
    // 双重检查锁 + 缓存
        if (node == null) {
            synchronized(this) {
                node = (DefaultNode)this.map.get(context.getName());
                if (node == null) {
                    node = new DefaultNode(resourceWrapper, (ClusterNode)null);
                    HashMap<String, DefaultNode> cacheMap = new HashMap(this.map.size());
                    cacheMap.putAll(this.map);
                    cacheMap.put(context.getName(), node);
                    this.map = cacheMap;
                    ((DefaultNode)context.getLastNode()).addChild(node);
                }
            }
        }

        context.setCurNode(node);
    // 当上述构建操作完成后
    // 释放entry
        this.fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }
// 释放的本质其实就是执行下一个
public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) throws Throwable {
        if (this.next != null) {
            this.next.transformEntry(context, resourceWrapper, obj, count, prioritized, args);
        }

    }

统计监控 StatisticSlot

实际上是并发量比较大的情况下做数据统计

public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
        Iterator var8;
        ProcessorSlotEntryCallback handler;
        try {
            // 先交给后续的slot进行处理
            this.fireEntry(context, resourceWrapper, node, count, prioritized, args);
            // 然后开始统计资源
            // node代表当前传过来的资源
            node.increaseThreadNum();
            node.addPassRequest(count);
-----------------------------------------------------------------------------------
public void addPassRequest(int count) {
        super.addPassRequest(count);
        this.clusterNode.addPassRequest(count);
    }
public void addPassRequest(int count) {
        this.rollingCounterInSecond.addPass(count); // 秒级别的统计
        this.rollingCounterInMinute.addPass(count); // 分钟级别的统计
    }
    
this.rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT, IntervalProperty.INTERVAL);
            
SampleCountProperty.SAMPLE_COUNT = 2;
IntervalProperty.INTERVAL = 1000;

// 上面有点像滑动窗口的概念
public void addPass(int count) {
        WindowWrap<MetricBucket> wrap = this.data.currentWindow();
    // 根据某一个指标加进去 这里面的本质就是通过当前的时间去得到一个窗口,将其加进去
    // LongAdder的思想 其底层就是数组累加的思想 !!!
        ((MetricBucket)wrap.value()).addPass(count);
    }
 // 当最终计数完成后,相当于形成了一开始的那个图的样子
            
            
private final LeapArray<MetricBucket> data;

    public ArrayMetric(int sampleCount, int intervalInMs) {
        this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
    }
            
            
public OccupiableBucketLeapArray(int sampleCount, int intervalInMs) {
        super(sampleCount, intervalInMs);
        this.borrowArray = new FutureBucketLeapArray(sampleCount, intervalInMs);
    }

            // super(sampleCount, intervalInMs);
public LeapArray(int sampleCount, int intervalInMs) {
        AssertUtil.isTrue(sampleCount > 0, "bucket count is invalid: " + sampleCount);
        AssertUtil.isTrue(intervalInMs > 0, "total time interval of the sliding window should be positive");
        AssertUtil.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided");
        this.windowLengthInMs = intervalInMs / sampleCount; // 每个窗口的时间长度
        this.intervalInMs = intervalInMs;
        this.intervalInSecond = (double)intervalInMs / 1000.0D;
        this.sampleCount = sampleCount;
        this.array = new AtomicReferenceArray(sampleCount); // sampleCount = 2
    	// 也就代表着窗口的大小是两个
    }
            
            
protected final AtomicReferenceArray<WindowWrap<T>> array;

在这里插入图片描述

           
            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().increaseThreadNum();
                context.getCurEntry().getOriginNode().addPassRequest(count);
            }

            if (resourceWrapper.getEntryType() == EntryType.IN) {
                Constants.ENTRY_NODE.increaseThreadNum();
                Constants.ENTRY_NODE.addPassRequest(count);
            }

            Iterator var13 = StatisticSlotCallbackRegistry.getEntryCallbacks().iterator();

            while(var13.hasNext()) {
                ProcessorSlotEntryCallback<DefaultNode> handler = (ProcessorSlotEntryCallback)var13.next();
                handler.onPass(context, resourceWrapper, node, count, args);
            }
        } catch (PriorityWaitException var10) {
            node.increaseThreadNum();
            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().increaseThreadNum();
            }

            if (resourceWrapper.getEntryType() == EntryType.IN) {
                Constants.ENTRY_NODE.increaseThreadNum();
            }

            var8 = StatisticSlotCallbackRegistry.getEntryCallbacks().iterator();

            while(var8.hasNext()) {
                handler = (ProcessorSlotEntryCallback)var8.next();
                handler.onPass(context, resourceWrapper, node, count, args);
            }
        } catch (BlockException var11) {
            BlockException e = var11;
            context.getCurEntry().setBlockError(var11);
            node.increaseBlockQps(count);
            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().increaseBlockQps(count);
            }

            if (resourceWrapper.getEntryType() == EntryType.IN) {
                Constants.ENTRY_NODE.increaseBlockQps(count);
            }

            var8 = StatisticSlotCallbackRegistry.getEntryCallbacks().iterator();

            while(var8.hasNext()) {
                handler = (ProcessorSlotEntryCallback)var8.next();
                handler.onBlocked(e, context, resourceWrapper, node, count, args);
            }

            throw e;
        } catch (Throwable var12) {
            context.getCurEntry().setError(var12);
            throw var12;
        }

    }

流量控制 FlowSlot

public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
        this.checkFlow(resourceWrapper, context, node, count, prioritized);
        this.fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
        this.checker.checkFlow(this.ruleProvider, resource, context, node, count, prioritized);
    }

public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
        if (ruleProvider != null && resource != null) {
            // 获取针对这个资源的限流规则
            Collection<FlowRule> rules = (Collection)ruleProvider.apply(resource.getName());
            if (rules != null) {
                Iterator var8 = rules.iterator();

                while(var8.hasNext()) {
                    FlowRule rule = (FlowRule)var8.next();
                    if (!this.canPassCheck(rule, context, node, count, prioritized)) {
                        throw new FlowException(rule.getLimitApp(), rule);
                    }
                }
            }

        }
    }

public boolean canPassCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) {
        String limitApp = rule.getLimitApp();
        if (limitApp == null) {
            return true;
        } else {
            return rule.isClusterMode() ? passClusterCheck(rule, context, node, acquireCount, prioritized) : passLocalCheck(rule, context, node, acquireCount, prioritized);
        }
    }

private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) {
        Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
    	// 在这里再根据不同的策略去决定拒绝的要怎么办
        return selectedNode == null ? true : rule.getRater().canPass(selectedNode, acquireCount, prioritized);
    }

static Node selectNodeByRequesterAndStrategy(FlowRule rule, Context context, DefaultNode node) {
        String limitApp = rule.getLimitApp();
        int strategy = rule.getStrategy();
        String origin = context.getOrigin();
    	// 根据不同的来源进行限流
        if (limitApp.equals(origin) && filterOrigin(origin)) {
            return strategy == 0 ? context.getOriginNode() : selectReferenceNode(rule, context, node);
        } else if ("default".equals(limitApp)) {
            return (Node)(strategy == 0 ? node.getClusterNode() : selectReferenceNode(rule, context, node));
        } else if ("other".equals(limitApp) && FlowRuleManager.isOtherOrigin(origin, rule.getResource())) {
            return strategy == 0 ? context.getOriginNode() : selectReferenceNode(rule, context, node);
        } else {
            return null;
        }
    }


public boolean canPass(Node node, int acquireCount, boolean prioritized) {
    	// 根据资源获取当前的使用量
        int curCount = this.avgUsedTokens(node);
    	// 如果当前使用量 + 请求 > 阈值 则按照策略进行限流
        if ((double)(curCount + acquireCount) > this.count) {
            if (prioritized && this.grade == 1) {
                long currentTime = TimeUtil.currentTimeMillis();
                long waitInMs = node.tryOccupyNext(currentTime, acquireCount, this.count);
                if (waitInMs < (long)OccupyTimeoutProperty.getOccupyTimeout()) {
                    node.addWaitingRequest(currentTime + waitInMs, acquireCount);
                    node.addOccupiedPass(acquireCount);
                    this.sleep(waitInMs);
                    throw new PriorityWaitException(waitInMs);
                }
            }

            return false;
        } else {
            return true;
        }
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/225140.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

04 ECharts基础入门

文章目录 一、ECharts介绍1. 简介2. 相关网站3. HTML引入方式4. 基本概念 二、常见图表1. 柱状图2. 折线图3. 饼图4. 雷达图5. 地图 三、应用1. 动画2. 交互 一、ECharts介绍 1. 简介 ECharts是一个使用JavaScript实现的开源可视化库&#xff0c;用于生成各种图表和图形。 EC…

RT_Thread_修改为外部晶振及验证

关注两处&#xff1a; 1、stm32f4xx_hal_conf.h&#xff0c;外部晶振频率HSE宏定义 2、drv_clk.c&#xff0c;system_clock_config函数 1、外部晶振频率HSE宏定义 根据实际外部晶振的频率去定义&#xff0c;使用的是8MHz&#xff1b; 2、system_clock_config 开启HSE&#…

改造python3中的http.server为简单的文件上传下载服务

改造 修改python3中的http.server.SimpleHTTPRequestHandler&#xff0c;实现简单的文件上传下载服务 simple_http_file_server.py&#xff1a; # !/usr/bin/env python3import datetime import email import html import http.server import io import mimetypes import os …

Thymeleaf生成pdf表格合并单元格描边不显示

生成pdf后左侧第一列的右描边不显示&#xff0c;但是html显示正常 显示异常时描边的写法 cellpadding“0” cellspacing“0” &#xff0c;td,th描边 .self-table{border:1px solid #000;border-collapse: collapse;width:100%}.self-table th{font-size:12px;border:1px sol…

markdown学习(初学者)

学习途径&#xff1a; 在线markdown编辑器_微信公众号markdown排版工具 Markdown 基本语法 | Markdown 官方教程 标题 要创建标题&#xff0c;请在单词或短语前面添加井号 (#) 。# 的数量代表了标题的级别。例如&#xff0c;添加三个 # 表示创建一个三级标题 (<h3>) (…

【华为网络-配置-023】- 一般企业网架构方案(单节点方案)

要求&#xff1a; 1、防火墙 FW1 G1/0/0 接口使用 PPPoE 拨号获取 IP 地址。 2、FW1 配置信任&#xff08;内网包含服务器&#xff09;和非信任区域&#xff08;Internet 外网&#xff09;。 3、FW1 配置 NAPT 使内网可以上网。 4、核心交换机 LSW1 划分 VLAN 并配置各接口及三…

Python 日期时间模块详解(datetime)

文章目录 1 概述1.1 datetime 类图1.2 类描述 2 常用方法2.1 获取当前日期时间&#xff1a;now()、today()、time()2.2 日期时间格式化&#xff1a;strftime()2.3 日期时间大小比较&#xff1a;>、、<2.4 日期时间间隔&#xff1a;- 3 扩展3.1 Python 中日期时间格式化符…

Java / Scala - Trie 树简介与应用实现

目录 一.引言 二.Tire 树简介 1.树 Tree 2.二叉搜索树 Binary Search Tree 3.字典树 Trie Tree 3.1 基本概念 3.2 额外信息 3.3 结点实现 3.4 查找与存储 三.Trie 树应用 1.应用场景 2.Java / Scala 实现 2.1 Pom 依赖 2.2 关键词匹配 四.总结 一.引言 Trie 树…

案例054:基于微信的追星小程序

文末获取源码 开发语言&#xff1a;Java 框架&#xff1a;SSM JDK版本&#xff1a;JDK1.8 数据库&#xff1a;mysql 5.7 开发软件&#xff1a;eclipse/myeclipse/idea Maven包&#xff1a;Maven3.5.4 小程序框架&#xff1a;uniapp 小程序开发软件&#xff1a;HBuilder X 小程序…

安装node.js并创建第一个vue项目

目录 一&#xff0c;下载node.js 二&#xff0c;创建一个vue项目 一&#xff0c;下载node.js 1.进入官网&#xff1a;Node.js (nodejs.org) 2.选择版本 3.选择安装方式 4.运行安装包&#xff0c;下载文件 5.选择要安装的路径后一直next 6.安装完成后打开命令提示符&#xff…

PHP短信接口防刷防轰炸多重解决方案三(可正式使用)

短信接口盗刷轰炸&#xff1a;指的是黑客利用非法手段获取短信接口的访问权限&#xff0c;然后使用该接口发送大量垃圾短信给目标用户 短信验证码轰炸解决方案一(验证码类解决)-CSDN博客 短信验证码轰炸解决方案二(防止海外ip、限制ip、限制手机号次数解决)-CSDN博客 PHP短信…

推荐几款转换视频格式的好用转换工具,小白也能上手

视频格式转换工具是一种专门转换视频的软件&#xff0c;可让你将一种视频格式转换为另一种视频格式&#xff08;例如&#xff0c;MOV 到 MP4&#xff09;&#xff0c;通常可以节省空间。 本文将介绍一些用于转换视频格式的好用转换工具&#xff0c;并且详细描述了它们的主要功…

【FPGA】数字电路设计基础

数字电路基础 1 什么是数字电路 在学习数字电路之前&#xff0c;我们先要了解下什么是数字电路。想要搞明白数字电路&#xff0c;就要搞明白生活中有 两种概念&#xff0c; 数字信号和模拟信号&#xff0c;模拟信号一般包括压力、气温、速度等信号&#xff0c;模拟量的值是可…

智能成绩表 - 华为OD统一考试(C卷)

OD统一考试(C卷) 分值: 100分 题目描述 小明来到某学校当老师,需要将学生按考试总分或单科分数进行排名,你能帮帮他吗? 输入描述 第1行输入两个整数,学生人数n和科目数量m。0<n<100,0<m<10 第2行输入m个科目名称,彼此之间用空格隔开。科目名称只由英文…

这书看着贼得劲儿

作者呕心沥血2年&#xff0c;再出力作~~~ 给大家推荐一本好玩的书 神经网络与TensorFlow 本来以为出版了第一本书&#xff0c;应该对于漫长的审核有免疫力了&#xff0c;结果又被这本书折磨了2年。于是作者痛定思痛&#xff0c;决定第三本书写一本纯科普的书籍。 墙裂推荐 这…

跨境电商危机公关:应对负面舆情的策略优化

随着跨境电商的快速发展&#xff0c;企业在全球市场中面临的竞争与挑战也日益复杂。在这个数字时代&#xff0c;负面舆情一旦爆发&#xff0c;可能对企业形象和经营造成深远影响。 因此&#xff0c;跨境电商企业需要建立有效的危机公关策略&#xff0c;以迅速、果断、有效地应…

Python集合魔法:解锁数据去重技巧

. 在Python编程的魔法世界中&#xff0c;有一种数据类型几乎被忽视&#xff0c;但却拥有强大的超能力&#xff0c;那就是集合&#xff08;Set&#xff09;。 集合是一种无序、唯一的数据类型&#xff0c;它以其独特的特点在编程世界中独占一席之地。 1. 集合的定义和特点 集…

Quantlib环境安装踩坑记录

1.conda 安装python3.7环境&#xff1b; 2.因为torch1.8.2环境已经被弃用&#xff0c;所以通过nvidia-smi命令确认cuda版本是11.6后进入环境&#xff0c; 输入pip install torch1.12.0cu116 torchvision0.13.0cu116 torchaudio0.12.0 torchtext0.13.0 --extra-index-url https:…

第二证券:股债跷跷板是什么?投资者该如何应对?

股市和债市虽然是两个不同的证券交易商场&#xff0c;但它们之间保持着一定的联系&#xff0c;比较典型的&#xff0c;便是股债商场间的联动&#xff0c;也称为股债跷跷板。股债跷跷板是什么&#xff1f;出资者该怎样应对&#xff1f;关于这些&#xff0c;本文将借用相关知识作…

基于SSM框架的购物商城系统论文

摘 要 网络技术和计算机技术发展至今&#xff0c;已经拥有了深厚的理论基础&#xff0c;并在现实中进行了充分运用&#xff0c;尤其是基于计算机运行的软件更是受到各界的关注。加上现在人们已经步入信息时代&#xff0c;所以对于信息的宣传和管理就很关键。因此商城购物信息的…