路漫漫其修远兮
吾将上下而求索

控制器:编写控制器

本文主要介绍如何使用 WorkQueue 来编写控制器

介绍

Kubernetes 控制器是一个主动调谐的过程,它会 watch 一些对象的期望状态,也会 watch 实际的状态,然后,控制器发送一些指令尝试让对象的当前状态往期望状态迁移。

控制器最简单的实现就是一个循环:

for {
  desired := getDesiredState()
  current := getCurrentState()
  makeChanges(desired, current)
}

Watches 这些都只是这个逻辑的优化。

https://github.com/kubernetes/community/blob/master/contributors/devel/sig-api-machinery/controllers.md

指南

当你在编写控制器时,有一些准则将有助于确保你得到你需要的结果和。

  • 一次只操作一个元素。如果你使用了 workqueue.Interface,你可以将某个资源的变化排成队列,随后将它们弹到多个 "worker " gofuncs 中,并保证没有两个 gofuncs 会同时对同一个元素进行操作。

  • 很多控制器必须触发掉多个资源(我需要 "如果Y发生变化就检查X"),但几乎所有的控制器都可以根据关系将这些资源折叠成一个 "检查这个X "的队列。例如,ReplicaSet 控制器需要对一个 Pod 被删除做出反应,但它通过找到相关的 ReplicaSets 并对这些进行排队来实现。

  • 资源之间的随机排序。当控制器排队关闭多种类型的资源时,无法保证这些资源之间的排序。

  • 不同的 watches 是独立更新的。即使有 "创建的资源A/X "和 "创建的资源B/Y "的客观排序,你的控制器也可以观察到 "创建的资源B/Y "和 "创建的资源A/X"。

  • 级别驱动,而不是边缘驱动。就像有一个 shell 脚本不是一直在运行一样,你的控制器可能会在再次运行之前关闭不确定的时间。

  • 如果一个 API 对象出现的标记值为 true,你不能指望已经看到它从false 变成 true,只能说你现在观察到它是 true。即使是 API watch 也会受到这个问题的影响,所以要确保你不指望看到变化,除非你的控制器也在对象的状态中标记它最后做出决定的信息。

  • 使用 SharedInformers。SharedInformers 提供了回调函数来接收特定资源的添加、更新和删除的通知,它们还提供了访问共享缓存和确定缓存何时启动的便利功能。

    使用 https://git.k8s.io/kubernetes/staging/src/k8s.io/client-go/informers/factory.go 中的工厂方法来确保你和其他人共享同一个缓存实例。

    这样我们就大大减少了 APIServer 的连接以及重复的序列化、重复的反序列化、重复的缓存等成本。

    你可能会看到其他机制,比如反射器和 DeltaFIFO 驱动控制器。这些都是旧的机制,我们后来用它们来构建 SharedInformers,你应该避免在新控制器中使用它们。

  • 永远不要 Mutate 原始对象!!缓存是跨控制器共享的,这意味着如果你修改了你的对象的 "副本"(实际上是引用或浅层副本),你会搞乱其他控制器(不仅仅是你自己的)。

    最常见的失败方式是做一个浅层拷贝,然后 Mutate 一个映射,比如Annotations。使用 api.Scheme.Copy 进行深层复制。

    package main
    
    import (
    	"flag"
    	"fmt"
    	"path/filepath"
    	"time"
    
    	v1 "k8s.io/api/core/v1"
    	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    	"k8s.io/apimachinery/pkg/fields"
    	"k8s.io/apimachinery/pkg/util/runtime"
    	"k8s.io/apimachinery/pkg/util/wait"
    	"k8s.io/client-go/kubernetes"
    	"k8s.io/client-go/rest"
    	"k8s.io/client-go/tools/cache"
    	"k8s.io/client-go/tools/clientcmd"
    	"k8s.io/client-go/util/homedir"
    	"k8s.io/client-go/util/workqueue"
    	"k8s.io/klog"
    )
    
    type Controller struct {
    	indexer  cache.Indexer
    	queue    workqueue.RateLimitingInterface
    	informer cache.Controller
    }
    
    func NewController(queue workqueue.RateLimitingInterface, indexer cache.Indexer, informer cache.Controller) *Controller {
    	return &Controller{
    		informer: informer,
    		indexer:  indexer,
    		queue:    queue,
    	}
    }
    
    func (c *Controller) processNextItem() bool {
    	// 等到工作队列中有一个新元素
    	key, quit := c.queue.Get()
    	if quit {
    		return false
    	}
    	// 告诉队列我们已经完成了处理此 key 的操作
    	// 这将为其他 worker 解锁该 key
    	// 这将确保安全的并行处理,因为永远不会并行处理具有相同 key 的两个Pod
    	defer c.queue.Done(key)
    
    	// 调用包含业务逻辑的方法
    	err := c.syncToStdout(key.(string))
    	// 如果在执行业务逻辑期间出现错误,则处理错误
    	c.handleErr(err, key)
    	return true
    }
    
    // syncToStdout 是控制器的业务逻辑实现
    // 在此控制器中,它只是将有关 Pod 的信息打印到 stdout
    // 如果发生错误,则简单地返回错误
    // 此外重试逻辑不应成为业务逻辑的一部分。
    func (c *Controller) syncToStdout(key string) error {
    	// 从本地存储中获取 key 对应的对象
    	obj, exists, err := c.indexer.GetByKey(key)
    	if err != nil {
    		klog.Errorf("Fetching object with key %s from store failed with %v", key, err)
    		return err
    	}
    	if !exists {
    		fmt.Printf("Pod %s does not exists anymore\\n", key)
    	} else {
    		fmt.Printf("Sync/Add/Update for Pod %s\\n", obj.(*v1.Pod).GetName())
    	}
    	return nil
    }
    
    // 检查是否发生错误,并确保我们稍后重试
    func (c *Controller) handleErr(err error, key interface{}) {
    	if err == nil {
    		// 忘记每次成功同步时 key 的#AddRateLimited历史记录。
    		// 这样可以确保不会因过时的错误历史记录而延迟此 key 更新的以后处理。
    		c.queue.Forget(key)
    		return
    	}
    	//如果出现问题,此控制器将重试5次
    	if c.queue.NumRequeues(key) < 5 {
    		klog.Infof("Error syncing pod %v: %v", key, err)
    		// 重新加入 key 到限速队列
    		// 根据队列上的速率限制器和重新入队历史记录,稍后将再次处理该 key
    		c.queue.AddRateLimited(key)
    		return
    	}
    	c.queue.Forget(key)
    	// 多次重试,我们也无法成功处理该key
    	runtime.HandleError(err)
    	klog.Infof("Dropping pod %q out of the queue: %v", key, err)
    }
    
    // Run 开始 watch 和同步
    func (c *Controller) Run(threadiness int, stopCh chan struct{}) {
    	defer runtime.HandleCrash()
    
    	// 停止控制器后关掉队列
    	defer c.queue.ShutDown()
    	klog.Info("Starting Pod controller")
    
    	// 启动
    	go c.informer.Run(stopCh)
    
    	// 等待所有相关的缓存同步,然后再开始处理队列中的项目
    	if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {
    		runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
    		return
    	}
    
    	for i := 0; i < threadiness; i++ {
    		go wait.Until(c.runWorker, time.Second, stopCh)
    	}
    
    	<-stopCh
    	klog.Info("Stopping Pod controller")
    }
    
    func (c *Controller) runWorker() {
    	for c.processNextItem() {
    	}
    }
    
    func initClient() (*kubernetes.Clientset, error) {
    	var err error
    	var config *rest.Config
    	// inCluster(Pod)、KubeConfig(kubectl)
    	var kubeconfig *string
    
    	if home := homedir.HomeDir(); home != "" {
    		kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(可选) kubeconfig 文件的绝对路径")
    	} else {
    		kubeconfig = flag.String("kubeconfig", "", "kubeconfig 文件的绝对路径")
    	}
    	flag.Parse()
    
    	// 首先使用 inCluster 模式(需要去配置对应的 RBAC 权限,默认的sa是default->是没有获取deployments的List权限)
    	if config, err = rest.InClusterConfig(); err != nil {
    		// 使用 KubeConfig 文件创建集群配置 Config 对象
    		if config, err = clientcmd.BuildConfigFromFlags("", *kubeconfig); err != nil {
    			panic(err.Error())
    		}
    	}
    
    	// 已经获得了 rest.Config 对象
    	// 创建 Clientset 对象
    	return kubernetes.NewForConfig(config)
    }
    
    func main() {
    	clientset, err := initClient()
    	if err != nil {
    		klog.Fatal(err)
    	}
    
    	// 创建 Pod ListWatcher
    	podListWatcher := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", v1.NamespaceDefault, fields.Everything())
    
    	// 创建队列
    	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
    
    	// 在 informer 的帮助下,将工作队列绑定到缓存
    	// 这样,我们确保无论何时更新缓存,都将 pod key 添加到工作队列中
    	// 注意,当我们最终从工作队列中处理元素时,我们可能会看到 Pod 的版本比响应触发更新的版本新
    	indexer, informer := cache.NewIndexerInformer(podListWatcher, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{
    		AddFunc: func(obj interface{}) {
    			key, err := cache.MetaNamespaceKeyFunc(obj)
    			if err == nil {
    				queue.Add(key)
    			}
    		},
    		UpdateFunc: func(old interface{}, new interface{}) {
    			key, err := cache.MetaNamespaceKeyFunc(new)
    			if err == nil {
    				queue.Add(key)
    			}
    		},
    		DeleteFunc: func(obj interface{}) {
    			key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
    			if err == nil {
    				queue.Add(key)
    			}
    		},
    	}, cache.Indexers{})
    
    	controller := NewController(queue, indexer, informer)
    
    	indexer.Add(&v1.Pod{
    		ObjectMeta: metav1.ObjectMeta{
    			Name:      "mypod",
    			Namespace: v1.NamespaceDefault,
    		},
    	})
    
    	// start controller
    	stopCh := make(chan struct{})
    	defer close(stopCh)
    	go controller.Run(1, stopCh)
    
    	select {}
    }

未经允许不得转载:江哥架构师笔记 » 控制器:编写控制器

分享到:更多 ()