路漫漫其修远兮
吾将上下而求索

kube-scheduler:调度器启动流程

kube-scheduler 组件有很多可以配置的启动参数,其核心也是通过 cobra 开发的一个 CLI 工具,所以要掌握 kube-scheduler 的启动配置,需要我们对 cobra 有一个基本的了解,kube-scheduler 主要有两种类型的配置参数:

  • 调度策略相关的参数,例如启用那些调度插件,以及给某些调度插件配置一些参数

  • 通用参数,就是一些普通的参数,比如配置服务端口号等等

这里我们主要是了解调度器的核心调度框架和算法,所以主要关注第一种参数即可。

参数配置

kube-scheduler 的启动入口位于 cmd/kube-scheduler/scheduler.go 文件,该文件中就包含一个 main 入口函数:

// cmd/kube-scheduler/scheduler.go
func main() {
	rand.Seed(time.Now().UnixNano())
	
	// 初始化 Cobra.Command 对象
	command := app.NewSchedulerCommand()
	
  // 将命令行参数进行标准化(_替换成-)
	pflag.CommandLine.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)
  // 初始化日志
	logs.InitLogs()
	defer logs.FlushLogs()
	
  // 执行命令
	if err := command.Execute(); err != nil {
		os.Exit(1)
	}
}

其中最核心的就是通过 app.NewSchedulerCommand() 或者一个 Cobra 的 Command 对象,然后最下面调用 command.Execute() 函数执行这个命令,所以核心就是 NewSchedulerCommand 函数的实现:

// cmd/kube-scheduler/app/server.go

// Option 配置一个 framework.Registry
type Option func(runtime.Registry) error

// NewSchedulerCommand 使用默认参数和 registryOptions 创建一个 *cobra.Command 对象
func NewSchedulerCommand(registryOptions ...Option) *cobra.Command {
	// 获取默认的配置信息
  opts, err := options.NewOptions()
	if err != nil {
		klog.Fatalf("unable to initialize command options: %v", err)
	}

	cmd := &cobra.Command{
		Use: "kube-scheduler",
		Long: `......`,
    // 真正执行的函数入口
		Run: func(cmd *cobra.Command, args []string) {
			if err := runCommand(cmd, opts, registryOptions...); err != nil {
				fmt.Fprintf(os.Stderr, "%v\\n", err)
				os.Exit(1)
			}
		},
		......
	}
	......
	return cmd
}

如果我们熟悉 Cobra 的基本用法的话应该知道当我们执行 Cobra 的命令的时候,实际上真正执行的是 Cobra.Command 对象中的 Run 函数,也就是这里的 runCommand 函数:

// cmd/kube-scheduler/app/server.go
if err := runCommand(cmd, opts, registryOptions...); err != nil {
	fmt.Fprintf(os.Stderr, "%v\\n", err)
	os.Exit(1)
}

其中有两个非常重要的参数 opts 与 registryOptions,opts 是一个 Options 对象,该参数包含所有的运行 Scheduler 需要的参数:

// cmd/kube-scheduler/app/options/options.go

// Options 拥有所有运行 Scheduler 需要的参数
type Options struct {
  // 默认值,如果设置了 ConfigFile 或 InsecureServing 中的值,这些设置将被覆盖
  // KubeSchedulerConfiguration 类似与 Deployment 都是k8s的资源对象,这是这个对象是用于配置调度器使用的
	ComponentConfig kubeschedulerconfig.KubeSchedulerConfiguration

	SecureServing           *apiserveroptions.SecureServingOptionsWithLoopback
  // 可为 Healthz 和 metrics 配置两个不安全的标志
  CombinedInsecureServing *CombinedInsecureServingOptions
	Authentication          *apiserveroptions.DelegatingAuthenticationOptions
	Authorization           *apiserveroptions.DelegatingAuthorizationOptions
	Metrics                 *metrics.Options
	Logs                    *logs.Options
	Deprecated              *DeprecatedOptions

	// ConfigFile 指定是调度程序服务的配置文件的位置
	ConfigFile string

  // WriteConfigTo 将默认配置写入的文件路径
	WriteConfigTo string

	Master string
}

其中第一个参数 ComponentConfig kubeschedulerconfig.KubeSchedulerConfiguration 是我们需要重点关注的用于配置调度策略相关参数的地方,通过 NewOptions() 来获取默认配置参数:

// cmd/kube-scheduler/app/options/options.go

// NewOptions 返回一个默认的调度器应用 options 参数。
func NewOptions() (*Options, error) {
	cfg, err := newDefaultComponentConfig()
	if err != nil {
		return nil, err
	}
	......
	o := &Options{
		ComponentConfig: *cfg,
		......
	}
	......
	return o, nil
}

上面是初始化 Options 的函数,这里我们只关心核心的 ComponentConfig 参数,该参数是通过函数 newDefaultComponentConfig() 来生成默认的配置:

// cmd/kube-scheduler/app/options/options.go

func newDefaultComponentConfig() (*kubeschedulerconfig.KubeSchedulerConfiguration, error) {
	versionedCfg := kubeschedulerconfigv1beta1.KubeSchedulerConfiguration{}
	// 可用于配置是否开启 Debug 相关特性,比如 profiling
  versionedCfg.DebuggingConfiguration = *configv1alpha1.NewRecommendedDebuggingConfiguration()

	kubeschedulerscheme.Scheme.Default(&versionedCfg)
	cfg := kubeschedulerconfig.KubeSchedulerConfiguration{}
	if err := kubeschedulerscheme.Scheme.Convert(&versionedCfg, &cfg, nil); err != nil {
		return nil, err
	}
	return &cfg, nil
}

上面的函数会创建一个默认的 KubeSchedulerConfiguration 对象,用于配置调度器,默认配置参数通过 Options 构造完成后,在构造整个 cobra.Command 命令后会为其添加命令行参数:

// cmd/kube-scheduler/app/server.go

// NewSchedulerCommand 使用默认参数和 registryOptions 创建一个 *cobra.Command 对象
func NewSchedulerCommand(registryOptions ...Option) *cobra.Command {
	// 获取默认的配置信息,选举信息,调度名字都是从这里拿的
  opts, err := options.NewOptions()
	if err != nil {
		klog.Fatalf("unable to initialize command options: %v", err)
	}

	cmd := &cobra.Command{
		......
	}
	fs := cmd.Flags()
  // 调用 Options 的 Flags 方法
	namedFlagSets := opts.Flags()
	verflag.AddFlags(namedFlagSets.FlagSet("global"))
	globalflag.AddGlobalFlags(namedFlagSets.FlagSet("global"), cmd.Name())
	// 将默认的所有参数添加到 cmd.Flags 中去
  for _, f := range namedFlagSets.FlagSets {
		fs.AddFlagSet(f)
	}
	......
	return cmd
}

其中的 opts.Flags() 方法就是将默认的 Options 配置转换成命令行参数的函数:

// cmd/kube-scheduler/app/options/options.go

func (o *Options) Flags() (nfs cliflag.NamedFlagSets) {
	fs := nfs.FlagSet("misc")
	fs.StringVar(&o.ConfigFile, "config", o.ConfigFile, `The path to the configuration file. The following flags can overwrite fields in this file:
  --address
  --port
  --use-legacy-policy-config
  --policy-configmap
  --policy-config-file
  --algorithm-provider`)
	fs.StringVar(&o.WriteConfigTo, "write-config-to", o.WriteConfigTo, "If set, write the configuration values to this file and exit.")
	fs.StringVar(&o.Master, "master", o.Master, "The address of the Kubernetes API server (overrides any value in kubeconfig)")

	o.SecureServing.AddFlags(nfs.FlagSet("secure serving"))
	o.CombinedInsecureServing.AddFlags(nfs.FlagSet("insecure serving"))
	o.Authentication.AddFlags(nfs.FlagSet("authentication"))
	o.Authorization.AddFlags(nfs.FlagSet("authorization"))
	o.Deprecated.AddFlags(nfs.FlagSet("deprecated"), &o.ComponentConfig)

	options.BindLeaderElectionFlags(&o.ComponentConfig.LeaderElection, nfs.FlagSet("leader election"))
	utilfeature.DefaultMutableFeatureGate.AddFlag(nfs.FlagSet("feature gate"))
	o.Metrics.AddFlags(nfs.FlagSet("metrics"))
	o.Logs.AddFlags(nfs.FlagSet("logs"))

	return nfs
}

其中第一个参数 --config 就可以用来指定配置文件。到这里我们就获取到了调度器所有默认的配置参数了。

启动调度器

接下来分析真正运行调度器的 runCommand 函数的实现。

// cmd/kube-scheduler/app/server.go

// 运行调度器的真正函数
func runCommand(cmd *cobra.Command, opts *options.Options, registryOptions ...Option) error {
	// 比如执行 --version 这样的操作,则打印版本后直接退出了
  verflag.PrintAndExitIfRequested()
	cliflag.PrintFlags(cmd.Flags())

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// 根据命令行 args 和 options 创建完整的配置和调度程序
	cc, sched, err := Setup(ctx, opts, registryOptions...)
	if err != nil {
		return err
	}
	
  // 如果指定了 WriteConfigTo 参数
	if len(opts.WriteConfigTo) > 0 {
    // 将配置写入到指定的文件中
		if err := options.WriteConfigFile(opts.WriteConfigTo, &cc.ComponentConfig); err != nil {
			return err
		}
		klog.Infof("Wrote configuration to: %s\\n", opts.WriteConfigTo)
		return nil
	}
  // 真正去启动调度器
	return Run(ctx, cc, sched)
}

上面的函数首先判断是否是执行类似于 --version 这样的操作,如果是这打印后直接退出,然后根据命令行参数和选项通过 Setup 函数构造 CompletedConfig 配置和 Scheduler 调度器对象。

// cmd/kube-scheduler/app/server.go/

// 根据命令行参数和选项构造完整的配置和调度器对象
func Setup(ctx context.Context, opts *options.Options, outOfTreeRegistryOptions ...Option) (*schedulerserverconfig.CompletedConfig, *scheduler.Scheduler, error) {
	// 校验命令行选项
  if errs := opts.Validate(); len(errs) > 0 {
		return nil, nil, utilerrors.NewAggregate(errs)
	}
	// 获取调度器Config对象,该对象拥有一个调度器所有的上下文信息
	c, err := opts.Config()
	if err != nil {
		return nil, nil, err
	}

	// 获取 completed 配置
	cc := c.Complete()

	// outOfTreeRegistry是个map,里面有注册的方法,key为sample,value为Sample插件,当然这里可以有多个插件
	outOfTreeRegistry := make(runtime.Registry)
	for _, option := range outOfTreeRegistryOptions {
		if err := option(outOfTreeRegistry); err != nil {
			return nil, nil, err
		}
	}
	// 到这里 outOfTreeRegistry 是个 map,里面有注册的方法,key为sample,value为Sample插件,当然这里可以有多个插件

	recorderFactory := getRecorderFactory(&cc)
	// 创建调度器
	sched, err := scheduler.New(cc.Client,
		cc.InformerFactory,
		cc.PodInformer,
		recorderFactory,
		ctx.Done(),
		// 配置里面的的插件使能列表放进去
		scheduler.WithProfiles(cc.ComponentConfig.Profiles...),
		scheduler.WithAlgorithmSource(cc.ComponentConfig.AlgorithmSource),
		scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
		scheduler.WithFrameworkOutOfTreeRegistry(outOfTreeRegistry),
		scheduler.WithPodMaxBackoffSeconds(cc.ComponentConfig.PodMaxBackoffSeconds),
		scheduler.WithPodInitialBackoffSeconds(cc.ComponentConfig.PodInitialBackoffSeconds),
		scheduler.WithExtenders(cc.ComponentConfig.Extenders...),
	)
	if err != nil {
		return nil, nil, err
	}

	return &cc, sched, nil
}

该函数首先调用 opts.Validate() 函数对所有参数进行校验,接着使用 opts.Config() 函数创建 *schedulerappconfig.Config 对象,该对象拥有一个调度器所有的上下文信息。

// cmd/kube-scheduler/app/options/options.go

// Config 返回一个调度器配置对象
func (o *Options) Config() (*schedulerappconfig.Config, error) {
	......
	// 从这里读取配置文件,将其实例化为对象,包括用户使能哪些自定义插件,关闭哪些自定义插件等,名字等
	c := &schedulerappconfig.Config{}
	if err := o.ApplyTo(c); err != nil {
		return nil, err
	}

	// 创建 kube 客户端
	client, leaderElectionClient, eventClient, err := createClients(c.ComponentConfig.ClientConnection, o.Master, c.ComponentConfig.LeaderElection.RenewDeadline.Duration)
	if err != nil {
		return nil, err
	}

	......
	c.Client = client
	c.InformerFactory = informers.NewSharedInformerFactory(client, 0)
	c.PodInformer = scheduler.NewPodInformer(client, 0)
	c.LeaderElection = leaderElectionConfig

	return c, nil
}

上面函数的核心是通过 o.ApplyTo(c) 函数将 Options 转换成了 *schedulerappconfig.Config 对象,

// cmd/kube-scheduler/app/options/options.go

// 将调度程序 options 转换成调度程序应用配置
func (o *Options) ApplyTo(c *schedulerappconfig.Config) error {
	if len(o.ConfigFile) == 0 {
		c.ComponentConfig = o.ComponentConfig

		// 如果未加载任何配置文件,则应用 deprecated flags(这是旧的行为)
    o.Deprecated.ApplyTo(&c.ComponentConfig)
		if err := o.CombinedInsecureServing.ApplyTo(c, &c.ComponentConfig); err != nil {
			return err
		}
	} else {
		cfg, err := loadConfigFromFile(o.ConfigFile)
		if err != nil {
			return err
		}
		if err := validation.ValidateKubeSchedulerConfiguration(cfg).ToAggregate(); err != nil {
			return err
		}
		c.ComponentConfig = *cfg
		......
	}
	......
	return nil
}

上面的转换函数会首先判断是否配置了 ConfigFile(也就是 --config 参数),如果配置了则会加载对应的配置文件转换成对应的 KubeSchedulerConfiguration 对象,然后校验有效性,如果都正常则将其赋给  schedulerappconfig.Config 的 ComponentConfig 属性;如果没有配置 ConfigFile,则使用旧的参数进行配置。

接着会去调用 scheduler.New() 函数去构造一个真正的调度器对象,该函数的具体实现如下所示:

// pkg/scheduler/scheduler.go

// 配置调度器
type Option func(*schedulerOptions)

var defaultSchedulerOptions = schedulerOptions{
	profiles: []schedulerapi.KubeSchedulerProfile{
    // Profiles 的默认插件是从算法提供程序配置的
		{SchedulerName: v1.DefaultSchedulerName},  // 默认的调度器名称为 default-scheduler
	},
	schedulerAlgorithmSource: schedulerapi.SchedulerAlgorithmSource{
		Provider: defaultAlgorithmSourceProviderName(),  // 默认的算法源提供器名称为 DefaultProvider
	},
	percentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
	podInitialBackoffSeconds: int64(internalqueue.DefaultPodInitialBackoffDuration.Seconds()),
	podMaxBackoffSeconds:     int64(internalqueue.DefaultPodMaxBackoffDuration.Seconds()),
}

// 返回一个 Scheduler 对象
func New(client clientset.Interface,
	informerFactory informers.SharedInformerFactory,
	podInformer coreinformers.PodInformer,
	recorderFactory profile.RecorderFactory,
	stopCh <-chan struct{},
	opts ...Option) (*Scheduler, error) {

	stopEverything := stopCh
	if stopEverything == nil {
		stopEverything = wait.NeverStop
	}
	
	// 默认的调度器配置
	options := defaultSchedulerOptions
	for _, opt := range opts {
    // 将默认的调度器配置调用 Option 重新配置一次
		opt(&options)
	}

	schedulerCache := internalcache.New(30*time.Second, stopEverything)
	volumeBinder := scheduling.NewVolumeBinder(
		client,
		informerFactory.Core().V1().Nodes(),
		informerFactory.Storage().V1().CSINodes(),
		informerFactory.Core().V1().PersistentVolumeClaims(),
		informerFactory.Core().V1().PersistentVolumes(),
		informerFactory.Storage().V1().StorageClasses(),
		time.Duration(options.bindTimeoutSeconds)*time.Second,
	)

	// 到这里,options.frameworkOutOfTreeRegistry是一个map,里面的key为用户自定义的插件名称sample,value为用户自定义的插件对象
	registry := frameworkplugins.NewInTreeRegistry()
	if err := registry.Merge(options.frameworkOutOfTreeRegistry); err != nil {
		return nil, err
	}
	// 用户自定义的插件被追加到了(已经有默认调度对象)后面,每个对象都有prefilter,filter,prescore等方法,是否开启哪个,通过enable来决定
	
	
	configurator := &Configurator{
		client:                   client,
		recorderFactory:          recorderFactory,
		informerFactory:          informerFactory,
		podInformer:              podInformer,
		volumeBinder:             volumeBinder,
		schedulerCache:           schedulerCache,
		StopEverything:           stopEverything,
		disablePreemption:        options.disablePreemption,
		percentageOfNodesToScore: options.percentageOfNodesToScore,
		bindTimeoutSeconds:       options.bindTimeoutSeconds,
		podInitialBackoffSeconds: options.podInitialBackoffSeconds,
		podMaxBackoffSeconds:     options.podMaxBackoffSeconds,
		enableNonPreempting:      utilfeature.DefaultFeatureGate.Enabled(kubefeatures.NonPreemptingPriority),
		profiles:                 append([]schedulerapi.KubeSchedulerProfile(nil), options.profiles...),  
		// 这里是最开始插件初始化,一个对象列表,options.profiles如果是默认,就是默认调度器名字,如果是自定义,则是自定义调度器名字
		// 不管怎样,都只有一个元素
		registry:                 registry,
		nodeInfoSnapshot:         snapshot,
		extenders:                options.extenders,
	}
	......

	var sched *Scheduler
  // SchedulerAlgorithmSource 是调度程序算法的源
  // 包含Policy与Provider两种方式,必须指定一个源字段,并且源字段是互斥的
	source := options.schedulerAlgorithmSource
	switch {
	case source.Provider != nil:
		// 从一个算法 provider 中创建配置,这是我们现在需要重点关注的方式
		sc, err := configurator.createFromProvider(*source.Provider)
		if err != nil {
			return nil, fmt.Errorf("couldn't create scheduler using provider %q: %v", *source.Provider, err)
		}
		sched = sc
	case source.Policy != nil:
		// 从用户指定的策略源中创建配置,这是以前的扩展方式
		policy := &schedulerapi.Policy{}
		......
	default:
		return nil, fmt.Errorf("unsupported algorithm source: %v", source)
	}
	......
  // addAllEventHandlers 是在测试和 Scheduler 中使用的帮助程序函数,用于为各种 informers 添加事件处理程序
	addAllEventHandlers(sched, informerFactory, podInformer)
	return sched, nil
}

首先将默认的调度器配置通过传递的 Option 参数进行一一配置,然后重点就是根据应用过后的配置来判断调度算法的源是 Provider 还是 Policy 方式,我们现在的重点是调度框架,所以主要关注 Provider 这种配置,Policy 是以前的扩展调度器的方式。所以调度器的实例化核心是通过 configurator.createFromProvider(*source.Provider) 该函数来实现的。

// pkg/scheduler/factory.go

// 从一组已注册的插件集合中创建一个调度器
func (c *Configurator) create() (*Scheduler, error) {
	var extenders []framework.Extender
	var ignoredExtendedResources []string
	if len(c.extenders) != 0 {
		// Extender 方式扩展调度器,这里为0,跳过
    ......
	}

  ......
  
  
    // register和plugin之间关系对应处理,重点是buildFramework方法
	profiles, err := profile.NewMap(c.profiles, c.buildFramework, c.recorderFactory)
	if err != nil {
		return nil, fmt.Errorf("initializing profiles: %v", err)
	}
	if len(profiles) == 0 {
		return nil, errors.New("at least one profile is required")
	}

	// Profiles 需要提供有效的 queue sort 插件
	lessFn := profiles[c.profiles[0].SchedulerName].Framework.QueueSortFunc()
  // 将优先级队列初始化为调度队列
	podQueue := internalqueue.NewSchedulingQueue(
		lessFn,
		internalqueue.WithPodInitialBackoffDuration(time.Duration(c.podInitialBackoffSeconds)*time.Second),
		internalqueue.WithPodMaxBackoffDuration(time.Duration(c.podMaxBackoffSeconds)*time.Second),
		internalqueue.WithPodNominator(nominator),
	)

  ......
  // 创建一个 genericScheduler 对象,该对象实现了 ScheduleAlgorithm 接口,具体的调度实现就是这个对象实现的
	algo := core.NewGenericScheduler(
		c.schedulerCache,
		c.nodeInfoSnapshot,
		extenders,
		c.informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
		c.disablePreemption,
		c.percentageOfNodesToScore,
	)

	return &Scheduler{
		SchedulerCache:  c.schedulerCache,
		Algorithm:       algo,
		Profiles:        profiles,
		NextPod:         internalqueue.MakeNextPodFunc(podQueue),
		Error:           MakeDefaultErrorFunc(c.client, c.informerFactory.Core().V1().Pods().Lister(), podQueue, c.schedulerCache),
		StopEverything:  c.StopEverything,
		SchedulingQueue: podQueue,
	}, nil
}

// createFromProvider 从注册的算法提供器来创建调度器
func (c *Configurator) createFromProvider(providerName string) (*Scheduler, error) {
	klog.V(2).Infof("Creating scheduler from algorithm provider '%v'", providerName)
        
    // 获取算法提供器集合,里面有各种算法的是否开启的默认设置
	r := algorithmprovider.NewRegistry()
	// 方法返回为一个字典,r{"DefaultProvider": 插件使能列表schedulerapi.Plugins},具体的格式见下面图示
        
    // 获取指定算法提供器的插件集合,这里providerName 为 schedulerapi.SchedulerDefaultProviderName
	defaultPlugins, exist := r[providerName]
	if !exist {
		return nil, fmt.Errorf("algorithm provider %q is not registered", providerName)
	}
  
    //  重要:自定义调度器和默认的调度器下面执行流程是一样的,自定义调度器和默认调度器都可以在配置文件中enable和disable某个插件
    //  Profiles里只有一个元素,默认调度器只有默认调度器那个名字。自定义调度器只有自定义调度器那个名字
    //  prof.Plugins里面是从配置文件中拿的自定义配置。自定义调度器和默认调度器都可以配置哪个选项是否启用
    //  使用程序默认的调度插件,在此基础之上,加上了配置文件里的自定义的调度插件
	for i := range c.profiles {
		prof := &c.profiles[i]
		plugins := &schedulerapi.Plugins{}
		plugins.Append(defaultPlugins)
                
    // Apply 合并来自自定义插件的插件配置,哪些启用,哪些不启用。如果enable和disable有同一个插件,则不启用
		plugins.Apply(prof.Plugins)
		prof.Plugins = plugins
	}
	return c.create()
}

通过上面的一些列操作后就实例化了真正的调度器对象,最后我们需要去启动一系列的资源对象的事件监听程序,比如 Pod、Node 等对象,上面实例化函数中通过 addAllEventHandlers(sched, informerFactory, podInformer) 来实现的,关于这些资源对象对应的 onAdd、onUpdate、onDelete 操作均在 pkg/scheduler/eventhandlers.go 文件中实现的,这样比如当创建一个 Pod 过后,我们的调度器通过 watch 就会在 onAdd 事件中接收到该操作,然后我们就可以根据 queue sort 插件将器加入到带调度队列中去开始调度了。

最后就是去调用 Run 函数来真正启动调度器了,首先会等待所有的 cache 同步完成,然后开始进行调度操作。

// cmd/kube-scheduler/app/server.go/

// Run 根据指定的配置执行调度程序,仅在出现错误或上下文完成时才返回
func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *scheduler.Scheduler) error {
	klog.V(1).Infof("Starting Kubernetes Scheduler version %+v", version.Get())

	......

	// 启动 healthz 以及 metrics 相关服务
	......

	// 启动所有 informers
	go cc.PodInformer.Informer().Run(ctx.Done())
	cc.InformerFactory.Start(ctx.Done())

	// 调度之前等待所有 caches 同步完成
	cc.InformerFactory.WaitForCacheSync(ctx.Done())

	// 开启了 leader election
	if cc.LeaderElection != nil {
		cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
			OnStartedLeading: sched.Run,
			OnStoppedLeading: func() {
				klog.Fatalf("leaderelection lost")
			},
		}
		leaderElector, err := leaderelection.NewLeaderElector(*cc.LeaderElection)
		if err != nil {
			return fmt.Errorf("couldn't create leader elector: %v", err)
		}

		leaderElector.Run(ctx)

		return fmt.Errorf("lost lease")
	}

	// 如果没有开启 Leader election,这直接调用调度器对象的 Run 函数
	sched.Run(ctx)
	return fmt.Errorf("finished without leader elect")
}

// pkg/scheduler/scheduler.go

// 等待 cache 同步完成,然后开始调度
func (sched *Scheduler) Run(ctx context.Context) {
	if !cache.WaitForCacheSync(ctx.Done(), sched.scheduledPodsHasSynced) {
		return
	}
	sched.SchedulingQueue.Run()
	wait.UntilWithContext(ctx, sched.scheduleOne, 0)
	sched.SchedulingQueue.Close()
}

接下来我们就接着来分析是如何进行具体的 Pod 调度的。

image.png

未经允许不得转载:江哥架构师笔记 » kube-scheduler:调度器启动流程

分享到:更多 ()

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址