Kubernetes调度器源码学习(一):调度器工作原理、调度器启动流程、调度队列

本文基于Kubernetes v1.22.4版本进行源码学习

1、调度器工作原理

1)、调度流程

kube-scheduler的主要作用就是根据特定的调度算法和调度策略将Pod调度到合适的Node节点上去,是一个独立的二进制程序,启动之后会一直监听API Server,获取到PodSpec.NodeName为空的Pod,对每个Pod都会创建一个binding

在这里插入图片描述

调度主要分为以下几个部分:

  • 首先是预选过程,过滤掉不满足条件的节点,这个过程称为Predicates(过滤)
  • 然后是优选过程,对通过的节点按照优先级排序,称之为Priorities(打分)
  • 最后从中选择优先级最高的节点,如果中间任何一步骤有错误,就直接返回错误

Predicates阶段首先遍历全部节点,过滤掉不满足条件的节点,属于强制性规则,这一阶段输出的所有满足要求的Node将被记录并作为第二阶段的输入,如果所有的节点都不满足条件,那么Pod将会一直处于Pending状态,直到有节点满足条件,在这期间调度器会不断的重试

Priorities阶段即再次对节点进行筛选,如果有多个节点都满足条件的话,那么系统会按照节点的优先级(priorites)大小对节点进行排序,最后选择优先级最高的节点来部署Pod

详细调度流程如下:

  1. 用户提交创建Pod的请求,可以通过API Server的REST API,也可用Kubectl命令行工具

  2. API Server收到用户请求后,存储相关数据到etcd中

  3. 调度器监听API Server查看到还未调度(bind)的Pod列表,循环遍历地为每个Pod尝试分配节点,这个分配过程就是我们上面提到的两个阶段:

    • 预选阶段(Predicates),过滤节点,调度器用一组规则过滤掉不符合要求的Node节点,比如Pod设置了资源的request,那么可用资源比Pod需要的资源少的主机就会被过滤掉
    • 优选阶段(Priorities),为节点的优先级打分,将上一阶段过滤出来的Node列表进行打分,调度器会考虑一些整体的优化策略,比如把Deployment控制的多个Pod副本分布到不同的主机上,使用最低负载的主机等策略

    经过上面的阶段过滤后选择打分最高的Node节点和Pod进行binding操作,然后将结果存储到etcd中

  4. 最后被选择出来的Node节点对应的kubelet去执行创建Pod的相关操作

2)、调度框架

调度框架定义了一组扩展点,用户可以实现扩展点定义的接口来定义自己的调度逻辑(我们称之为扩展),并将扩展注册到扩展点上,调度框架在执行调度工作流时,遇到对应的扩展点时,将调用用户注册的扩展。调度框架在预留扩展点时,都是有特定的目的,有些扩展点上的扩展可以改变调度程序的决策方法,有些扩展点上的扩展只是发送一个通知

调度一个Pod的过程分为两个阶段:调度周期(Scheduling Cycle)和绑定周期(Binding Cycle)

调度周期为Pod选择一个合适的节点,绑定周期将调度过程的决策应用到集群中(也就是在被选定的节点上运行Pod)。调度周期和绑定周期一起被称为调度上下文(Scheduling Context)。调度过程和绑定过程遇到该Pod不可调度或存在内部错误,则中止调度或绑定周期,该Pod将返回队列并重试

下图展示了调度框架中的调度上下文及其中的扩展点,一个扩展可以注册多个扩展点,以便可以执行更复杂的有状态的任务

在这里插入图片描述

调度周期是同步运行的,同一时间点只为一个Pod进行调度;绑定周期是异步执行的,同一时间点可并发为多个Pod执行绑定

调度周期:

  1. Sort:用于对调度队列中的Pod进行排序,以决定先调度哪个Pod,本质上只需要实现Less(Pod1, Pod2)方法用于比较两个Pod谁更优先获得调度,同一时间点只能有一个Sort插件生效
  2. PreFilter:用于对Pod的信息进行预处理,或者检查一些集群或Pod必须满足的前置条件,然后将其存入缓存中待Filter扩展执行的时候使用,如果PreFilter返回了error,则调度过程终止
  3. Filter:用来过滤掉不满足Pod调度要求的节点,对于每一个节点,调度器将按顺序执行Filter扩展;如果任何一个Filter扩展将节点标记为不可选,则余下的Filter扩展将不会被执行,调度器可以同时对多个节点执行Filter扩展
  4. PostFilter:如果在Filter扩展点全部节点都被过滤掉了,没有合适的节点进行调度,才会执行PostFilter扩展点,如果启用了Pod抢占特性,那么会在这个扩展点进行抢占操作,可以用于logs/metrics
  5. PreScore:对Score扩展点的数据做一些预处理操作,然后将其存入缓存中待Score扩展点执行的时候使用
  6. Score:用于为所有可选节点进行打分,调度器将针对每一个节点调用每个Score扩展,评分结果是一个范围内的整数,代表最小和最大分数。在NormalizeScore阶段,调度器将会把每个Score扩展对具体某个节点的评分结果和该扩展的权重合并起来,作为最终评分结果
  7. NormalizeScore:在调度器对节点进行最终排序之前修改每个节点的评分结果,注册到该扩展点的扩展在被调用时,将获得同一个插件中Score扩展的评分结果作为参数,调度框架每执行一次调度,都将调用所有插件中的一个NormalizeScore扩展一次
  8. Reserve:一个通知性质的扩展点,实现Reserve扩展的插件有两种方法,分别是Reserve和Unreserve。有状态的插件可以使用该扩展点来获得节点上为Pod预留的资源,该事件发生在调用器将Pod绑定到节点之前,目的是避免调度器在等待Pod与节点绑定的过程中调度新的Pod到节点上,发生实际使用资源超出可用资源的情况(因为绑定Pod到节点上是异步发生的)。这是调度过程的最后一个步骤,Pod进入reserved状态以后,要么在绑定失败时触发Unreserve扩展,要么在绑定成功时,由PostBind扩展结束绑定过程
  9. Permit:用于阻止或者延迟Pod与节点的绑定。可以执行三种操作:
    • approve(批准):当所有的Permit扩展都approve了Pod与节点的绑定,调度器将继续执行绑定过程
    • deny(拒绝):如果任何一个Permit扩展deny了Pod与节点的绑定,Pod将被放回到待调度队列,此时将触发Unreserve扩展
    • wait(等待):如果一个Permit扩展返回了wait,则Pod将保持在Permit阶段,直到被其他扩展approve。如果超时,wait状态变成deny,Pod 将被放回到待调度队列,此时将触发Unreserve扩展

绑定周期:

  1. PreBind:用于Pod绑定之前执行某些逻辑。例如,可以将一个基于网络的数据卷挂载到节点上,以便Pod可以使用。如果任何一个PreBind扩展返回错误,Pod将会被放回到待调度队列,此时将触发Unreserve扩展
  2. Bind:用于将Pod绑定到节点上
    • 只有所有的PreBind扩展都成功执行了,Bind扩展才会执行
    • 调度框架按照Bind扩展注册的顺序逐个调用Bind扩展
    • 具体某个Bind扩展可以选择处理或不处理该Pod
    • 如果某个Bind扩展处理了该Pod与节点的绑定,余下的Bind扩展将会被忽略
  3. PostBind:是一个通知性质的扩展
    • PostBind扩展在Pod成功绑定到节点上之后被动调用
    • PostBind扩展是绑定过程的最后一个步骤,可以用来执行资源清理的操作

2、调度器启动流程

1)、启动入口

kube-scheduler的启动入口位于cmd/kube-scheduler/scheduler.go文件,该文件中就包含一个main入口函数:

// cmd/kube-scheduler/scheduler.go
func main() {
	rand.Seed(time.Now().UnixNano())

	pflag.CommandLine.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)

	// 初始化Cobra.Command对象
	command := app.NewSchedulerCommand()

	logs.InitLogs()
	defer logs.FlushLogs()

	// 执行命令
	if err := command.Execute(); err != nil {
		os.Exit(1)
	}
}

main函数中通过app.NewSchedulerCommand()方法获得一个Cobra的Command对象,然后调用command.Execute()方法执行这个命令,NewSchedulerCommand()方法代码如下:

// cmd/kube-scheduler/app/server.go
// NewSchedulerCommand使用默认参数和registryOptions创建一个*cobra.Command对象
func NewSchedulerCommand(registryOptions ...Option) *cobra.Command {
	// 获取默认的配置参数
	opts := options.NewOptions()

	cmd := &cobra.Command{
		Use: "kube-scheduler",
		Long: `The Kubernetes scheduler is a control plane process which assigns
Pods to Nodes. The scheduler determines which Nodes are valid placements for
each Pod in the scheduling queue according to constraints and available
resources. The scheduler then ranks each valid Node and binds the Pod to a
suitable Node. Multiple different schedulers may be used within a cluster;
kube-scheduler is the reference implementation.
See [scheduling](https://kubernetes.io/docs/concepts/scheduling-eviction/)
for more information about scheduling and the kube-scheduler component.`,
		Run: func(cmd *cobra.Command, args []string) {
			// 真正执行的函数入口
			if err := runCommand(cmd, opts, registryOptions...); err != nil {
				fmt.Fprintf(os.Stderr, "%v\n", err)
				os.Exit(1)
			}
		},
		Args: func(cmd *cobra.Command, args []string) error {
			for _, arg := range args {
				if len(arg) > 0 {
					return fmt.Errorf("%q does not take any arguments, got %q", cmd.CommandPath(), args)
				}
			}
			return nil
		},
	}

	nfs := opts.Flags
	verflag.AddFlags(nfs.FlagSet("global"))
	globalflag.AddGlobalFlags(nfs.FlagSet("global"), cmd.Name())
	fs := cmd.Flags()
	for _, f := range nfs.FlagSets {
		fs.AddFlagSet(f)
	}

	cols, _, _ := term.TerminalSize(cmd.OutOrStdout())
	cliflag.SetUsageAndHelpFunc(cmd, *nfs, cols)

	cmd.MarkFlagFilename("config", "yaml", "yml", "json")

	return cmd
}

真正执行的函数入口是runCommand()方法,代码如下:

// cmd/kube-scheduler/app/server.go
// runCommand运行调度器
func runCommand(cmd *cobra.Command, opts *options.Options, registryOptions ...Option) error {
	verflag.PrintAndExitIfRequested()
	cliflag.PrintFlags(cmd.Flags())

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go func() {
		stopCh := server.SetupSignalHandler()
		<-stopCh
		cancel()
	}()

	// 根据命令行参数和options创建完整的配置和调度程序
	cc, sched, err := Setup(ctx, opts, registryOptions...)
	if err != nil {
		return err
	}

	// 真正去启动调度器
	return Run(ctx, cc, sched)
}

runCommand()方法中先调用Setup()方法根据命令行参数和options构造完整的配置和调度器对象,然后调用Run()方法真正去启动调度器

2)、创建Scheduler对象

runCommand()方法中通过Setup()方法根据命令行参数和options构造完整的配置和调度器对象,代码如下:

// cmd/kube-scheduler/app/server.go
// Setup根据命令行参数和options构造完整的配置和调度器对象
func Setup(ctx context.Context, opts *options.Options, outOfTreeRegistryOptions ...Option) (*schedulerserverconfig.CompletedConfig, *scheduler.Scheduler, error) {
	// 获取默认的配置
	if cfg, err := latest.Default(); err != nil {
		return nil, nil, err
	} else {
		opts.ComponentConfig = cfg
	}

	// 校验命令行选项
	if errs := opts.Validate(); len(errs) > 0 {
		return nil, nil, utilerrors.NewAggregate(errs)
	}

	// 获取调度器config对象,该对象拥有一个调度器所有的上下文信息
	c, err := opts.Config()
	if err != nil {
		return nil, nil, err
	}

	// Get the completed config
	// 获取completed配置
	cc := c.Complete()

	outOfTreeRegistry := make(runtime.Registry)
	for _, option := range outOfTreeRegistryOptions {
		if err := option(outOfTreeRegistry); err != nil {
			return nil, nil, err
		}
	}

	recorderFactory := getRecorderFactory(&cc)
	completedProfiles := make([]kubeschedulerconfig.KubeSchedulerProfile, 0)
	// Create the scheduler.
	// 创建调度器
	sched, err := scheduler.New(cc.Client,
		cc.InformerFactory,
		recorderFactory,
		ctx.Done(),
		scheduler.WithComponentConfigVersion(cc.ComponentConfig.TypeMeta.APIVersion),
		scheduler.WithKubeConfig(cc.KubeConfig),
		scheduler.WithProfiles(cc.ComponentConfig.Profiles...),
		scheduler.WithLegacyPolicySource(cc.LegacyPolicySource),
		scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
		scheduler.WithFrameworkOutOfTreeRegistry(outOfTreeRegistry),
		scheduler.WithPodMaxBackoffSeconds(cc.ComponentConfig.PodMaxBackoffSeconds),
		scheduler.WithPodInitialBackoffSeconds(cc.ComponentConfig.PodInitialBackoffSeconds),
		scheduler.WithExtenders(cc.ComponentConfig.Extenders...),
		scheduler.WithParallelism(cc.ComponentConfig.Parallelism),
		scheduler.WithBuildFrameworkCapturer(func(profile kubeschedulerconfig.KubeSchedulerProfile) {
			// Profiles are processed during Framework instantiation to set default plugins and configurations. Capturing them for logging
			completedProfiles = append(completedProfiles, profile)
		}),
	)
	if err != nil {
		return nil, nil, err
	}
	if err := options.LogOrWriteConfig(opts.WriteConfigTo, &cc.ComponentConfig, completedProfiles); err != nil {
		return nil, nil, err
	}

	return &cc, sched, nil
}

Setup()方法中调用scheduler.New()方法去构造一个真正的调度器对象,代码如下:

// pkg/scheduler/scheduler.go
// New返回一个Scheduler对象
func New(client clientset.Interface,
	informerFactory informers.SharedInformerFactory,
	recorderFactory profile.RecorderFactory,
	stopCh <-chan struct{},
	opts ...Option) (*Scheduler, error) {

	stopEverything := stopCh
	if stopEverything == nil {
		stopEverything = wait.NeverStop
	}

	// 默认的调度器配置
	options := defaultSchedulerOptions
	for _, opt := range opts {
		opt(&options)
	}

	if options.applyDefaultProfile {
		var versionedCfg v1beta2.KubeSchedulerConfiguration
		scheme.Scheme.Default(&versionedCfg)
		cfg := config.KubeSchedulerConfiguration{}
		if err := scheme.Scheme.Convert(&versionedCfg, &cfg, nil); err != nil {
			return nil, err
		}
		options.profiles = cfg.Profiles
	}
	schedulerCache := internalcache.New(30*time.Second, stopEverything)

	registry := frameworkplugins.NewInTreeRegistry()
	if err := registry.Merge(options.frameworkOutOfTreeRegistry); err != nil {
		return nil, err
	}

	snapshot := internalcache.NewEmptySnapshot()
	clusterEventMap := make(map[framework.ClusterEvent]sets.String)

	configurator := &Configurator{
		componentConfigVersion:   options.componentConfigVersion,
		client:                   client,
		kubeConfig:               options.kubeConfig,
		recorderFactory:          recorderFactory,
		informerFactory:          informerFactory,
		schedulerCache:           schedulerCache,
		StopEverything:           stopEverything,
		percentageOfNodesToScore: options.percentageOfNodesToScore,
		podInitialBackoffSeconds: options.podInitialBackoffSeconds,
		podMaxBackoffSeconds:     options.podMaxBackoffSeconds,
		profiles:                 append([]schedulerapi.KubeSchedulerProfile(nil), options.profiles...),
		registry:                 registry,
		nodeInfoSnapshot:         snapshot,
		extenders:                options.extenders,
		frameworkCapturer:        options.frameworkCapturer,
		parallellism:             options.parallelism,
		clusterEventMap:          clusterEventMap,
	}

	metrics.Register()

	var sched *Scheduler
	if options.legacyPolicySource == nil {
		// Create the config from component config
		sc, err := configurator.create()
		if err != nil {
			return nil, fmt.Errorf("couldn't create scheduler: %v", err)
		}
		sched = sc
	} else {
		// Create the config from a user specified policy source.
		policy := &schedulerapi.Policy{}
		switch {
		case options.legacyPolicySource.File != nil:
			if err := initPolicyFromFile(options.legacyPolicySource.File.Path, policy); err != nil {
				return nil, err
			}
		case options.legacyPolicySource.ConfigMap != nil:
			if err := initPolicyFromConfigMap(client, options.legacyPolicySource.ConfigMap, policy); err != nil {
				return nil, err
			}
		}
		// Set extenders on the configurator now that we've decoded the policy
		// In this case, c.extenders should be nil since we're using a policy (and therefore not componentconfig,
		// which would have set extenders in the above instantiation of Configurator from CC options)
		configurator.extenders = policy.Extenders
		sc, err := configurator.createFromPolicy(*policy)
		if err != nil {
			return nil, fmt.Errorf("couldn't create scheduler from policy: %v", err)
		}
		sched = sc
	}

	// Additional tweaks to the config produced by the configurator.
	sched.StopEverything = stopEverything
	sched.client = client

	// Build dynamic client and dynamic informer factory
	var dynInformerFactory dynamicinformer.DynamicSharedInformerFactory
	// options.kubeConfig can be nil in tests.
	if options.kubeConfig != nil {
		dynClient := dynamic.NewForConfigOrDie(options.kubeConfig)
		dynInformerFactory = dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynClient, 0, v1.NamespaceAll, nil)
	}

	// 启动一系列的资源对象事件监听程序
	addAllEventHandlers(sched, informerFactory, dynInformerFactory, unionedGVKs(clusterEventMap))
	return sched, nil
}

首先将默认的调度器配置通过传递的Option参数进行一一配置,接着就是注册框架,初始化调度队列,通过一系列的操作,就实例化了真正的调度器对象,最后去启动一系列的资源对象事件监听程序,比如Pod、Node等对象,上面方法中通过addAllEventHandlers(sched, informerFactory, dynInformerFactory, unionedGVKs(clusterEventMap))来实现,关于这些资源对象对应的onAdd、onUpdate、onDelete操作均在pkg/scheduler/eventhandlers.go文件中实现,比如创建一个Pod过后,调度器通过watch就会在onAdd事件中接收该操作,然后就可以根据queue sort插件将Pod加入到待调度队列中去开始调度了

3)、启动调度器

runCommand()方法中通过调用Run()方法来真正启动调度器,代码如下:

// cmd/kube-scheduler/app/server.go
// Run根据给定的配置执行调度程序,它仅在出错或上下文完成时返回
func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *scheduler.Scheduler) error {
	// To help debugging, immediately log version
	klog.V(1).InfoS("Starting Kubernetes Scheduler version", "version", version.Get())

	// Configz registration.
	// 1)配置Configz
	if cz, err := configz.New("componentconfig"); err == nil {
		cz.Set(cc.ComponentConfig)
	} else {
		return fmt.Errorf("unable to register configz: %s", err)
	}

	// Prepare the event broadcaster.
	// 2)准备事件广播管理器
	cc.EventBroadcaster.StartRecordingToSink(ctx.Done())

	// Setup healthz checks.
	// 3)启动Http Server,进行健康监控服务器监听
	var checks []healthz.HealthChecker
	if cc.ComponentConfig.LeaderElection.LeaderElect {
		checks = append(checks, cc.LeaderElection.WatchDog)
	}

	waitingForLeader := make(chan struct{})
	isLeader := func() bool {
		select {
		case _, ok := <-waitingForLeader:
			// if channel is closed, we are leading
			return !ok
		default:
			// channel is open, we are waiting for a leader
			return false
		}
	}

	// Start up the healthz server.
	if cc.InsecureServing != nil {
		separateMetrics := cc.InsecureMetricsServing != nil
		handler := buildHandlerChain(newHealthzHandler(&cc.ComponentConfig, cc.InformerFactory, isLeader, separateMetrics, checks...), nil, nil)
		if err := cc.InsecureServing.Serve(handler, 0, ctx.Done()); err != nil {
			return fmt.Errorf("failed to start healthz server: %v", err)
		}
	}
	if cc.InsecureMetricsServing != nil {
		handler := buildHandlerChain(newMetricsHandler(&cc.ComponentConfig, cc.InformerFactory, isLeader), nil, nil)
		if err := cc.InsecureMetricsServing.Serve(handler, 0, ctx.Done()); err != nil {
			return fmt.Errorf("failed to start metrics server: %v", err)
		}
	}
	if cc.SecureServing != nil {
		handler := buildHandlerChain(newHealthzHandler(&cc.ComponentConfig, cc.InformerFactory, isLeader, false, checks...), cc.Authentication.Authenticator, cc.Authorization.Authorizer)
		// TODO: handle stoppedCh returned by c.SecureServing.Serve
		if _, err := cc.SecureServing.Serve(handler, 0, ctx.Done()); err != nil {
			// fail early for secure handlers, removing the old error loop from above
			return fmt.Errorf("failed to start secure server: %v", err)
		}
	}

	// Start all informers.
	// 4)启动所有informer
	cc.InformerFactory.Start(ctx.Done())

	// Wait for all caches to sync before scheduling.
	// 等待所有的缓存同步后再进行调度
	cc.InformerFactory.WaitForCacheSync(ctx.Done())

	// If leader election is enabled, runCommand via LeaderElector until done and exit.
	// 5)因为Master节点可以存在多个,选举一个作为Leader,通过LeaderElector运行命令直到完成并退出
	if cc.LeaderElection != nil {
		cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
			OnStartedLeading: func(ctx context.Context) {
				close(waitingForLeader)
				// 6)调用sched.Run(),执行主调度逻辑
				sched.Run(ctx)
			},
			OnStoppedLeading: func() {
				select {
				case <-ctx.Done():
					// We were asked to terminate. Exit 0.
					klog.Info("Requested to terminate. Exiting.")
					os.Exit(0)
				default:
					// We lost the lock.
					klog.Exitf("leaderelection lost")
				}
			},
		}
		leaderElector, err := leaderelection.NewLeaderElector(*cc.LeaderElection)
		if err != nil {
			return fmt.Errorf("couldn't create leader elector: %v", err)
		}

		// 参加选举的会持续通信
		leaderElector.Run(ctx)

		return fmt.Errorf("lost lease")
	}

	// Leader election is disabled, so runCommand inline until done.
	close(waitingForLeader)
	// 如果没有开启领导者选举,则直接调用调度器对象的Run函数
	sched.Run(ctx)
	return fmt.Errorf("finished without leader elect")
}

调度器启动流程如下图:

在这里插入图片描述

3、调度队列

1)、调度队列

调度器启动后最终是调用Scheduler的Run()方法来开始调度Pod,代码如下:

// pkg/scheduler/scheduler.go
// Run函数开始监听和调度
func (sched *Scheduler) Run(ctx context.Context) {
	sched.SchedulingQueue.Run()
	wait.UntilWithContext(ctx, sched.scheduleOne, 0)
	sched.SchedulingQueue.Close()
}

Run()方法中调用SchedulingQueue的Run()方法,SchedulingQueue是一个队列接口,用于存储待调度的Pod,该接口遵循类似于cache.FIFOcache.Heap这样的数据结构,要弄明白调度器是如何去调度Pod的,首先需要弄清楚这个结构:

// pkg/scheduler/internal/queue/scheduling_queue.go
// SchedulingQueue存储待调度的pod
type SchedulingQueue interface {
	framework.PodNominator
	Add(pod *v1.Pod) error
	// Activate moves the given pods to activeQ iff they're in unschedulableQ or backoffQ.
	// The passed-in pods are originally compiled from plugins that want to activate Pods,
	// by injecting the pods through a reserved CycleState struct (PodsToActivate).
	// 如果给定的pod位于unschedulableQ或backoffQ中,将其移动到activeQ
	Activate(pods map[string]*v1.Pod)
	// AddUnschedulableIfNotPresent adds an unschedulable pod back to scheduling queue.
	// The podSchedulingCycle represents the current scheduling cycle number which can be
	// returned by calling SchedulingCycle().
	// 将无法调度的pod添加回调度队列
	AddUnschedulableIfNotPresent(pod *framework.QueuedPodInfo, podSchedulingCycle int64) error
	// SchedulingCycle returns the current number of scheduling cycle which is
	// cached by scheduling queue. Normally, incrementing this number whenever
	// a pod is popped (e.g. called Pop()) is enough.
	SchedulingCycle() int64
	// Pop removes the head of the queue and returns it. It blocks if the
	// queue is empty and waits until a new item is added to the queue.
	Pop() (*framework.QueuedPodInfo, error)
	Update(oldPod, newPod *v1.Pod) error
	Delete(pod *v1.Pod) error
	MoveAllToActiveOrBackoffQueue(event framework.ClusterEvent, preCheck PreEnqueueCheck)
	AssignedPodAdded(pod *v1.Pod)
	AssignedPodUpdated(pod *v1.Pod)
	PendingPods() []*v1.Pod
	// Close closes the SchedulingQueue so that the goroutine which is
	// waiting to pop items can exit gracefully.
	Close()
	// NumUnschedulablePods returns the number of unschedulable pods exist in the SchedulingQueue.
	NumUnschedulablePods() int
	// Run starts the goroutines managing the queue.
	Run()
}

SchedulingQueue是一个用于存储带调度Pod的队列接口,在构造Scheduler对象的时候可以看到调度器中是如何实现这个队列接口的:

// pkg/scheduler/internal/queue/scheduling_queue.go
// NewSchedulingQueue将优先级队列初始化为新的调度队列
func NewSchedulingQueue(
	lessFn framework.LessFunc,
	informerFactory informers.SharedInformerFactory,
	opts ...Option) SchedulingQueue {
	return NewPriorityQueue(lessFn, informerFactory, opts...)
}

// NewPriorityQueue创建PriorityQueue对象
func NewPriorityQueue(
	lessFn framework.LessFunc,
	informerFactory informers.SharedInformerFactory,
	opts ...Option,
) *PriorityQueue {
	options := defaultPriorityQueueOptions
	for _, opt := range opts {
		opt(&options)
	}

	comp := func(podInfo1, podInfo2 interface{}) bool {
		pInfo1 := podInfo1.(*framework.QueuedPodInfo)
		pInfo2 := podInfo2.(*framework.QueuedPodInfo)
		return lessFn(pInfo1, pInfo2)
	}

	if options.podNominator == nil {
		options.podNominator = NewPodNominator(informerFactory.Core().V1().Pods().Lister())
	}

	pq := &PriorityQueue{
		PodNominator:              options.podNominator,
		clock:                     options.clock,
		stop:                      make(chan struct{}),
		podInitialBackoffDuration: options.podInitialBackoffDuration,
		podMaxBackoffDuration:     options.podMaxBackoffDuration,
		activeQ:                   heap.NewWithRecorder(podInfoKeyFunc, comp, metrics.NewActivePodsRecorder()),
		unschedulableQ:            newUnschedulablePodsMap(metrics.NewUnschedulablePodsRecorder()),
		moveRequestCycle:          -1,
		clusterEventMap:           options.clusterEventMap,
	}
	pq.cond.L = &pq.lock
	pq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder())
	if utilfeature.DefaultFeatureGate.Enabled(features.PodAffinityNamespaceSelector) {
		pq.nsLister = informerFactory.Core().V1().Namespaces().Lister()
	}

	return pq
}

从上面的初始化过程可以看到,PriorityQueue实现了SchedulingQueue接口,结构体定义如下:

// pkg/scheduler/internal/queue/scheduling_queue.go
// PriorityQueue实现了调度队列SchedulingQueue
// PriorityQueue的头部元素是优先级最高的pending pod,该结构有三个子队列:
// activeQ是一个堆,包含正在考虑进行调度的pod,称为
// unschedulableQ包含已尝试并且确定为不可调度的pod
// podBackoffQ包含从unschedulableQ移出的pod,并在backoff完成后将其移到activeQ队列
type PriorityQueue struct {
	// PodNominator abstracts the operations to maintain nominated Pods.
	framework.PodNominator

	stop  chan struct{}
	clock util.Clock

	// pod initial backoff duration.
	podInitialBackoffDuration time.Duration
	// pod maximum backoff duration.
	podMaxBackoffDuration time.Duration

	lock sync.RWMutex
	cond sync.Cond

	// activeQ is heap structure that scheduler actively looks at to find pods to
	// schedule. Head of heap is the highest priority pod.
	// activeQ是堆结构,调度器会主动查看以查找要调度的pod.heap的头是优先级最高的pod
	activeQ *heap.Heap
	// podBackoffQ is a heap ordered by backoff expiry. Pods which have completed backoff
	// are popped from this heap before the scheduler looks at activeQ
	// podBackoffQ是按backoff Expire排序的堆.在调度器查看activeQ之前,已完成回退的pod将从该堆中弹出
	podBackoffQ *heap.Heap
	// unschedulableQ holds pods that have been tried and determined unschedulable.
	// unschedulableQ保存已经尝试过并确定不可调度的pod
	unschedulableQ *UnschedulablePodsMap
	// schedulingCycle represents sequence number of scheduling cycle and is incremented
	// when a pod is popped.
	schedulingCycle int64
	// moveRequestCycle caches the sequence number of scheduling cycle when we
	// received a move request. Unschedulable pods in and before this scheduling
	// cycle will be put back to activeQueue if we were trying to schedule them
	// when we received move request.
	moveRequestCycle int64

	clusterEventMap map[framework.ClusterEvent]sets.String

	// closed indicates that the queue is closed.
	// It is mainly used to let Pop() exit its control loop while waiting for an item.
	closed bool

	nsLister listersv1.NamespaceLister
}

kube-scheduler使用PriorityQueue优先级队列来存储待调度的Pod,PriorityQueue的头部元素是优先级最高的待调度的Pod,该结构有3个子队列:

  • activeQ(活动队列):用来存放等待调度的Pod
  • unschedulableQ(不可调度队列):当Pod不能满足被调度的条件时就会被加入到unschedulableQ,等待后续继续进行尝试调度
  • podBackoffQ(回退队列):如果调度任务反复执行失败,则会按尝试次数增加等待调度时间,降低重试效率,从而避免反复失败浪费调度资源。对于调度失败的Pod会优先存储在podBackoffQ队列中,等待后续进行重试,podBackoffQ可以认为就是重试的队列,只是后续再调度的等待时间会越来越长

2)、活动队列

activeQ用来存放等待调度的Pod,在上面实例化优先级队列的过程中可以看到activeQ队列的初始化是通过调用heap.NewWithRecorder()函数实现的:

// pkg/scheduler/internal/heap/heap.go
// NewWithRecorder包装一个可选的metricRecorder来组成一个堆对象,就是在Heap基础上包装了metrics数据
func NewWithRecorder(keyFn KeyFunc, lessFn lessFunc, metricRecorder metrics.MetricRecorder) *Heap {
	return &Heap{
		data: &data{
			items:    map[string]*heapItem{},
			queue:    []string{},
			keyFunc:  keyFn,
			lessFunc: lessFn,
		},
		metricRecorder: metricRecorder,
	}
}

// lessFunc接收两个元素,对列表进行排序时,将第一个元素放在第二个元素之前,则返回true
type lessFunc = func(item1, item2 interface{}) bool

其中的data数据结构是Golang中的一个标准的heap堆(实现了heap.Interface接口),然后Heap在data基础上增加了metricRecorder用于记录metrics数据。这里最重要的就是用于比较元素优先级的lessFunc函数的实现,在初始化优先级队列时传入了一个comp的参数,这个参数就是activeQ这个堆的lessFunc函数的实现:

// pkg/scheduler/internal/queue/scheduling_queue.go NewPriorityQueue()
	comp := func(podInfo1, podInfo2 interface{}) bool {
		pInfo1 := podInfo1.(*framework.QueuedPodInfo)
		pInfo2 := podInfo2.(*framework.QueuedPodInfo)
		return lessFn(pInfo1, pInfo2)
	}

最终是调用的创建Scheduler对象的时候传入的lessFn参数:

// pkg/scheduler/factory.go create()
	lessFn := profiles[c.profiles[0].SchedulerName].QueueSortFunc()

可以看到比较元素优先级是通过调度框架的QueueSortFunc()方法来实现的,对应的实现代码如下:

// pkg/scheduler/framework/runtime/framework.go
// QueueSortFunc返回对调度队列中的pod进行排序的函数
func (f *frameworkImpl) QueueSortFunc() framework.LessFunc {
	if f == nil {
		// If frameworkImpl is nil, simply keep their order unchanged.
		// NOTE: this is primarily for tests.
		return func(_, _ *framework.QueuedPodInfo) bool { return false }
	}

	if len(f.queueSortPlugins) == 0 {
		panic("No QueueSort plugin is registered in the frameworkImpl.")
	}

	// Only one QueueSort plugin can be enabled.
	// 只能启用一个QueueSort插件
	return f.queueSortPlugins[0].Less
}

最终真正用于优先级队列元素优先级比较的函数是通过QueueSort插件来实现的,默认启用的QueueSort插件是PrioritySort,PrioritySort插件的实现核心就是Less()方法:

// pkg/scheduler/framework/plugins/queuesort/priority_sort.go
// Less是activeQ用于对pod进行排序的函数,它根据pod的优先级对pod进行排序
// 当优先级相同时,它使用PodQueueInfo.timestamp进行比较
func (pl *PrioritySort) Less(pInfo1, pInfo2 *framework.QueuedPodInfo) bool {
	p1 := corev1helpers.PodPriority(pInfo1.Pod)
	p2 := corev1helpers.PodPriority(pInfo2.Pod)
	return (p1 > p2) || (p1 == p2 && pInfo1.Timestamp.Before(pInfo2.Timestamp))
}
// k8s.io/component-helpers/scheduling/corev1/helpers.go
func PodPriority(pod *v1.Pod) int32 {
	if pod.Spec.Priority != nil {
		return *pod.Spec.Priority
	}
	// When priority of a running pod is nil, it means it was created at a time
	// that there was no global default priority class and the priority class
	// name of the pod was empty. So, we resolve to the static default priority.
	return 0
}

activeQ活动队列中的Pod是依靠PrioritySort插件来进行优先级比较的,每个Pod在被创建后都会有一个priority属性来标记Pod的优先级,也可以通过全局的ProrityClass对象来进行定义,然后在调度Pod的时候会先根据Pod优先级的高低进行比较,如果优先级相同,则会根据Pod的创建时间进行比较,越高优先级的Pod越被优先调度,越早创建的Pod越被优先调度

那么Pod是在什么时候加入到activeQ活动队列的呢?

在创建Scheduler对象时有调用addAllEventHandlers()方法,其中就有对未调度Pod的时间监听处理操作

// pkg/scheduler/eventhandlers.go
func addAllEventHandlers(
	sched *Scheduler,
	informerFactory informers.SharedInformerFactory,
	dynInformerFactory dynamicinformer.DynamicSharedInformerFactory,
	gvkMap map[framework.GVK]framework.ActionType,
) {
	...
	// unscheduled pod queue
	informerFactory.Core().V1().Pods().Informer().AddEventHandler(
		cache.FilteringResourceEventHandler{
			FilterFunc: func(obj interface{}) bool {
				switch t := obj.(type) {
				case *v1.Pod:
					return !assignedPod(t) && responsibleForPod(t, sched.Profiles)
				case cache.DeletedFinalStateUnknown:
					if pod, ok := t.Obj.(*v1.Pod); ok {
						return !assignedPod(pod) && responsibleForPod(pod, sched.Profiles)
					}
					utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, sched))
					return false
				default:
					utilruntime.HandleError(fmt.Errorf("unable to handle object in %T: %T", sched, obj))
					return false
				}
			},
			Handler: cache.ResourceEventHandlerFuncs{
				AddFunc:    sched.addPodToSchedulingQueue,
				UpdateFunc: sched.updatePodInSchedulingQueue,
				DeleteFunc: sched.deletePodFromSchedulingQueue,
			},
		},
	)
  ...
}   

当Pod有事件变化后,先通过FilterFunc函数进行过滤,如果Pod没有绑定到节点(未调度)并且使用的是指定的调度器才能进入下面的Handler函数进行处理,比如当创建Pod之后就会有onAdd的添加事件,这里调用的就是sched.addPodToSchedulingQueue函数

// pkg/scheduler/eventhandlers.go
// 添加未调度的pod到优先级队列
func (sched *Scheduler) addPodToSchedulingQueue(obj interface{}) {
	pod := obj.(*v1.Pod)
	klog.V(3).InfoS("Add event for unscheduled pod", "pod", klog.KObj(pod))
	if err := sched.SchedulingQueue.Add(pod); err != nil {
		utilruntime.HandleError(fmt.Errorf("unable to queue %T: %v", obj, err))
	}
}

addPodToSchedulingQueue()方法中调用调度队列SchedulingQueue的Add()方法添加到优先级队列中:

// pkg/scheduler/internal/queue/scheduling_queue.go
// Add添加pod到activeQ活动队列,仅当添加了新的pod时才应该调用它
// 这样pod就会不会已经处理active/unschedulable/backoff队列中
func (p *PriorityQueue) Add(pod *v1.Pod) error {
	p.lock.Lock()
	defer p.lock.Unlock()
	pInfo := p.newQueuedPodInfo(pod)
	// 添加到activeQ队列中
	if err := p.activeQ.Add(pInfo); err != nil {
		klog.ErrorS(err, "Error adding pod to the scheduling queue", "pod", klog.KObj(pod))
		return err
	}
	// 如果在unschedulableQ队列中,则从该队列移除
	if p.unschedulableQ.get(pod) != nil {
		klog.ErrorS(nil, "Error: pod is already in the unschedulable queue", "pod", klog.KObj(pod))
		p.unschedulableQ.delete(pod)
	}
	// Delete pod from backoffQ if it is backing off
	// 从podBackoffQ队列中删除
	if err := p.podBackoffQ.Delete(pInfo); err == nil {
		klog.ErrorS(nil, "Error: pod is already in the podBackoff queue", "pod", klog.KObj(pod))
	}
	metrics.SchedulerQueueIncomingPods.WithLabelValues("active", PodAdd).Inc()
	p.PodNominator.AddNominatedPod(pInfo.PodInfo, "")
	// 通知其他地方进行处理
	p.cond.Broadcast()

	return nil
}

这就是activeQ活动队列添加元素的过程

参考:

这8张图终于把K8S调度器讲通透了!

调度器的工作原理

调度器的启动流程

调度Pod流程

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/9244.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

thanos prometheus 的高可用、长期存储二进制部署

1.简介 http://thanos.io/ thanos 是具有长期存储功能的开源、高可用性 Prometheus的集群组件。 全局查询视图 跨多个 Prometheus 服务器和集群查询指标 无限保留 使用对象存储扩展系统&#xff0c;不限时间保留指标。 Prometheus兼容 兼容 Prometheus api&#xff0c;用于…

FPGA时序知识点(基本方法总结就两点:1.降低时钟频率2.减小组合逻辑延迟(针对Setup Slack公式来的)

1.我们说的所有时序分析都是建立在同步电路的基础上的&#xff0c;异步电路不能做时序分析&#xff08;或者说只能做伪路径约束&#xff08;在设伪路径之前单bit就打拍&#xff0c;多bit就异步fifo拉到目的时钟域来&#xff09;&#xff09;。——FPGA 设计中寄存器全部使用一个…

Spring的事务

(1) 事务的定义 事务就是用户定义的一系列数据库操作&#xff0c;这些操作可以视为一个完成的逻辑处理工作单元&#xff0c;要么全部执行&#xff0c;要么全部不执行&#xff0c;是不可分割的工作单元。 (2)事务的使用&#xff1a; begin transaction commit rollback. begin …

谈谈软件系统重构

「头条关注【Java思享汇】&#xff0c;面试、各种技术栈、架构设计持续更新中&#xff5e;」 分享初衷&#xff1a;工作几年之后基本都会经历过大大小小的系统重构&#xff0c;笔者经历过单体应用拆分微服务的系统重构&#xff0c;数据异构&#xff0c;业务系统重构。借助此次…

总结819

学习目标&#xff1a; 4月&#xff08;复习完高数18讲内容&#xff0c;背诵21篇短文&#xff0c;熟词僻义300词基础词&#xff09; 第二周&#xff1a; 学习内容&#xff1a; 暴力英语&#xff1a;早上背诵《think different》记150词&#xff0c;默写了两篇文章&#xff0c…

Java中的Iterator底层原理实现

两个抽象方法 Iterator主要有两个抽象方法&#xff0c;让子类实现。 hasNext()用来判断还有没有数据可供访问。next()方法用于访问集合的下一个数据。 这两个方法不像List的get()那样依赖索引获取数据&#xff0c;也不像Queue的poll方法那样依赖特定规则获取数据。 迭代器的…

3月更新 | Visual Studio Code Python

我们很高兴地宣布&#xff0c;2023年3月版 Visual Studio Code Python 和 Jupyter 扩展现已推出&#xff01; 此版本包括以下改进&#xff1a; 后退按钮和取消功能添加到创建环境命令默认情况下&#xff0c;Python 扩展不再附带 isortJupyter 笔记本中内核选择的改进Python P…

代码随想录Day49

今天继续学习动规解决完全背包问题。 322.零钱兑换 给你一个整数数组 coins &#xff0c;表示不同面额的硬币&#xff1b;以及一个整数 amount &#xff0c;表示总金额。 计算并返回可以凑成总金额所需的最少的硬币个数 。如果没有任何一种硬币组合能组成总金额&#xff0c;…

java 线段树

线段树是一种二叉搜索树&#xff0c;什么叫做二叉搜索树&#xff0c;首先满足二叉树&#xff0c;每个结点度小于等于二&#xff0c;即每个结点最多有两颗子树&#xff0c;何为搜索&#xff0c;我们要知道&#xff0c;线段树的每个结点都存储了一个区间&#xff0c;也可以理解成…

【JavaWeb】8—过滤器

⭐⭐⭐⭐⭐⭐ Github主页&#x1f449;https://github.com/A-BigTree 笔记链接&#x1f449;https://github.com/A-BigTree/Code_Learning ⭐⭐⭐⭐⭐⭐ 如果可以&#xff0c;麻烦各位看官顺手点个star~&#x1f60a; 如果文章对你有所帮助&#xff0c;可以点赞&#x1f44d;…

C语言中宏的一些高级用法举例

C语言中宏的一些高级用法 文章目录C语言中宏的一些高级用法1.字符串化2.标记的拼接3.宏的嵌套替换多条语句防止头文件被重复包含宏的可变参数应用方式1方式2方式34.常用宏宏和函数的区别1.字符串化 #include <stdio.h> #include <stdbool.h> #include <string.…

测试开发常问面试题

Postman Postman实现接口关联 步骤 通过正则表达式或则JSON提取器取值的方式&#xff0c;提取需要的参数。将参数设置为全局变量或则环境变量。在之后接口中&#xff0c;通过{{全局变量/环境变量}}代替要替换的参数值。 - JSON提取器方式 var jsonData JSON.parse(respons…

【Spring6】数据校验:Validation

10、数据校验&#xff1a;Validation 10.1、Spring Validation概述 在开发中&#xff0c;我们经常遇到参数校验的需求&#xff0c;比如用户注册的时候&#xff0c;要校验用户名不能为空、用户名长度不超过20个字符、手机号是合法的手机号格式等等。如果使用普通方式&#xff0c…

TenserRT(三)PYTORCH 转 ONNX 详解

第三章&#xff1a;PyTorch 转 ONNX 详解 — mmdeploy 0.12.0 文档 torch.onnx — PyTorch 2.0 documentation torch.onnx.export 细解 计算图导出方法 TorchScript是一种序列化和优化PyTorch模型的格式&#xff0c;将torch.nn.Module模型转换为TorchScript的torch.jit.Scr…

unicloud 模糊查询解决方案

序 1、where和aggregate的模糊搜索 2、第一种是“你好”去匹配“你好啊大家” 3、第二种是“家啊”去匹配“啊&#xff01;你家呢” 只要有1个字匹配就匹配 4、第三种是“家啊”去匹配“啊&#xff01;你家呢” 必须有“家”又有“啊”才匹配” 想看效果&#xff0c;大家可以自…

ROBOGUIDE教程:FANUC机器人摆焊焊接功能介绍与虚拟仿真操作方法

目录 摆焊功能简介 摆焊指令介绍 摆焊功能设置 摆焊条件设置 机器人摆焊示教编程 仿真运行 摆焊功能简介 使用FANCU机器人进行弧焊焊接时&#xff0c;也可以实现摆动焊接&#xff08;简称摆焊&#xff09;。 摆焊功能是在机器人弧焊焊接时&#xff0c;焊枪面对焊接方向…

面试字节,三面HR天坑,想不到自己也会阴沟里翻船....

阎王易见&#xff0c;小鬼难缠。我一直相信这个世界上好人居多&#xff0c;但是也没想到自己也会在阴沟里翻船。我感觉自己被字节跳动的HR坑了。 在这里&#xff0c;我只想告诫大家&#xff0c;offer一定要拿到自己的手里才是真的&#xff0c;口头offer都是不牢靠的&#xff0…

【CE】Mac下的CE教程Tutorial:进阶篇(第8关:多级指针)

▒ 目录 ▒&#x1f6eb; 导读开发环境1️⃣ 第8关&#xff1a;多级指针翻译操作验证其它方案&#x1f6ec; 文章小结&#x1f4d6; 参考资料&#x1f6eb; 导读 开发环境 版本号描述文章日期2023-03-操作系统MacOS Big Sur 11.5Cheat Engine7.4.3 1️⃣ 第8关&#xff1a;多…

MySQL数据库中的函数怎样使用?

函数 是指一段可以直接被另一段程序调用的程序或代码。 也就意味着&#xff0c;这一段程序或代码在MySQL中已经给我们提供了&#xff0c;我们要做的就是在合适的业务场景调用对应的函数完成对应的业务需求即可。 那么&#xff0c;函数到底在哪儿使用呢? 我们先来看两个场景&a…

【FPGA-Spirit_V2】基于FPGA的循迹小车-小精灵V2开发板

&#x1f389;欢迎来到FPGA专栏~基于FPGA的循迹小车 ☆* o(≧▽≦)o *☆嗨~我是小夏与酒&#x1f379; ✨博客主页&#xff1a;小夏与酒的博客 &#x1f388;该系列文章专栏&#xff1a;FPGA学习之旅 文章作者技术和水平有限&#xff0c;如果文中出现错误&#xff0c;希望大家能…
最新文章