路漫漫其修远兮
吾将上下而求索

kube-scheduler:调度插件执行原理

前面我们是在整体上对 Pod 的调度流程进行了分析,但是真正核心的调度操作才是我们最需要关心的,也就是上面提到的 sched.Algorithm.Schedule() 函数的实现。

这里需要关注 Scheduler 下面的 Algorithm 属性,该属性是在初始化调度器的时候传入的:

// pkg/scheduler/scheduler.go
type Scheduler struct {
	Algorithm core.ScheduleAlgorithm
  ......
}

// pkg/scheduler/factory.go

// 初始化 Scheduler
func (c *Configurator) create() (*Scheduler, error) {
	......
	algo := core.NewGenericScheduler(
		c.schedulerCache,
		c.nodeInfoSnapshot,
		extenders,
		c.informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
		c.disablePreemption,
		c.percentageOfNodesToScore,
	)

	return &Scheduler{
		Algorithm:       algo,
		......
	}, nil
}

// pkg/scheduler/core/generic_scheduler.go

// ScheduleAlgorithm 是一个知道如何将 Pod 调度到节点上去的接口
type ScheduleAlgorithm interface {
	Schedule(context.Context, *profile.Profile, *framework.CycleState, *v1.Pod) (scheduleResult ScheduleResult, err error)
	Extenders() []framework.Extender
}

// NewGenericScheduler 创建一个 genericScheduler 对象
func NewGenericScheduler(
	cache internalcache.Cache,
	nodeInfoSnapshot *internalcache.Snapshot,
	extenders []framework.Extender,
	pvcLister corelisters.PersistentVolumeClaimLister,
	disablePreemption bool,
	percentageOfNodesToScore int32) ScheduleAlgorithm {
	return &genericScheduler{
		cache:                    cache,
		extenders:                extenders,
		nodeInfoSnapshot:         nodeInfoSnapshot,
		pvcLister:                pvcLister,
		disablePreemption:        disablePreemption,
		percentageOfNodesToScore: percentageOfNodesToScore,
	}
}

从定义上来看可以知道 Algorithm 是一个 ScheduleAlgorithm 接口,在初始化的时候我们使用的  core.NewGenericScheduler() 来初始化 Algorithm,证明这个函数返回的 genericScheduler 对象一定会实现 ScheduleAlgorithm 接口,所以我们在 scheduleOne 函数里面真正去调度的时候调用的 sched.Algorithm.Schedule() 函数是 genericScheduler 中的 Schedule() 方法。

Schedule

下面我们来分析下 genericScheduler 中的 Schedule() 函数的实现:

// pkg/scheduler/core/generic_scheduler.go

// Schedule 尝试将指定的 Pod 调度到一系列节点中的一个节点上去。
// 如果调度成功,将返回该节点名称
// 如果调度失败,将返回一个带有失败原因的 FitError 
func (g *genericScheduler) Schedule(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
  // 检查最基本的条件
	if err := podPassesBasicChecks(pod, g.pvcLister); err != nil {
		return result, err
	}
  
  // 完成调度器缓存和节点信息的快照
	if err := g.snapshot(); err != nil {
		return result, err
	}
 
  // 判断当前快照中的节点数是否为0
	if g.nodeInfoSnapshot.NumNodes() == 0 {
		return result, ErrNoNodesAvailable
	}
  
  // 预选,先找到一些符合基本条件的节点
  // state作用是传递上下文变量
	feasibleNodes, filteredNodesStatuses, err := g.findNodesThatFitPod(ctx, prof, state, pod)
	if err != nil {
		return result, err
	}
  // 没有找到合适的
	if len(feasibleNodes) == 0 {
		return result, &FitError{
			Pod:                   pod,
			NumAllNodes:           g.nodeInfoSnapshot.NumNodes(),
			FilteredNodesStatuses: filteredNodesStatuses,
		}
	}
  
  // 如果预选过后只有1个节点,那么就直接返回这个节点信息就行了
	if len(feasibleNodes) == 1 {
		return ScheduleResult{
			SuggestedHost:  feasibleNodes[0].Name,
			EvaluatedNodes: 1 + len(filteredNodesStatuses),
			FeasibleNodes:  1,
		}, nil
	}
  
  // 如果不止1个节点,那么就需要进行优选,给每个节点进行打分
	priorityList, err := g.prioritizeNodes(ctx, prof, state, pod, feasibleNodes)
	if err != nil {
		return result, err
	}

  // 选择分数最高的作为最终的节点
	host, err := g.selectHost(priorityList)
	return ScheduleResult{
		SuggestedHost:  host,
		EvaluatedNodes: len(feasibleNodes) + len(filteredNodesStatuses),
		FeasibleNodes:  len(feasibleNodes),
	}, err
}

整个核心调度的实现流程很简单:

  1. 进行一些最基本的调度检查

  2. 对调度器缓存和节点信息快照操作

  3. 首先进行预选操作,找到一批合适的待调度的节点

  4. 如果没有找到节点,返回 FitError 错误

  5. 如果只找到一个节点,则直接返回这个节点的信息

  6. 如果找到多个节点,则进行优选操作,为每个节点进行打分,选择一个分数最高的作为待调度的节点进行返回

这里我们重点关注的是预选优选两个阶段的实现

预选

预选阶段调用 g.findNodesThatFitPod() 函数来获取一批合适的带调度的节点。函数实现如下所示:

// pkg/scheduler/core/generic_scheduler.go

// 根据框架的过滤插件和过滤扩展器对节点进行过滤,找到适合 Pod 的节点
func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.NodeToStatusMap, error) {
	filteredNodesStatuses := make(framework.NodeToStatusMap)

	// 运行 "prefilter" 插件
	s := prof.RunPreFilterPlugins(ctx, state, pod)
	if !s.IsSuccess() {
		if !s.IsUnschedulable() {
			return nil, nil, s.AsError()
		}
		// 更新节点的状态
		allNodes, err := g.nodeInfoSnapshot.NodeInfos().List()
		if err != nil {
			return nil, nil, err
		}
		for _, n := range allNodes {
			filteredNodesStatuses[n.Node().Name] = s
		}
		return nil, filteredNodesStatuses, nil
	}
  // 通过 Filter 插件找到合适的节点
	feasibleNodes, err := g.findNodesThatPassFilters(ctx, prof, state, pod, filteredNodesStatuses)
	if err != nil {
		return nil, nil, err
	}
  // 通过 Extenders 过滤合适的节点
	feasibleNodes, err = g.findNodesThatPassExtenders(pod, feasibleNodes, filteredNodesStatuses)
	if err != nil {
		return nil, nil, err
	}
	return feasibleNodes, filteredNodesStatuses, nil
}

首先会运行 prefilter 插件,然后运行所有的 filter 插件,最后是如果存在 Extender,则运行 Extender 的 Filter 函数,当然 Extender 这种方式我们不关心,这里的重点仍然是调度框架的使用。

其中调用 prof.RunPreFilterPlugins() 执行所有 prefilter 插件的 PreFilter 函数,需要所有的插件都执行成功才算成功:

// pkg/scheduler/framework/runtime/framework.go

// RunPreFilterPlugins 运行配置的所有 PreFilter 插件
// 它返回 *Status,如果任何一个插件返回不是 Success,则就会终止调度周期
func (f *frameworkImpl) RunPreFilterPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (status *framework.Status) {
	// 循环运行所有配置的 prefilter 插件
	for _, pl := range f.preFilterPlugins {
    // 执行一个具体的 prefilter 插件
		status = f.runPreFilterPlugin(ctx, pl, state, pod)
		if !status.IsSuccess() {
			if status.IsUnschedulable() {
				return status
			}
			msg := fmt.Sprintf("prefilter plugin %q failed for pod %q: %v", pl.Name(), pod.Name, status.Message())
			klog.Error(msg)
			return framework.NewStatus(framework.Error, msg)
		}
	}
	return nil
}

func (f *frameworkImpl) runPreFilterPlugin(ctx context.Context, pl framework.PreFilterPlugin, state *framework.CycleState, pod *v1.Pod) *framework.Status {
	if !state.ShouldRecordPluginMetrics() {
		return pl.PreFilter(ctx, state, pod)
	}
	startTime := time.Now()
  // 调用插件的 PreFilter 函数
	status := pl.PreFilter(ctx, state, pod)
	f.metricsRecorder.observePluginDurationAsync(preFilter, pl.Name(), status, metrics.SinceInSeconds(startTime))
	return status
}

重要:不管是自定义的扩展点,还是默认的扩展点,比如都是FilterPlugin这个阶段,每个扩展点都实现了Fileter这个接口,而filterPlugins这个数组里面,每个元素都是这类型的变量,每个变量都实现了这个接口。所以,只要调用每个元素的自己的runFilterPlugin里面的Filter方法即可。每个阶段都一样

默认情况下,调度器已经启用了一系列的 prefilter 插件,也就是在 getDefaultConfig() 函数中定义的:

// pkg/scheduler/algorithmprovider/registry.go

func getDefaultConfig() *schedulerapi.Plugins {
	return &schedulerapi.Plugins{
		......
		PreFilter: &schedulerapi.PluginSet{
			Enabled: []schedulerapi.Plugin{
				{Name: noderesources.FitName},  // 检查节点是否拥有 Pod 请求的所有资源
				{Name: nodeports.Name},  // 检查 Pod 请求的端口在节点上是否可用
				{Name: podtopologyspread.Name},  // 检查 Pod 拓扑分布
				{Name: interpodaffinity.Name},  // 检查 Pod 间亲和性与反亲和性
				{Name: volumebinding.Name},  // 检查节点是否有请求的卷,或是否可以绑定请求的卷
			},
		},
		......
  }
}

默认情况下系统启用了 NodeResourcesFitNodePortsPodTopologySpreadInterPodAffinityVolumeBinding 这几个插件。

优选

经过上面的预选阶段过后得到符合调度条件的节点列表,然后会调用 prioritizeNodes 函数为每个节点进行打分,最后调用 selectHost 函数选择一个分数最高的节点作为最终调度的节点:

// 如果不止1个节点,那么就需要进行优选,给每个节点进行打分
priorityList, err := g.prioritizeNodes(ctx, prof, state, pod, feasibleNodes)

// 选择分数最高的作为最终的节点
host, err := g.selectHost(priorityList)

为每个节点进行打分的函数实现如下所示:

// pkg/scheduler/core/generic_scheduler.go

// prioritizeNodes 通过执行 score 插件来对节点进行优先级排序,
// 这些插件从 RunScorePlugins() 的调用中返回每个节点的得分。
// 每个插件的分数加在一起,就成了该节点的分数。
// 最后将所有的分数合并(相加),得到所有节点的加权总分。
func (g *genericScheduler) prioritizeNodes(
	ctx context.Context,
	prof *profile.Profile,
	state *framework.CycleState,
	pod *v1.Pod,
	nodes []*v1.Node,
) (framework.NodeScoreList, error) {
	// 如果没有提供优先级配置,那么所有节点的分数都是 1
	......

	// 执行所有 PreScore 插件
	preScoreStatus := prof.RunPreScorePlugins(ctx, state, pod, nodes)
	if !preScoreStatus.IsSuccess() {
		return nil, preScoreStatus.AsError()
	}

	// 执行所有 Score 插件
	scoresMap, scoreStatus := prof.RunScorePlugins(ctx, state, pod, nodes)
	if !scoreStatus.IsSuccess() {
		return nil, scoreStatus.AsError()
	}

	if klog.V(10).Enabled() {
		for plugin, nodeScoreList := range scoresMap {
			klog.Infof("Plugin %s scores on %v/%v => %v", plugin, pod.Namespace, pod.Name, nodeScoreList)
		}
	}

	// 总结所有分数
	result := make(framework.NodeScoreList, 0, len(nodes))
	for i := range nodes {
		result = append(result, framework.NodeScore{Name: nodes[i].Name, Score: 0})
		for j := range scoresMap {
			result[i].Score += scoresMap[j][i].Score
		}
	}

	......

	return result, nil
}

同样首先通过调用 RunPreScorePlugins 函数执行所有 PreScore 插件,然后调用 RunScorePlugins 函数执行所有的 Score 插件,最后把所有节点的分数合并得到对应节点的最终分数。

// pkg/scheduler/framework/runtime/framework.go

// RunScorePlugins 执行配置的所有 score 插件
// 它返回一个列表,为每个评分插件的名称存储响应的 NodeScoreList(s)
// 它还返回 *Status,如果任何一个插件返回非成功状态,它将被设置为 non-success。
func (f *frameworkImpl) RunScorePlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodes []*v1.Node) (ps framework.PluginToNodeScores, status *framework.Status) {
	......
  // 初始化插件节点分数对象
	pluginToNodeScores := make(framework.PluginToNodeScores, len(f.scorePlugins))
	for _, pl := range f.scorePlugins {
		pluginToNodeScores[pl.Name()] = make(framework.NodeScoreList, len(nodes))
	}
	......

	// 对每个节点并行运行 Score 方法
	parallelize.Until(ctx, len(nodes), func(index int) {
		for _, pl := range f.scorePlugins {
			nodeName := nodes[index].Name
      // 调用评分插件的 Score 函数
			s, status := f.runScorePlugin(ctx, pl, state, pod, nodeName)
			......
      // 为当前插件设置对应节点的分数
			pluginToNodeScores[pl.Name()][index] = framework.NodeScore{
				Name:  nodeName,
				Score: int64(s),
			}
		}
	})
	......

	// 为每个 ScorePlugin 并行运行 NormalizeScore 方法
	parallelize.Until(ctx, len(f.scorePlugins), func(index int) {
		pl := f.scorePlugins[index]
    // 得到插件对应的所有节点分数
		nodeScoreList := pluginToNodeScores[pl.Name()]
		if pl.ScoreExtensions() == nil {
			return
		}
    // 调用插件的 NormalizeScore 函数
		status := f.runScoreExtension(ctx, pl, state, pod, nodeScoreList)
		......
	})
	......

	// 并为每个 ScorePlugin 应用评分默认权重
	parallelize.Until(ctx, len(f.scorePlugins), func(index int) {
		pl := f.scorePlugins[index]
		weight := f.pluginNameToWeightMap[pl.Name()]
		nodeScoreList := pluginToNodeScores[pl.Name()]

		for i, nodeScore := range nodeScoreList {
			......
      // 为节点的分数作用上定义的权重
			nodeScoreList[i].Score = nodeScore.Score * int64(weight)
		}
	})
	......

	return pluginToNodeScores, nil
}

RunPreScorePlugins 函数就是循环调用所有注册的 PreScore 插件的 PreScore 函数,这里重点是 RunScorePlugins 函数的实现,在该函数中首先会对每个节点并行运行注册插件的 Score 方法,然后会为为每个插件并行运行 NormalizeScore 方法,由于每个插件的权重不一样,所以最后还有一步非常重要是为每个插件作用上定义的权重得到最终的分数。

最后通过调用 selectHost 函数来获得一个得分最高的节点:

// pkg/scheduler/core/generic_scheduler.go

// selectHost 所有节点的优先级列表,然后选择一个分数最高的节点
func (g *genericScheduler) selectHost(nodeScoreList framework.NodeScoreList) (string, error) {
	if len(nodeScoreList) == 0 {
		return "", fmt.Errorf("empty priorityList")
	}
  // 将第一个节点作为选择的节点
	maxScore := nodeScoreList[0].Score
	selected := nodeScoreList[0].Name
	cntOfMaxScore := 1
  // 然后循环后面的节点进行比较
	for _, ns := range nodeScoreList[1:] {
		if ns.Score > maxScore {  
      // 如果当前节点分数更大,则选择该节点
			maxScore = ns.Score
			selected = ns.Name
			cntOfMaxScore = 1
		} else if ns.Score == maxScore {
      // 如果分数相同,cntOfMaxScore+1
			cntOfMaxScore++ 
			if rand.Intn(cntOfMaxScore) == 0 {
				// 以 1/cntOfMaxScore 的概率取代候选节点
        // 因为分数都一样,就无所谓选择哪个节点了
				selected = ns.Name
			}
		}
	}
	return selected, nil
}
主要用于处理Pod在Filter阶段失败后的操作,如抢占、Autoscale触发等。
DefaultPreemption:当高优先级的Pod没有找到合适的Node时,会执行Preempt抢占算法,抢占的流程:
①一个Pod进入抢占的时候,首先会判断Pod是否拥有抢占的资格,有可能上次已经抢占过一次。
②如果符合抢占资格,会先对所有的节点进行一次过滤,过滤出符合这次抢占要求的节点。然后
③模拟一次调度,把优先级低的Pod先移除出去,再尝试能否把待抢占的Pod放置到此节点上。然后通过这个过程从过滤剩下的节点中选出一批节点进行抢占。
④ProcessPreemptionWithExtenders是一个扩展的钩子,用户可以在这里加一些自己抢占节点的策略。如果没有扩展的钩子,这里面不做任何动作。
⑤PickOneNodeForPreemption,从上面选出的节点里挑选出最合适的一个节点,策略包括:
        优先选择打破PDB最少的节点;
        其次选择待抢占Pods中最大优先级最小的节点;
        再次选择待抢占Pods优先级加和最小的节点;
        接下来选择待抢占Pods数目最小的节点;
        最后选择拥有最晚启动Pod的节点;
通过过滤之后,会选出一个最合适的节点。对这个节点上待抢占的Pod进行delete,完成抢占过程。
 
优选阶段会运行PreScore+Score所有插件进行打分

未经允许不得转载:江哥架构师笔记 » kube-scheduler:调度插件执行原理

分享到:更多 ()

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址