路漫漫其修远兮
吾将上下而求索

kube-scheduler :调度 Pod 流程

前面我们分析了 kube-scheduler 组件如何接收命令行参数,用传递的参数构造一个 Scheduler 对象,最终启动了调度器。调度器启动后就可以开始为未调度的 Pod 进行调度操作了,本文主要来分析调度器是如何对一个 Pod 进行调度操作的。

调度队列

调度器启动后最终是调用 Scheduler 下面的 Run() 函数来开始调度 Pod,如下所示代码:

// pkg/scheduler/scheduler.go

// 等待 cache 同步完成,然后开始调度
func (sched *Scheduler) Run(ctx context.Context) {
	if !cache.WaitForCacheSync(ctx.Done(), sched.scheduledPodsHasSynced) {
		return
	}
	sched.SchedulingQueue.Run()
	wait.UntilWithContext(ctx, sched.scheduleOne, 0)
	sched.SchedulingQueue.Close()
}

首先会等待所有的 cache 同步完成,然后开始执行 SchedulingQueue 的 Run() 函数,SchedulingQueue 是一个队列接口,用于存储待调度的 Pod,该接口遵循类似于 cache.FIFOcache.Heap 这样的数据结构,要弄明白调度器是如何去调度 Pod 的,我们就首先需要弄清楚这个结构:

// pkg/scheduler/internal/queue/scheduling_queue.go

// 用于存储带调度 Pod 的队列接口
type SchedulingQueue interface {
	framework.PodNominator
	// AddUnschedulableIfNotPresent 将无法调度的 Pod 添加回调度队列
  // podSchedulingCycle表示可以通过调用 SchedulingCycle() 返回的当前调度周期号
	AddUnschedulableIfNotPresent(pod *framework.QueuedPodInfo, podSchedulingCycle int64) error
  // SchedulingCycle 返回由调度队列缓存的当前调度周期数。 
  // 通常,只要弹出一个 Pod(例如调用 Pop() 函数),就增加此数字。
	SchedulingCycle() int64
  
  // 下面是通用队列相关操作
  // Pop 删除队列的头并返回它。 
  // 如果队列为空,它将阻塞,并等待直到新元素添加到队列中
	Pop() (*framework.QueuedPodInfo, error)
  // 往队列中添加一个 Pod
	Add(pod *v1.Pod) error
	Update(oldPod, newPod *v1.Pod) error
	Delete(pod *v1.Pod) error

	MoveAllToActiveOrBackoffQueue(event string)
	AssignedPodAdded(pod *v1.Pod)
	AssignedPodUpdated(pod *v1.Pod)
	PendingPods() []*v1.Pod
  // 关闭 SchedulingQueue,以便等待 pop 元素的 goroutine 可以正常退出
	Close()
	// NumUnschedulablePods 返回 SchedulingQueue 中存在的不可调度 Pod 的数量
	NumUnschedulablePods() int
	// 启动管理队列的goroutine
	Run()
}

SchedulingQueue 是一个用于存储带调度 Pod 的队列接口,在构造 Scheduler 对象的时候我们可以了解到调度器中是如何实现这个队列接口的:

// pkg/scheduler/factory.go

// Profiles are required to have equivalent queue sort plugins.
lessFn := profiles[c.profiles[0].SchedulerName].Framework.QueueSortFunc()
podQueue := internalqueue.NewSchedulingQueue(
	lessFn,
	internalqueue.WithPodInitialBackoffDuration(time.Duration(c.podInitialBackoffSeconds)*time.Second),
	internalqueue.WithPodMaxBackoffDuration(time.Duration(c.podMaxBackoffSeconds)*time.Second),
	internalqueue.WithPodNominator(nominator),
)
......
return &Scheduler{
	......
	NextPod:         internalqueue.MakeNextPodFunc(podQueue),
  ......
	SchedulingQueue: podQueue,
}, nil

可以看到上面的 internalqueue.NewSchedulingQueue 就是创建的一个 SchedulingQueue 对象,定义如下所示:

// pkg/scheduler/internal/queue/scheduling_queue.go

// 初始化一个优先级队列作为一个新的调度队列
func NewSchedulingQueue(lessFn framework.LessFunc, opts ...Option) SchedulingQueue {
	return NewPriorityQueue(lessFn, opts...)
}

// 配置 PriorityQueue
type Option func(*priorityQueueOptions)

// 创建一个 PriorityQueue 对象
func NewPriorityQueue(
	lessFn framework.LessFunc,
	opts ...Option,
) *PriorityQueue {
  ......

  comp := func(podInfo1, podInfo2 interface{}) bool {
		pInfo1 := podInfo1.(*framework.QueuedPodInfo)
		pInfo2 := podInfo2.(*framework.QueuedPodInfo)
		return lessFn(pInfo1, pInfo2)
	}
  ......

  pq := &PriorityQueue{
		PodNominator:              options.podNominator,
		clock:                     options.clock,
		stop:                      make(chan struct{}),
		podInitialBackoffDuration: options.podInitialBackoffDuration,
		podMaxBackoffDuration:     options.podMaxBackoffDuration,
		activeQ:                   heap.NewWithRecorder(podInfoKeyFunc, comp, metrics.NewActivePodsRecorder()),
		unschedulableQ:            newUnschedulablePodsMap(metrics.NewUnschedulablePodsRecorder()),
		moveRequestCycle:          -1,
	}
	pq.cond.L = &pq.lock
	pq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder())

	return pq
}

从上面的初始化过程可以看出来 PriorityQueue 这个优先级队列实现了 SchedulingQueue 接口,所以真正的实现还需要去查看这个优先级队列:

// pkg/scheduler/internal/queue/scheduling_queue.go

// PriorityQueue 实现了调度队列 SchedulingQueue
// PriorityQueue 的头部元素是优先级最高的 pending Pod,该结构有三个子队列:
// 一个子队列包含正在考虑进行调度的 Pod,称为 activeQ,是一个堆
// 另一个队列包含已尝试并且确定为不可调度的 Pod,称为 unschedulableQ
// 第三个队列包含从 unschedulableQ 队列移出的 Pod,并在 backoff 完成后将其移到 activeQ 队列
type PriorityQueue struct {
	framework.PodNominator

	stop  chan struct{}
	clock util.Clock

	// pod 初始 backoff 的时间
	podInitialBackoffDuration time.Duration
	// pod 最大 backoff 的时间
	podMaxBackoffDuration time.Duration

	lock sync.RWMutex
	cond sync.Cond  // condition

  // activeQ 是调度程序主动查看以查找要调度 pod 的堆结构,堆头部是优先级最高的 Pod
	activeQ *heap.Heap
  // backoff 队列
	podBackoffQ *heap.Heap
	// unschedulableQ 不可调度队列
	unschedulableQ *UnschedulablePodsMap
  // 调度周期的递增序号,当 pop 的时候会增加
	schedulingCycle int64
  // moveRequestCycle 会缓存 schedulingCycle 的值
  // 当未调度的 Pod 重新被添加到 activeQ 中会保存 schedulingCycle 到 moveRequestCycle 中
	moveRequestCycle int64

	// 表明队列已经被关闭
	closed bool
}

这里使用的是一个 PriorityQueue 优先级队列来存储带调度的 Pod,这个也很好理解,普通队列是一个 FIFO 数据结构,根据元素进入队列的顺序依次出队,而对于调度的这个场景,优先级队列显然更合适,可以根据某些优先级策略,优先对某个 Pod 进行调度。

PriorityQueue 的头部元素是优先级最高的带调度的 Pod,该结构有三个子队列:

  • 活动队列(activeQ):用来存放等待调度的 Pod 队列。

  • 不可调度队列(unschedulableQ):当 Pod 不能满足被调度的条件的时候就会被加入到这个不可调度的队列中来,等待后续继续进行尝试调度。

  • 回退队列(podBackOffQ):如果任务反复执行还是失败,则会按尝试次数增加等待调度时间,降低重试效率,从而避免反复失败浪费调度资源。对于调度失败的 Pod 会优先存储在 backoff 队列中,等待后续进行重试,可以认为就是重试的队列,只是后续再调度的等待时间会越来越长。

这里我们需要来弄清楚这几个队列是如何实现的。

活动队列

**活动队列(activeQ)是存储当前系统中所有在等待调度的 Pod 队列,**在上面实例化优先级队列里面可以看到 activeQ 队列的初始化是通过调用 heap.NewWithRecorder() 函数实现的。

// pkg/scheduler/internal/heap/heap.go

// NewWithRecorder 就是 Heap 基础上包装了 metrics 数据
func NewWithRecorder(keyFn KeyFunc, lessFn lessFunc, metricRecorder metrics.MetricRecorder) *Heap {
	return &Heap{
		data: &data{
			items:    map[string]*heapItem{},
			queue:    []string{},
			keyFunc:  keyFn,
			lessFunc: lessFn,
		},
		metricRecorder: metricRecorder,
	}
}

// lessFunc 接收两个元素,对列表进行排序时,将第一个元素放在第二个元素之前,则返回true。
type lessFunc = func(item1, item2 interface{}) bool

其中的 data 数据结构是 Golang 中的一个标准 heap 堆(只需要实现 heap.Interface 接口即可),然后 Heap 是在 data 基础上新增了一个用于记录 metrics 数据的堆,这里最重要的就是用比较元素优先级的 lessFunc 函数的实现,在初始化优先级队列的时候我们传入了一个 comp 的参数,这个参数就是 activeQ 这个堆里面的 lessFunc 函数的实现:

comp := func(podInfo1, podInfo2 interface{}) bool {
		pInfo1 := podInfo1.(*framework.QueuedPodInfo)
		pInfo2 := podInfo2.(*framework.QueuedPodInfo)
		return lessFn(pInfo1, pInfo2)
	}

最终是调用的创建 Scheduler 对象的时候传入的 lessFn 参数:

lessFn := profiles[c.profiles[0].SchedulerName].Framework.QueueSortFunc()

从这里可以看到比较元素优先级是通过调度框架的 QueueSortFunc() 函数来实现的,对应的实现如下所示:

// pkg/scheduler/framework/runtime/framework.go

// QueueSortFunc 返回用于对调度队列中的 Pod 进行排序的函数
func (f *frameworkImpl) QueueSortFunc() framework.LessFunc {
	if f == nil {
		// 如果 frameworkImpl 为nil,则只需保持其顺序不变
		// NOTE: 主要用于测试
		return func(_, _ *framework.QueuedPodInfo) bool { return false }
	}
  // 如果没有 queuesort 插件
	if len(f.queueSortPlugins) == 0 {
		panic("No QueueSort plugin is registered in the frameworkImpl.")
	}

	// 只有一个 QueueSort 插件有效
	return f.queueSortPlugins[0].Less
}

最终真正用于优先级队列元素优先级比较的函数是通过 QueueSort 插件来实现的,默认启用的 QueueSort 插件是 PrioritySort:

// pkg/scheduler/algorithmprovider/registry.go

func getDefaultConfig() *schedulerapi.Plugins {
	return &schedulerapi.Plugins{
		QueueSort: &schedulerapi.PluginSet{
			Enabled: []schedulerapi.Plugin{
				{Name: queuesort.Name},
			},
		},
    ......

PrioritySort 这个插件的核心实现就是其 Less 函数了:

// pkg/scheduler/framework/plugins/queuesort/priority_sort.go

// Less 是 activeQ 队列用于对 Pod 进行排序的函数。
// 它根据 Pod 的优先级对 Pod 进行排序,
// 当优先级相同时,它使用 PodQueueInfo.timestamp 进行比较
func (pl *PrioritySort) Less(pInfo1, pInfo2 *framework.QueuedPodInfo) bool {
	p1 := pod.GetPodPriority(pInfo1.Pod)
	p2 := pod.GetPodPriority(pInfo2.Pod)
  // 先根据优先级的高低进行比较,然后根据 Pod 的创建时间
  // 越高优先级的 Pod 越被优先调度,越早创建的pod越优先
	return (p1 > p2) || (p1 == p2 && pInfo1.Timestamp.Before(pInfo2.Timestamp))
}

// pkg/api/v1/pod/util.go

// GetPodPriority 获取指定 Pod 的优先级
func GetPodPriority(pod *v1.Pod) int32 {
	if pod.Spec.Priority != nil {
		return *pod.Spec.Priority
	}
	return 0
}

到这里就真相大白了,对于 activeQ 活动队列中的 Pod 是依靠 PrioritySort 插件来进行优先级比较的,每个 Pod 在被创建后都会有一个 priority 属性来标记 Pod 优先级,我们也可以通过全局的 ProrityClass 对象来进行定义,然后在调度 Pod 的时候会先根据 Pod 优先级的高低进行比较,如果优先级相同,则回根据 Pod 的创建时间进行比较,越高优先级的 Pod 越被优先调度,越早创建的Pod 越优先。

那么 Pod 是在什么时候加入到 activeQ 活动队列的呢?还记得前面我们在创建 Scheduler 对象的时候有一个 addAllEventHandlers 函数吗?其中就有对未调度 Pod 的事件监听处理操作。

// pkg/scheduler/eventhandlers.go

// unscheduled pod queue
podInformer.Informer().AddEventHandler(
	cache.FilteringResourceEventHandler{
		FilterFunc: func(obj interface{}) bool {
			switch t := obj.(type) {
			case *v1.Pod:
				return !assignedPod(t) && responsibleForPod(t, sched.Profiles)
			case cache.DeletedFinalStateUnknown:
				if pod, ok := t.Obj.(*v1.Pod); ok {
					return !assignedPod(pod) && responsibleForPod(pod, sched.Profiles)
				}
				utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, sched))
				return false
			default:
				utilruntime.HandleError(fmt.Errorf("unable to handle object in %T: %T", sched, obj))
				return false
			}
		},
		Handler: cache.ResourceEventHandlerFuncs{
			AddFunc:    sched.addPodToSchedulingQueue,
			UpdateFunc: sched.updatePodInSchedulingQueue,
			DeleteFunc: sched.deletePodFromSchedulingQueue,
		},
	},
)

当 Pod 有事件变化后,首先回通过 FilterFunc 函数进行过滤,如果 Pod 没有绑定到节点(未调度)并且使用的是指定的调度器才进入下面的 Handler 进行处理,比如当创建 Pod 以后就会有 onAdd 的添加事件,这里调用的就是 sched.addPodToSchedulingQueue 函数:

// pkg/scheduler/eventhandlers.go

// 添加未调度的 Pod 到优先级队列
func(sched *Scheduler) addPodToSchedulingQueue(obj interface{}) {
   pod := obj.(*v1.Pod)
   klog.V(3).Infof("add event for unscheduled pod %s/%s", pod.Namespace, pod.Name)
   if err := sched.SchedulingQueue.Add(pod); err != nil {
      utilruntime.HandleError(fmt.Errorf("unable to queue %T: %v", obj, err))
   }
}

可以看到这里当 Pod 被创建后,会将 Pod 通过调度队列 SchedulingQueue 的 Add 函数添加到优先级队列中去:

// pkg/scheduler/internal/queue/scheduling_queue.go

// Add 添加 Pod 到 activeQ 活动队列,仅当添加了新的 Pod 时才应调用它
// 这样 Pod 就不会已经处于 active/unschedulable/backoff 队列中了
func (p *PriorityQueue) Add(pod *v1.Pod) error {
	p.lock.Lock()
	defer p.lock.Unlock()
	pInfo := p.newQueuedPodInfo(pod)
  // 添加到 activeQ 队列
	if err := p.activeQ.Add(pInfo); err != nil {
		klog.Errorf("Error adding pod %v to the scheduling queue: %v", nsNameForPod(pod), err)
		return err
	}
  // 如果在 unschedulableQ 队列中,则从改队列移除
	if p.unschedulableQ.get(pod) != nil {
		klog.Errorf("Error: pod %v is already in the unschedulable queue.", nsNameForPod(pod))
		p.unschedulableQ.delete(pod)
	}
	// 从 backoff 队列删除
	if err := p.podBackoffQ.Delete(pInfo); err == nil {
		klog.Errorf("Error: pod %v is already in the podBackoff queue.", nsNameForPod(pod))
	}
  // 记录metrics
	metrics.SchedulerQueueIncomingPods.WithLabelValues("active", PodAdd).Inc()
	p.PodNominator.AddNominatedPod(pod, "")
  // 通知其他地方进行处理
	p.cond.Broadcast()

	return nil
}

这就是 activeQ 活动队列添加元素的过程。

调度 Pod

当我们把新创建的 Pod 添加到 activeQ 活动队列过后,就可以在另外的协程中从这个队列中弹出堆顶的元素来进行具体的调度处理了。这里就要回头本文开头部分调度器启动后执行的一个调度操作了 sched.scheduleOne ,对单个 Pod 进行调度的基本流程如下所示:

  1. 通过优先级队列弹出需要调度的 Pod

  2. 在某些场景下跳过调度

  3. 执行 Schedule 调度函数进行真正的调度,找到 Pod 合适的节点

  4. 如果上面调度失败则会尝试抢占机制

  5. 如果调度成功则将该 Pod 和选定的节点进行假性绑定(临时绑定),存入到调度器 cache 中,而不需要等待绑定操作的发生,方便后续操作

  6. 异步执行真正的绑定操作,将节点名称添加到 Pod 的 spec.nodeName 属性中进行绑定。

scheduleOne 函数如下所示:

// pkg/scheduler/scheduler.go

// scheduleOne 为单个 Pod 完成整个调度工作流程
func (sched *Scheduler) scheduleOne(ctx context.Context) {
  // 从调度器中获取下一个要调度的 Pod
	podInfo := sched.NextPod()
	......
}

scheduleOne 函数在最开始调用 sched.NextPod() 函数来获取现在需要调度的 Pod,其实就是上面 activeQ 活动队列中 Pop 出来的元素,当实例化 Scheduler 对象的时候就指定了 NextPod 函数:internalqueue.MakeNextPodFunc(podQueue)

// pkg/scheduler/internal/queue/scheduling_queue.go

// MakeNextPodFunc 返回一个函数,用于从指定的调度队列中获取下一个 Pod 进行调度
func MakeNextPodFunc(queue SchedulingQueue) func() *framework.QueuedPodInfo {
	return func() *framework.QueuedPodInfo {
		podInfo, err := queue.Pop()
		......
		return nil
	}
}

很明显这里就是调用的优先级队列的 Pop() 函数来弹出队列中的 Pod 进行调度处理。

// pkg/scheduler/internal/queue/scheduling_queue.go

// Pop 删除 activeQ 活动队列的头部元素并返回它。
// 如果 activeQ 为空,它将阻塞,并等待直到新元素添加到队列中。
// 当 Pod 弹出后会增加调度周期参数的值。
func (p *PriorityQueue) Pop() (*framework.QueuedPodInfo, error) {
	p.lock.Lock()
	defer p.lock.Unlock()
	for p.activeQ.Len() == 0 {
    // 当队列为空时,将阻塞Pop()的调用,直到新元素入队。
    // 调用Close()时,将设置p.closed并广播condition,这将导致此循环继续并从Pop()返回。
		if p.closed {
			return nil, fmt.Errorf(queueClosed)
		}
		p.cond.Wait()
	}
  // 从 activeQ 队列弹出堆顶元素
	obj, err := p.activeQ.Pop()
	if err != nil {
		return nil, err
	}
	pInfo := obj.(*framework.QueuedPodInfo)
	pInfo.Attempts++
  // 增加调度周期次数
	p.schedulingCycle++
	return pInfo, err
}

Pop() 函数很简单,就是从 activeQ 队列中弹出堆顶的元素返回即可。拿到了要调度的 Pod,接下来就是去真正执行调度逻辑了。

接着通过 Pod 指定的调度器获取对应的 profile 用于后续处理,其中包含有当前调度器对应的调度框架对象:

// pkg/scheduler/scheduler.go

// scheduleOne 为单个 Pod 完成整个调度工作流程
func (sched *Scheduler) scheduleOne(ctx context.Context) {
  // 从调度器中获取下一个要调度的 Pod
	podInfo := sched.NextPod()
	if podInfo == nil || podInfo.Pod == nil {
		return
	}
	pod := podInfo.Pod
  // 根据Pod对象里面的schedulename字段获取指定的profile,从profiles的map里找对应的plugins集合
  //  重要:如果是默认的schedule进程,使用default-schedule,就使用default的调度策略,哪些enable,哪些disable那个
  //  如果这里使用的是自定义调度名字sample,则使用自定义调度器里configmap里的哪些enable,哪些disable那个策略
  //  自定义调度器是在default调度器原有的插件策略基础之上,加入了自己的自定义策略
  //  具体看(启动流程)文章最后的图
	prof, err := sched.profileForPod(pod)
	if err != nil {
		klog.Error(err)
		return
	}
  // 某些特定情况下跳过调度 Pod
	if sched.skipPodSchedule(prof, pod) {
		return
	}
  ......
}

// 某些特定情况下跳过调度 Pod 则返回 true
func (sched *Scheduler) skipPodSchedule(prof *profile.Profile, pod *v1.Pod) bool {
	// Case 1: Pod 标记为删除
	if pod.DeletionTimestamp != nil {
		prof.Recorder.Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
		klog.V(3).Infof("Skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
		return true
	}

	// Case 2: Pod 已经临时绑定可能会被跳过 Pod 更新
  // 如果临时绑定的 Pod 在其之前的调度周期中,但在被临时绑定之前有更新事件,则可以将其再次添加到调度队列中。
	if sched.skipPodUpdate(pod) {
		return true
	}
	return false
}

在真正开始调度之前,在某些场景下可能需要跳过 Pod 的调度,比如 Pod 被标记为删除了,还有一些忽略的场景通过 skipPodUpdate 函数来确定:

// pkg/scheduler/eventhandlers.go

// skipPodUpdate 检查指定的 Pod 更新是否应该被忽略
// 该函数将返回 true,如果:
//   - Pod 已经被暂时绑定, 并且
//   - Pod 只有 ResourceVersion,Spec.NodeName,Annotations,
//     ManagedFields, Finalizers 和/或 Conditions 更新了.
func (sched *Scheduler) skipPodUpdate(pod *v1.Pod) bool {
	// 没有暂时绑定的 Pod 不能被跳过更新
	isAssumed, err := sched.SchedulerCache.IsAssumedPod(pod)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("failed to check whether pod %s/%s is assumed: %v", pod.Namespace, pod.Name, err))
		return false
	}
	if !isAssumed {
		return false
	}

	// 从 cache 中获取临时绑定的 Pod
	assumedPod, err := sched.SchedulerCache.GetPod(pod)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("failed to get assumed pod %s/%s from cache: %v", pod.Namespace, pod.Name, err))
		return false
	}
	
  // 比较临时绑定的 Pod 和更新的 Pod
  // 如果它们相等(排除某些字段),则将跳过此 pod 更新
	f := func(pod *v1.Pod) *v1.Pod {
		p := pod.DeepCopy()
    // 必须排除 ResourceVersion 字段,因为每个对象更新将具有新的资源版本
		p.ResourceVersion = ""
    // Spec.NodeName 必须排除在外,因为在缓存中预期就临时为 Pod 分配了节点
		p.Spec.NodeName = ""
		// Annotations 必须排除在外,因为 <https://github.com/kubernetes/kubernetes/issues/52914>.
		p.Annotations = nil
		p.ManagedFields = nil
		// 外部控制器可能会更改以下内容,但它们不会影响调度决策
		p.Finalizers = nil
		p.Status.Conditions = nil
		return p
	}
  // 排除上面的这些字段外,如果一致则可以跳过更新
	assumedPodCopy, podCopy := f(assumedPod), f(pod)
	if !reflect.DeepEqual(assumedPodCopy, podCopy) {
		return false
	}
	klog.V(3).Infof("Skipping pod %s/%s update", pod.Namespace, pod.Name)
	return true
}

经过上面的忽略 Pod 调度过程后,就会去同步尝试调度 Pod 找到适合的节点。整个调度过程和之前的调度框架流程图是一致的:

https://bxdc-static.oss-cn-beijing.aliyuncs.com/images/20210318115428.png

代码如下所示:

// pkg/scheduler/scheduler.go

// scheduleOne 为单个 Pod 完成整个调度工作流程
func (sched *Scheduler) scheduleOne(ctx context.Context) {
  ......
  // 同步尝试调度 Pod 找到适合的节点
	start := time.Now()
	
	// 这里的state是为了存放一些自定义变量,给下一个阶段需要上一个阶段的结果传递变量
	state := framework.NewCycleState()
	state.SetRecordPluginMetrics(rand.Intn(100) < pluginMetricsSamplePercent)
	schedulingCycleCtx, cancel := context.WithCancel(ctx)
	defer cancel()
  // 执行 Schedule 调度函数进行真正的调度
	scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, prof, state, pod)
	if err != nil {
    // Schedule() 函数执行可能会失败,因为 Pod 无法在任何主机上运行
    // 因此我们尝试进行抢占,并期望下一次尝试Pod进行调度时,因为抢占而可以正常调度
    // 但也有可能不同的 Pod 会调度到被抢占的资源中,但这没有什么影响
		nominatedNode := ""
		if fitError, ok := err.(*core.FitError); ok {
			if !prof.HasPostFilterPlugins() {
        // 没有注册 PostFilter 插件
				klog.V(3).Infof("No PostFilter plugins are registered, so no preemption will be performed.")
			} else {
				// 运行 PostFilter 插件,尝试在未来的调度周期中使 Pod 可调度
        result, status := prof.RunPostFilterPlugins(ctx, state, pod, fitError.FilteredNodesStatuses)
				......
				if status.IsSuccess() && result != nil {
					nominatedNode = result.NominatedNodeName  // 被提名的节点名
				}
			}
		}
    // recordSchedulingFailure 为 pod记录一个事件,表示 pod 调度失败
    // 另外,如果设置了pod condition 和 nominated 提名节点名称,也要更新
    // 这里最重要的是要将调度失败的 Pod 加入到不可调度 Pod 的队列中去
    sched.recordSchedulingFailure(prof, podInfo, err, v1.PodReasonUnschedulable, nominatedNode)
		return
	}
	
  // 告诉 cache 暂时绑定的 Pod 现在正在指定节点上运行,即使尚未绑定
	// 这样我们就可以保持调度,而不必等待真正的绑定发生
	assumedPodInfo := podInfo.DeepCopy()  // 拷贝现在调度的 Pod 信息
	assumedPod := assumedPodInfo.Pod
  // assume 是通过设置 NodeName=scheduleResult.SuggestedHost 来修改 `assumedPod` 的
	err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
	if err != nil {
		......
	}
	
	// 运行 reserve 插件的 Reserve 方法(预留)
	if sts := prof.RunReservePluginsReserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
		// 触发 un-reserve 插件以清理与 reserve 的 Pod 相关联的状态
    prof.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
    // 从缓存中移除临时绑定的 Pod
		if forgetErr := sched.Cache().ForgetPod(assumedPod); forgetErr != nil {
			klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
		}
		......
	}
  
  // 运行 "permit" 插件
	runPermitStatus := prof.RunPermitPlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
	if runPermitStatus.Code() != framework.Wait && !runPermitStatus.IsSuccess() {
    // 当 permit 插件结果不是 Wait 并且没有执行成功的时候,进行错误处理
		var reason string
		if runPermitStatus.IsUnschedulable() {
			reason = v1.PodReasonUnschedulable
		} else {
			reason = SchedulerError
		}
    // 其中一个插件返回的状态不等于 success 或 wait 的状态
    // 触发 un-reserve 插件
		prof.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
		// 从缓存中移除临时绑定的 Pod
    if forgetErr := sched.Cache().ForgetPod(assumedPod); forgetErr != nil {
			klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
		}
    // 记录调度失败事件,主要我们关心的是如何加入到另外两个不可调度队列中去
    sched.recordSchedulingFailure(prof, podInfo, err, v1.PodReasonUnschedulable, nominatedNode)
		return
	}
	......
}

从上面函数可以看出就是通过 sched.Algorithm.Schedule() 函数执行真正的调度选择合适的节点,如果调度出现了错误,比如没有得到任何合适的节点,这个时候如果注册了 PostFilter 插件,则会执行该插件,抢占操作就是在该插件中执行的,然后会将这个调度失败的 Pod 加入到 unschedulableQ 或者 podBackoffQ 队列中去,后续我们再仔细分析。

如果调度成功,得到了合适的节点,则先将该 Pod 和选定的节点进行假性绑定(不用等待去执行真正的绑定操作),存入调度器的 cache 中去,然后执行 Reserve 预留插件,去预留节点上 Pod 需要的资源。

接着就是调度的最后阶段去执行 Permit 允许插件,用于阻止或者延迟 Pod 与节点的绑定。

上面调度的过程都完成过后,最后就是去异步和节点进行真正的绑定操作了。

// pkg/scheduler/scheduler.go

func (sched *Scheduler) scheduleOne(ctx context.Context) {
	......
	// 异步绑定 Pod 到选定的节点 (we can do this b/c of the assumption step above).
	go func() {
		......
		// 首先调用 WaitOnPermit 扩展,与前面的 Permit 扩展点配合使用实现延时调度功能
		waitOnPermitStatus := prof.WaitOnPermit(bindingCycleCtx, assumedPod)
		if !waitOnPermitStatus.IsSuccess() {
			......
			// 触发 un-reserve 插件清理 reserver pod 关联的状态
			prof.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
      // 从缓存中移除临时绑定的 Pod
			if forgetErr := sched.Cache().ForgetPod(assumedPod); forgetErr != nil {
				klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
			}
			return
		}

		// 执行 "prebind" 插件.
		preBindStatus := prof.RunPreBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
		if !preBindStatus.IsSuccess() {
			prof.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
			if forgetErr := sched.Cache().ForgetPod(assumedPod); forgetErr != nil {
				klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
			}
			sched.recordSchedulingFailure(prof, assumedPodInfo, preBindStatus.AsError(), SchedulerError, "")
			return
		}
		// 调用 bind 函数进行真正的绑定
		err := sched.bind(bindingCycleCtx, prof, assumedPod, scheduleResult.SuggestedHost, state)
		if err != nil {
			// 绑定失败进行清理操作
			prof.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
			if err := sched.SchedulerCache.ForgetPod(assumedPod); err != nil {
				klog.Errorf("scheduler cache ForgetPod failed: %v", err)
			}
		} else {
			// 最后绑定成功后,调用 "postbind" 插件.
			prof.RunPostBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
		}
	}()
}

执行真正的绑定操作是在一个单独的 goroutine 里面操作的,由于前面调度最后可能启用了 Permit 允许插件,所以这里首先需要调用内置的 WaitOnPermit 插件配合 Permit 插件,如果 Pod 是 waiting 状态,则 WaitOnPermit 将会一直阻塞,直到 approve 或 deny 该 Pod。

然后就和调度框架里面的流程一样,依次调用 prebind、bind、postbind 插件完成真正的绑定操作。到这里我们就真正的完成了一个 Pod 的调度流程。

失败与重试处理

podBackOffQ

平时我们在创建 Pod 后如果执行失败了,可能会有一些 backoff 这样的 events 信息,这个 backoff 是什么意思呢?

backoff(回退)机制是并发编程中非常常见的一种机制,即如果任务反复执行依旧失败,则会按次增加等待调度时间,降低重试的效率,从而避免反复失败浪费调度资源。

针对调度失败的 Pod 会优先存储在 backoff 队列中,等待后续重试。podBackOffQ 主要存储那些在多个 schedulingCyle 中依旧调度失败的情况,则会通过 backoff 机制,延迟等待调度时间。

backoffQ 也是一个优先级队列,其初始化也是在 Scheduler 初始化优先级队列的时候传入的,其中最重要的比较队列中元素优先级的函数为 pq.podsCompareBackoffCompleted

// pkg/scheduler/internal/queue/scheduling_queue.go

func NewPriorityQueue(
	lessFn framework.LessFunc,
	opts ...Option,
) *PriorityQueue {
	......
	pq := &PriorityQueue{
		......
	}
	pq.cond.L = &pq.lock
	pq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder())
	return pq
}

// 比较 BackOffQ 队列中元素的优先级,谁的回退时间短谁的优先级高
func (p *PriorityQueue) podsCompareBackoffCompleted(podInfo1, podInfo2 interface{}) bool {
	pInfo1 := podInfo1.(*framework.QueuedPodInfo)
	pInfo2 := podInfo2.(*framework.QueuedPodInfo)
	bo1 := p.getBackoffTime(pInfo1)
	bo2 := p.getBackoffTime(pInfo2)
	return bo1.Before(bo2)
}

// getBackoffTime 获取 podInfo 完成回退的时间
func (p *PriorityQueue) getBackoffTime(podInfo *framework.QueuedPodInfo) time.Time {
	duration := p.calculateBackoffDuration(podInfo)
	backoffTime := podInfo.Timestamp.Add(duration)
	return backoffTime
}

// calculateBackoffDuration 是一个 helper 函数用于计算 backoffDuration
// 基于 pod 已经计算的 attempts 次数
func (p *PriorityQueue) calculateBackoffDuration(podInfo *framework.QueuedPodInfo) time.Duration {
	duration := p.podInitialBackoffDuration  // 默认1s
	for i := 1; i < podInfo.Attempts; i++ {  // 调度成功之前的尝试次数
		duration = duration * 2
		if duration > p.podMaxBackoffDuration {  // 最大10s
			return p.podMaxBackoffDuration
		}
	}
	return duration
}

不可调度队列

unschedulableQ 就是字面意思不可调度队列,该队列中的 Pod 是已经尝试被确定为不可调度的 Pod,虽说是个队列,实际的数据结构是一个 map 类型。

// pkg/scheduler/internal/queue/scheduling_queue.go

// UnschedulablePodsMap 保存了不可调度的 Pod。
// 这个数据结构用于实现 unschedulableQ
type UnschedulablePodsMap struct {
	// podInfoMap 是由 Pod 的全名(podname_namespace)构成的 map key,值是指向 QueuedPodInfo 的指针。
  podInfoMap map[string]*framework.QueuedPodInfo
	keyFunc    func(*v1.Pod) string
	metricRecorder metrics.MetricRecorder
}

错误处理

在上面 scheduleOne 函数中当我们去真正执行调度操作后,如果出现了错误,然后会调用 recordSchedulingFailure 函数记录调度失败的事件,该函数中我们最关心的是调用 Scheduler 的 Error 回调函数,这个回调函数是在初始化调度器的时候传入的,是通过 MakeDefaultErrorFunc 函数得到的一个回调函数,在这个函数中会把当前调度失败的 Pod 加入到 unschedulableQ 不可调度队列或者 podBackoffQ 队列中去:

// pkg/scheduler/scheduler.go

// 记录调度失败的事件
func (sched *Scheduler) recordSchedulingFailure(prof *profile.Profile, podInfo *framework.QueuedPodInfo, err error, reason string, nominatedNode string) {
	sched.Error(podInfo, err)
	......
}

// pkg/scheduler/factory.go

// 创建 Scheduler 对象
func (c *Configurator) create() (*Scheduler, error) {
	......
	return &Scheduler{
		NextPod:         internalqueue.MakeNextPodFunc(podQueue),
		Error:           MakeDefaultErrorFunc(c.client, c.informerFactory.Core().V1().Pods().Lister(), podQueue, c.schedulerCache),
		......
	}, nil
}

// MakeDefaultErrorFunc 构造一个函数用来处理 Pod 调度错误
func MakeDefaultErrorFunc(client clientset.Interface, podLister corelisters.PodLister, podQueue internalqueue.SchedulingQueue, schedulerCache internalcache.Cache) func(*framework.QueuedPodInfo, error) {
	return func(podInfo *framework.QueuedPodInfo, err error) {
		pod := podInfo.Pod
		......
		// 从 informer 缓存中获取 Pod
		cachedPod, err := podLister.Pods(pod.Namespace).Get(pod.Name)
		if err != nil {
			klog.Warningf("Pod %v/%v doesn't exist in informer cache: %v", pod.Namespace, pod.Name, err)
			return
		}
		// 添加到 unschedulableQ 队列中去
		podInfo.Pod = cachedPod.DeepCopy()
		if err := podQueue.AddUnschedulableIfNotPresent(podInfo, podQueue.SchedulingCycle()); err != nil {
			klog.Error(err)
		}
	}
}

真正是加入到队列是通过调用函数 podQueue.AddUnschedulableIfNotPresent() 来完成的:

// pkg/scheduler/internal/queue/scheduling_queue.go

// AddUnschedulableIfNotPresent 将一个不可调用的 Pod 插入到队列中
// 如果已经在队列中了,则忽略
// 一般情况优先级队列会把不可调度的 Pod 加入到 `unschedulableQ` 队列中
// 但如果最近有 move request,则会将 Pod 加入到 `podBackoffQ` 队列中
func (p *PriorityQueue) AddUnschedulableIfNotPresent(pInfo *framework.QueuedPodInfo, podSchedulingCycle int64) error {
	p.lock.Lock()
	defer p.lock.Unlock()
	pod := pInfo.Pod
  // 查看是否已经在 unschedulableQ 队列中了
	if p.unschedulableQ.get(pod) != nil {
		return fmt.Errorf("pod: %v is already present in unschedulable queue", nsNameForPod(pod))
	}

	// 刷新 Pod 被重新添加后的时间戳
	pInfo.Timestamp = p.clock.Now()
  // 检查是否在 activeQ 活动队列中
	if _, exists, _ := p.activeQ.Get(pInfo); exists {
		return fmt.Errorf("pod: %v is already present in the active queue", nsNameForPod(pod))
	}
  // 检查是否在 podBackoffQ 队列中
	if _, exists, _ := p.podBackoffQ.Get(pInfo); exists {
		return fmt.Errorf("pod %v is already present in the backoff queue", nsNameForPod(pod))
	}

  // 如果已经收到了 move request,则将其加入到 BackoffQ 队列中,否则加入到 unschedulableQ 不可调度队列
	if p.moveRequestCycle >= podSchedulingCycle {
		if err := p.podBackoffQ.Add(pInfo); err != nil {
			return fmt.Errorf("error adding pod %v to the backoff queue: %v", pod.Name, err)
		}
		metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", ScheduleAttemptFailure).Inc()
	} else {
		p.unschedulableQ.addOrUpdate(pInfo)
		metrics.SchedulerQueueIncomingPods.WithLabelValues("unschedulable", ScheduleAttemptFailure).Inc()
	}

	p.PodNominator.AddNominatedPod(pod, "")
	return nil
}

在 Pod 调度失败时,会调用AddUnschedulableIfNotPresent函数,其中有一个逻辑:

  1. 如果 moveRequestCycle 大于等于当前的 podSchedulingCycle,则当前应该对之前已经失败的 Pod 进行重试,也就是放进 backoffQ 队列里

  2. 如果不满足,则放进 unscheduleableQ 不可调度队列里

对于 moveRequestCycle 这个属性只有集群资源发生过变更(在资源的事件监听处理器里面都会去设置 moveRequestCycle=podSchedulingCycle)才会等于podSchedulingCycle。理论上来说在 Pod 调度失败时,没有后续任何操作,会被放进 unscheduleableQ 不可调度队列,但是有可能 Pod 刚刚调度失败,在错误处理之前,忽然发生了资源变更,这个时候,由于在这个错误处理的间隙,集群的资源状态已经发生了变化,所以可以认为这个 Pod 有了被调度成功的可能性,所以就被放进了backoffQ重试队列中,等待快速重试。

那么 PriorityQueue 队列里面包含的3个子队列之间的数据是如何流转的呢?还是要从调度启动的函数入手分析:

// pkg/scheduler/scheduler.go

// 等待 cache 同步完成,然后开始调度
func (sched *Scheduler) Run(ctx context.Context) {
	if !cache.WaitForCacheSync(ctx.Done(), sched.scheduledPodsHasSynced) {
		return
	}
	sched.SchedulingQueue.Run()
	wait.UntilWithContext(ctx, sched.scheduleOne, 0)
	sched.SchedulingQueue.Close()
}

其中的 sched.SchedulingQueue.Run() 函数就是运行 PriorityQueue 队列的 Run() 函数:

// pkg/scheduler/internal/queue/scheduling_queue.go

// Run 启动协程,把 podBackoffQ 队列数据放到 activeQ 活动队列中
func (p *PriorityQueue) Run() {
	go wait.Until(p.flushBackoffQCompleted, 1.0*time.Second, p.stop)
	go wait.Until(p.flushUnschedulableQLeftover, 30*time.Second, p.stop)
}

// flushBackoffQCompleted 将 backoffQ 队列中已完成 backoff Pod 移动到 activeQ 队列中
func (p *PriorityQueue) flushBackoffQCompleted() {
	p.lock.Lock()
	defer p.lock.Unlock()
	for {
    // 获取heap头元素(不删除)
		rawPodInfo := p.podBackoffQ.Peek() 
		if rawPodInfo == nil {
			return
		}
		pod := rawPodInfo.(*framework.QueuedPodInfo).Pod
    // 如果该 Pod 回退完成的时间还没到,则忽略
		boTime := p.getBackoffTime(rawPodInfo.(*framework.QueuedPodInfo))
		if boTime.After(p.clock.Now()) {
			return
		}
    // 完成了则弹出heap头元素
		_, err := p.podBackoffQ.Pop()
		if err != nil {
			klog.Errorf("Unable to pop pod %v from backoff queue despite backoff completion.", nsNameForPod(pod))
			return
		}
    // 加入到 activeQ 活动队列
		p.activeQ.Add(rawPodInfo)
		metrics.SchedulerQueueIncomingPods.WithLabelValues("active", BackoffComplete).Inc()
    // 广播
		defer p.cond.Broadcast()
	}
}

// flushUnschedulableQLeftover 将停留在 unschedulableQ 中时间长于 1min 的 Pod 移动 activeQ 中
func (p *PriorityQueue) flushUnschedulableQLeftover() {
	p.lock.Lock()
	defer p.lock.Unlock()

	var podsToMove []*framework.QueuedPodInfo
	currentTime := p.clock.Now()
	for _, pInfo := range p.unschedulableQ.podInfoMap {
    // 最后调度的时间
		lastScheduleTime := pInfo.Timestamp
    // 如果 Pod 在 unschedulableQ 队列中停留的时间超过1min
		if currentTime.Sub(lastScheduleTime) > unschedulableQTimeInterval {
			podsToMove = append(podsToMove, pInfo)
		}
	}

	if len(podsToMove) > 0 {
    // 移动到活动队列或者 Backoff 队列
		p.movePodsToActiveOrBackoffQueue(podsToMove, UnschedulableTimeout)
	}
}

// 移动Pod到活动队列或者Backoff队列
func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(podInfoList []*framework.QueuedPodInfo, event string) {
	for _, pInfo := range podInfoList {
		pod := pInfo.Pod
    // 如果还在 backoff 时间内
		if p.isPodBackingoff(pInfo) {
      // 添加到 podBackOffQ 队列
			if err := p.podBackoffQ.Add(pInfo); err != nil {
				klog.Errorf("Error adding pod %v to the backoff queue: %v", pod.Name, err)
			} else {
        // 从 unschedulableQ 队列删除
				metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", event).Inc()
				p.unschedulableQ.delete(pod)
			}
		} else {
      // 添加到 activeQ 队列
			if err := p.activeQ.Add(pInfo); err != nil {
				klog.Errorf("Error adding pod %v to the scheduling queue: %v", pod.Name, err)
			} else {
        // 从 unschedulableQ 队列删除
				metrics.SchedulerQueueIncomingPods.WithLabelValues("active", event).Inc()
				p.unschedulableQ.delete(pod)
			}
		}
	}
  // 将 moveRequestCycle 设置为当前的 schedulingCycle
	p.moveRequestCycle = p.schedulingCycle
	p.cond.Broadcast()
}

加上上面的 sched.scheduleOne 函数,3个子队列整体的工作流程就是:

  1. 每隔1秒,检测 backoffQ 里是否有 Pod 可以被放进 activeQ 里

  2. 每隔30秒,检测 unscheduleodQ 里是否有 Pod 可以被放进 activeQ 里(默认条件是等待时间超过60秒)

  3. 不停的调用 scheduleOne 方法,从 activeQ 里弹出 Pod 进行调度

如果一个 Pod 调度失败了,正常就是不可调度的,应该放入 unscheduleableQ 队列。如果集群内的资源状态一直不发生变化,这种情况,每隔60s这些 Pod 还是会被重新尝试调度一次。

但是一旦资源的状态发生了变化,这些不可调度的 Pod 就很可能可以被调度了,也就是 unscheduleableQ 中的 Pod 应该放进 backoffQ 里面去了。等待安排重新调度,backoffQ 里的 Pod 会根据重试的次数设定等待重试的时间,重试的次数越少,等待重新调度的时间也就越少。backOffQ 里的 Pod 调度的速度会比 unscheduleableQ 里的 Pod 快得多。

https://bxdc-static.oss-cn-beijing.aliyuncs.com/images/20210321162812.png

到这里我们就完成了调度单个 Pod 的完整工作流的分析,接下来就是真正调度过程中的核心调度算法的实现过程。

未经允许不得转载:江哥架构师笔记 » kube-scheduler :调度 Pod 流程

分享到:更多 ()

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址