本文主要介绍如何使用 WorkQueue 来编写控制器
介绍
Kubernetes 控制器是一个主动调谐的过程,它会 watch 一些对象的期望状态,也会 watch 实际的状态,然后,控制器发送一些指令尝试让对象的当前状态往期望状态迁移。
控制器最简单的实现就是一个循环:
for { desired := getDesiredState() current := getCurrentState() makeChanges(desired, current) }
Watches 这些都只是这个逻辑的优化。
https://github.com/kubernetes/community/blob/master/contributors/devel/sig-api-machinery/controllers.md
指南
当你在编写控制器时,有一些准则将有助于确保你得到你需要的结果和。
-
一次只操作一个元素。如果你使用了
workqueue.Interface
,你可以将某个资源的变化排成队列,随后将它们弹到多个 "worker " gofuncs 中,并保证没有两个 gofuncs 会同时对同一个元素进行操作。 -
很多控制器必须触发掉多个资源(我需要 "如果Y发生变化就检查X"),但几乎所有的控制器都可以根据关系将这些资源折叠成一个 "检查这个X "的队列。例如,ReplicaSet 控制器需要对一个 Pod 被删除做出反应,但它通过找到相关的 ReplicaSets 并对这些进行排队来实现。
-
资源之间的随机排序。当控制器排队关闭多种类型的资源时,无法保证这些资源之间的排序。
-
不同的 watches 是独立更新的。即使有 "创建的资源A/X "和 "创建的资源B/Y "的客观排序,你的控制器也可以观察到 "创建的资源B/Y "和 "创建的资源A/X"。
-
级别驱动,而不是边缘驱动。就像有一个 shell 脚本不是一直在运行一样,你的控制器可能会在再次运行之前关闭不确定的时间。
-
如果一个 API 对象出现的标记值为 true,你不能指望已经看到它从false 变成 true,只能说你现在观察到它是 true。即使是 API watch 也会受到这个问题的影响,所以要确保你不指望看到变化,除非你的控制器也在对象的状态中标记它最后做出决定的信息。
-
使用 SharedInformers。SharedInformers 提供了回调函数来接收特定资源的添加、更新和删除的通知,它们还提供了访问共享缓存和确定缓存何时启动的便利功能。
使用
https://git.k8s.io/kubernetes/staging/src/k8s.io/client-go/informers/factory.go
中的工厂方法来确保你和其他人共享同一个缓存实例。这样我们就大大减少了 APIServer 的连接以及重复的序列化、重复的反序列化、重复的缓存等成本。
你可能会看到其他机制,比如反射器和 DeltaFIFO 驱动控制器。这些都是旧的机制,我们后来用它们来构建 SharedInformers,你应该避免在新控制器中使用它们。
-
永远不要 Mutate 原始对象!!缓存是跨控制器共享的,这意味着如果你修改了你的对象的 "副本"(实际上是引用或浅层副本),你会搞乱其他控制器(不仅仅是你自己的)。
最常见的失败方式是做一个浅层拷贝,然后 Mutate 一个映射,比如Annotations。使用
api.Scheme.Copy
进行深层复制。package main import ( "flag" "fmt" "path/filepath" "time" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/util/homedir" "k8s.io/client-go/util/workqueue" "k8s.io/klog" ) type Controller struct { indexer cache.Indexer queue workqueue.RateLimitingInterface informer cache.Controller } func NewController(queue workqueue.RateLimitingInterface, indexer cache.Indexer, informer cache.Controller) *Controller { return &Controller{ informer: informer, indexer: indexer, queue: queue, } } func (c *Controller) processNextItem() bool { // 等到工作队列中有一个新元素 key, quit := c.queue.Get() if quit { return false } // 告诉队列我们已经完成了处理此 key 的操作 // 这将为其他 worker 解锁该 key // 这将确保安全的并行处理,因为永远不会并行处理具有相同 key 的两个Pod defer c.queue.Done(key) // 调用包含业务逻辑的方法 err := c.syncToStdout(key.(string)) // 如果在执行业务逻辑期间出现错误,则处理错误 c.handleErr(err, key) return true } // syncToStdout 是控制器的业务逻辑实现 // 在此控制器中,它只是将有关 Pod 的信息打印到 stdout // 如果发生错误,则简单地返回错误 // 此外重试逻辑不应成为业务逻辑的一部分。 func (c *Controller) syncToStdout(key string) error { // 从本地存储中获取 key 对应的对象 obj, exists, err := c.indexer.GetByKey(key) if err != nil { klog.Errorf("Fetching object with key %s from store failed with %v", key, err) return err } if !exists { fmt.Printf("Pod %s does not exists anymore\\n", key) } else { fmt.Printf("Sync/Add/Update for Pod %s\\n", obj.(*v1.Pod).GetName()) } return nil } // 检查是否发生错误,并确保我们稍后重试 func (c *Controller) handleErr(err error, key interface{}) { if err == nil { // 忘记每次成功同步时 key 的#AddRateLimited历史记录。 // 这样可以确保不会因过时的错误历史记录而延迟此 key 更新的以后处理。 c.queue.Forget(key) return } //如果出现问题,此控制器将重试5次 if c.queue.NumRequeues(key) < 5 { klog.Infof("Error syncing pod %v: %v", key, err) // 重新加入 key 到限速队列 // 根据队列上的速率限制器和重新入队历史记录,稍后将再次处理该 key c.queue.AddRateLimited(key) return } c.queue.Forget(key) // 多次重试,我们也无法成功处理该key runtime.HandleError(err) klog.Infof("Dropping pod %q out of the queue: %v", key, err) } // Run 开始 watch 和同步 func (c *Controller) Run(threadiness int, stopCh chan struct{}) { defer runtime.HandleCrash() // 停止控制器后关掉队列 defer c.queue.ShutDown() klog.Info("Starting Pod controller") // 启动 go c.informer.Run(stopCh) // 等待所有相关的缓存同步,然后再开始处理队列中的项目 if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) { runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync")) return } for i := 0; i < threadiness; i++ { go wait.Until(c.runWorker, time.Second, stopCh) } <-stopCh klog.Info("Stopping Pod controller") } func (c *Controller) runWorker() { for c.processNextItem() { } } func initClient() (*kubernetes.Clientset, error) { var err error var config *rest.Config // inCluster(Pod)、KubeConfig(kubectl) var kubeconfig *string if home := homedir.HomeDir(); home != "" { kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(可选) kubeconfig 文件的绝对路径") } else { kubeconfig = flag.String("kubeconfig", "", "kubeconfig 文件的绝对路径") } flag.Parse() // 首先使用 inCluster 模式(需要去配置对应的 RBAC 权限,默认的sa是default->是没有获取deployments的List权限) if config, err = rest.InClusterConfig(); err != nil { // 使用 KubeConfig 文件创建集群配置 Config 对象 if config, err = clientcmd.BuildConfigFromFlags("", *kubeconfig); err != nil { panic(err.Error()) } } // 已经获得了 rest.Config 对象 // 创建 Clientset 对象 return kubernetes.NewForConfig(config) } func main() { clientset, err := initClient() if err != nil { klog.Fatal(err) } // 创建 Pod ListWatcher podListWatcher := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", v1.NamespaceDefault, fields.Everything()) // 创建队列 queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) // 在 informer 的帮助下,将工作队列绑定到缓存 // 这样,我们确保无论何时更新缓存,都将 pod key 添加到工作队列中 // 注意,当我们最终从工作队列中处理元素时,我们可能会看到 Pod 的版本比响应触发更新的版本新 indexer, informer := cache.NewIndexerInformer(podListWatcher, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { key, err := cache.MetaNamespaceKeyFunc(obj) if err == nil { queue.Add(key) } }, UpdateFunc: func(old interface{}, new interface{}) { key, err := cache.MetaNamespaceKeyFunc(new) if err == nil { queue.Add(key) } }, DeleteFunc: func(obj interface{}) { key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) if err == nil { queue.Add(key) } }, }, cache.Indexers{}) controller := NewController(queue, indexer, informer) indexer.Add(&v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: "mypod", Namespace: v1.NamespaceDefault, }, }) // start controller stopCh := make(chan struct{}) defer close(stopCh) go controller.Run(1, stopCh) select {} }