路漫漫其修远兮
吾将上下而求索

控制器:自定义控制器

本文主要介绍如何为 CRD 资源编写自定义的控制器

介绍

上节课我们已经学习了如何使用 code-generator 来进行代码自动生成,通过代码自动生成可以帮我们自动生成 CRD 资源对象客户端访问的 ClientSet、Informer、Lister 等工具包,接下来我们就来了解下如何编写一个自定义的控制器。

这里可以参考:https://github.com/kubernetes/sample-controller/tree/master

经过测试,代码生成和对应的测试都没有问题,优先查看和学习上面的项目

CRD 定义

这里我们来针对前面课程中介绍的 CronTab 自定义资源对象编写一个自定义的控制器,对应的资源清单文件如下所示:

apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  name: crontabs.stable.example.com
spec:
  group: stable.example.com
  versions:
  - name: v1beta1  
    served: true 
    storage: true 
    schema:  
      openAPIV3Schema:
        description: Define CronTab YAML Spec
        type: object
        properties:
          spec:
            type: object
            properties:
              cronSpec:
                type: string
              image:
                type: string
              replicas:
                type: integer
  scope: Namespaced
  names:
    kind: CronTab
    plural: crontabs
    singular: crontab    
    shortNames:
    - ct

我们需要针对上面的规范来定义资源结构,首先初始化项目:

$ mkdir -p github.com/cnych/controller-demo && cd github.com/cnych/controller-demo
# 初始化项目
$ go mod init github.com/cnych/controller-demo
go: creating new go.mod: module github.com/cnych/controller-demo
# 获取依赖
$ go get k8s.io/apimachinery@v0.17.9
$ go get -d k8s.io/code-generator@v0.17.9
$ go get k8s.io/client-go@v0.17.9

然后初始化 CRD 资源类型,建立好自己的 CRD 结构体,然后使用code-generator 生成客户端代码:

$ mkdir -p pkg/apis/stable/v1beta1

在该文件夹中新建 doc.go 文件,内容如下所示:

// +k8s:deepcopy-gen=package
// +groupName=stable.example.com

package v1beta1

根据 CRD 的规范定义,这里我们定义的 group 为 stable.example.com,版本为 v1beta1 ,在顶部添加了一个代码自动生成的 deepcopy-gen 的 tag,为整个包中的类型生成深拷贝方法。

然后就是非常重要的资源对象的结构体定义,新建 type.go 文件,内容如下所示:

package v1beta1

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// +genclient
// +genclient:noStatus
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

// 根据 CRD 定义 CronTab 结构体
type CronTab struct {
	metav1.TypeMeta `json:",inline"`
	// +optional
	metav1.ObjectMeta `json:"metadata,omitempty"`
	Spec              CronTabSpec `json:"spec"`
}

// +k8s:deepcopy-gen=false

type CronTabSpec struct {
	CronSpec string `json:"cronSpec"`
	Image    string `json:"image"`
	Replicas int    `json:"replicas"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

// CronTab 资源列表
type CronTabList struct {
	metav1.TypeMeta `json:",inline"`

	// 标准的 list metadata
	// +optional
	metav1.ListMeta `json:"metadata,omitempty"`

	Items []CronTab `json:"items"`
}

然后可以参考系统内置的资源对象,还需要提供 AddToScheme 与 Resource 两个变量供 client 注册,新建 register.go 文件,内容如下所示:

package v1beta1

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

// GroupName is the group name use in this package
const GroupName = "stable.example.com"
// 注册自己的自定义资源
var SchemeGroupVersion = schema.GroupVersion{Group: GroupName, Version: "v1beta1"}

// Resource takes an unqualified resource and returns a Group qualified GroupResource
func Resource(resource string) schema.GroupResource {
	return SchemeGroupVersion.WithResource(resource).GroupResource()
}

var (
	// SchemeBuilder initializes a scheme builder
	SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)
	// AddToScheme is a global function that registers this API group & version to a scheme
	AddToScheme = SchemeBuilder.AddToScheme
)

// Adds the list of known types to Scheme.
func addKnownTypes(scheme *runtime.Scheme) error {
  // 添加 CronTab 与 CronTabList 这两个资源到 scheme
	scheme.AddKnownTypes(SchemeGroupVersion,
		&CronTab{},
		&CronTabList{},
	)
	metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
	return nil
}

最终的项目结构如下所示:

$ tree .
.
├── go.mod
├── go.sum
└── pkg
    └── apis
        └── stable
            └── v1beta1
                ├── doc.go
                ├── register.go
                └── types.go

4 directories, 5 files

生成代码

上面我们准备好资源的 API 资源类型后,就可以使用开始生成 CRD 资源的客户端使用的相关代码了。

首先创建生成代码的脚本,下面这些脚本均来源于 sample-controller 提供的示例:

$ mkdir hack && cd hack

在该目录下面新建 tools.go 文件,添加 code-generator 依赖,因为在没有代码使用 code-generator 时,go module 默认不会为我们依赖此包。文件内容如下所示:

// +build tools

// 建立 tools.go 来依赖 code-generator
// 因为在没有代码使用 code-generator 时,go module 默认不会为我们依赖此包.
package tools

import _ "k8s.io/code-generator"

然后使用 update-codegen.sh 脚本,修改里面的包名称,用来配置代码生成的脚本:

同样还有上节课提到的 verify-codegen.sh 脚本,用来校验生成的代码是否是最新的:

还有一个为生成的代码文件添加头部内容的 boilerplate.go.txt 文件,内容如下所示,其实就是为每个生成的代码文件头部添加上下面的开源协议声明:

/*
Copyright The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    <http://www.apache.org/licenses/LICENSE-2.0>

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

接下来我们就可以来执行代码生成的脚本了,首先将依赖包放置到 vendor 目录中去:

$ go mod vendor

然后执行脚本生成代码:

$ chmod +x ./hack/update-codegen.sh
$ ./hack/update-codegen.sh
Generating deepcopy funcs
Generating clientset for stable:v1beta1 at github.com/cnych/controller-demo/pkg/client/clientset
Generating listers for stable:v1beta1 at github.com/cnych/controller-demo/pkg/client/listers
Generating informers for stable:v1beta1 at github.com/cnych/controller-demo/pkg/client/informers

代码生成后,整个项目的 pkg 包变成了下面的样子:

$ tree pkg
pkg
├── apis
│   └── stable
│       └── v1beta1
│           ├── doc.go
│           ├── register.go
│           ├── types.go
│           └── zz_generated.deepcopy.go
└── client
    ├── clientset
    │   └── versioned
    │       ├── clientset.go
    │       ├── doc.go
    │       ├── fake
    │       │   ├── clientset_generated.go
    │       │   ├── doc.go
    │       │   └── register.go
    │       ├── scheme
    │       │   ├── doc.go
    │       │   └── register.go
    │       └── typed
    │           └── stable
    │               └── v1beta1
    │                   ├── crontab.go
    │                   ├── doc.go
    │                   ├── fake
    │                   │   ├── doc.go
    │                   │   ├── fake_crontab.go
    │                   │   └── fake_stable_client.go
    │                   ├── generated_expansion.go
    │                   └── stable_client.go
    ├── informers
    │   └── externalversions
    │       ├── factory.go
    │       ├── generic.go
    │       ├── internalinterfaces
    │       │   └── factory_interfaces.go
    │       └── stable
    │           ├── interface.go
    │           └── v1beta1
    │               ├── crontab.go
    │               └── interface.go
    └── listers
        └── stable
            └── v1beta1
                ├── crontab.go
                └── expansion_generated.go

20 directories, 26 files

仔细观察可以发现 pkg/apis/stable/v1beta1 目录下面多了一个zz_generated.deepcopy.go 文件,在 pkg/client 文件夹下生成了 clientset和 informers 和 listers 三个目录,有了这几个自动生成的客户端相关操作包,我们就可以去访问 CRD 资源了,可以和使用内置的资源对象一样去对 CronTab 进行 List 和 Watch 操作了。

编写控制器

可以参考前面编写控制器的方式,首先要先获取访问资源对象的 ClientSet,在项目根目录下面新建 main.go 文件

package main

import (
	"flag"
	"os"
	"os/signal"
	"path/filepath"
	"syscall"
	"time"

	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/util/homedir"
	"k8s.io/klog"

	clientset "github.com/cnych/controller-demo/pkg/client/clientset/versioned"
	informers "github.com/cnych/controller-demo/pkg/client/informers/externalversions"
)

var (
	onlyOneSignalHandler = make(chan struct{})
	shutdownSignals      = []os.Signal{os.Interrupt, syscall.SIGTERM}
)

// SetupSignalHandler 注册 SIGTERM 和 SIGINT 信号
// 返回一个 stop channel,该通道在捕获到第一个信号时被关闭
// 如果捕捉到第二个信号,程序将直接退出
func setupSignalHandler() (stopCh <-chan struct{}) {
	// 当调用两次的时候 panics
	close(onlyOneSignalHandler)

	stop := make(chan struct{})
	c := make(chan os.Signal, 2)
	// Notify 函数让 signal 包将输入信号转发到 c
	// 如果没有列出要传递的信号,会将所有输入信号传递到 c;否则只传递列出的输入信号
	// 参考文档:<https://cloud.tencent.com/developer/article/1645996>
	signal.Notify(c, shutdownSignals...)
	go func() {
		<-c
		close(stop)
		<-c
		os.Exit(1) // 第二个信号,直接退出
	}()

	return stop
}

func initClient() (*kubernetes.Clientset, *rest.Config, error) {
	var err error
	var config *rest.Config
	// inCluster(Pod)、KubeConfig(kubectl)
	var kubeconfig *string

	if home := homedir.HomeDir(); home != "" {
		kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(可选) kubeconfig 文件的绝对路径")
	} else {
		kubeconfig = flag.String("kubeconfig", "", "kubeconfig 文件的绝对路径")
	}
	flag.Parse()

	// 首先使用 inCluster 模式(需要去配置对应的 RBAC 权限,默认的sa是default->是没有获取deployments的List权限)
	if config, err = rest.InClusterConfig(); err != nil {
		// 使用 KubeConfig 文件创建集群配置 Config 对象
		if config, err = clientcmd.BuildConfigFromFlags("", *kubeconfig); err != nil {
			panic(err.Error())
		}
	}

	// 已经获得了 rest.Config 对象
	// 创建 Clientset 对象
	kubeclient, err := kubernetes.NewForConfig(config)
	if err != nil {
		return nil, config, err
	}
	return kubeclient, config, nil
}

func main() {
	flag.Parse()

	//设置一个信号处理,应用于优雅关闭
	stopCh := setupSignalHandler()

	_, cfg, err := initClient()
	if err != nil {
		klog.Fatalf("Error building kubernetes clientset: %s", err.Error())
	}
	
  // 实例化一个 CronTab 的 ClientSet
	crontabClient, err := clientset.NewForConfig(cfg)
	if err != nil {
		klog.Fatalf("Error building crontab clientset: %s", err.Error())
	}

	// informerFactory 工厂类, 这里注入我们通过代码生成的 client
	// clent 主要用于和 API Server 进行通信,实现 ListAndWatch
	crontabInformerFactory := informers.NewSharedInformerFactory(crontabClient, time.Second*30)

	// 实例化自定义控制器
	controller := NewController(crontabInformerFactory.Stable().V1beta1().CronTabs())

	// 启动 informer,开始List & Watch
	go crontabInformerFactory.Start(stopCh)

	if err = controller.Run(2, stopCh); err != nil {
		klog.Fatalf("Error running controller: %s", err.Error())
	}
}

首先初始化一个用于访问 CronTab 资源的 ClientSet 对象,然后同样新建一个 CronTab 的 InformerFactory 实例,通过这个工厂实例可以去启动 Informer 开始对 CronTab 的 List 和 Watch 操作,然后同样我们要自己去封装一个自定义的控制器,在这个控制器里面去实现一个控制循环,不断对 CronTab 的状态进行调谐。

在项目根目录下新建 controller.go 文件,内容如下所示:

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/klog"

	crdv1beta1 "github.com/cnych/controller-demo/pkg/apis/stable/v1beta1"
	informers "github.com/cnych/controller-demo/pkg/client/informers/externalversions/stable/v1beta1"
)

type Controller struct {
	informer  informers.CronTabInformer
	workqueue workqueue.RateLimitingInterface
}

func NewController(informer informers.CronTabInformer) *Controller {
	//使用client 和前面创建的 Informer,初始化了自定义控制器
	controller := &Controller{
		informer: informer,
		// WorkQueue 的实现,负责同步 Informer 和控制循环之间的数据
		workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "CronTab"),
	}

	klog.Info("Setting up crontab event handlers")

	// informer 注册了三个 Handler(AddFunc、UpdateFunc 和 DeleteFunc)
	// 分别对应 API 对象的“添加”“更新”和“删除”事件。
	// 而具体的处理操作,都是将该事件对应的 API 对象加入到工作队列中
	informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: controller.enqueueCronTab,
		UpdateFunc: func(old, new interface{}) {
			oldObj := old.(*crdv1beta1.CronTab)
			newObj := new.(*crdv1beta1.CronTab)
      // 如果资源版本相同则不处理
			if oldObj.ResourceVersion == newObj.ResourceVersion {
				return
			}
			controller.enqueueCronTab(new)
		},
		DeleteFunc: controller.enqueueCronTabForDelete,
	})
	return controller
}

func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
	defer runtime.HandleCrash()
	defer c.workqueue.ShutDown()

	// 记录开始日志
	klog.Info("Starting CronTab control loop")
	klog.Info("Waiting for informer caches to sync")
	if ok := cache.WaitForCacheSync(stopCh, c.informer.Informer().HasSynced); !ok {
		return fmt.Errorf("failed to wait for caches to sync")
	}

	klog.Info("Starting workers")
	for i := 0; i < threadiness; i++ {
		go wait.Until(c.runWorker, time.Second, stopCh)
	}

	klog.Info("Started workers")
	<-stopCh
	klog.Info("Shutting down workers")
	return nil
}

// runWorker 是一个不断运行的方法,并且一直会调用 c.processNextWorkItem 从workqueue读取和读取消息
func (c *Controller) runWorker() {
	for c.processNextWorkItem() {
	}
}

// 从workqueue读取和读取消息
func (c *Controller) processNextWorkItem() bool {
	obj, shutdown := c.workqueue.Get()
	if shutdown {
		return false
	}
	err := func(obj interface{}) error {
		defer c.workqueue.Done(obj)
		var key string
		var ok bool
		if key, ok = obj.(string); !ok {
			c.workqueue.Forget(obj)
			runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
			return nil
		}
		if err := c.syncHandler(key); err != nil {
			return fmt.Errorf("error syncing '%s': %s", key, err.Error())
		}
		c.workqueue.Forget(obj)
		klog.Infof("Successfully synced '%s'", key)
		return nil
	}(obj)

	if err != nil {
		runtime.HandleError(err)
		return true
	}
	return true
}

// 尝试从 Informer 维护的缓存中拿到了它所对应的 CronTab 对象
func (c *Controller) syncHandler(key string) error {
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		runtime.HandleError(fmt.Errorf("invalid resource key: %s", key))
		return nil
	}

	crontab, err := c.informer.Lister().CronTabs(namespace).Get(name)

	//从缓存中拿不到这个对象,那就意味着这个 CronTab 对象的 Key 是通过前面的“删除”事件添加进工作队列的。
	if err != nil {
		if errors.IsNotFound(err) {
			// 对应的 crontab 对象已经被删除了
			klog.Warningf("[CronTabCRD] %s/%s does not exist in local cache, will delete it from CronTab ...",
				namespace, name)
			klog.Infof("[CronTabCRD] deleting crontab: %s/%s ...", namespace, name)
			return nil
		}
		runtime.HandleError(fmt.Errorf("failed to get crontab by: %s/%s", namespace, name))
		return err
	}
	klog.Infof("[CronTabCRD] try to process crontab: %#v ...", crontab)
	return nil
}

func (c *Controller) enqueueCronTab(obj interface{}) {
	var key string
	var err error
	if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
		runtime.HandleError(err)
		return
	}
	c.workqueue.AddRateLimited(key)
}

func (c *Controller) enqueueCronTabForDelete(obj interface{}) {
	var key string
	var err error
	key, err = cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
	if err != nil {
		runtime.HandleError(err)
		return
	}
	c.workqueue.AddRateLimited(key)
}

我们这里自定义的控制器只封装了一个 Informer 和一个限速队列,我们当然也可以在里面添加一个用于访问本地缓存的 Indexer,但实际上 Informer 中已经包含了 Lister,对于 List 和 Get 操作都会去通过 Indexer 从本地缓存中获取数据,所以只用一个 Informer 也是完全可行的。

同样在 Informer 中注册了3个事件处理器,将监听的事件获取到后送入 workqueue 队列,然后通过控制器的控制循环不断从队列中消费数据,根据获取的 key 来获取数据判断对象是需要删除还是需要进行其他业务处理,这里我们同样也只是打印出了对应的操作日志,对于实际的项目则进行相应的业务逻辑处理即可。

到这里一个完整的自定义 API 对象和它所对应的自定义控制器就编写完毕了。

测试

接下来我们可以直接编译代码:

$ go build -ldflags "-s -w" -v -o crontab-controller .

编译完成后,会生成 crontab-controller 的二进制文件,由于我们代码中支持 inCluster 和 kubeconfig 两种模式,所以我们可以在 Kubernetes 集群中或者在本地(前提是可以访问 Kubernetes 集群)都可以运行该控制器。

比如我们在本地运行,由于 ~/.kube/config 配置了访问 Kubernetes 集群的文件,所以我们直接运行该控制器即可:

$ ./crontab-controller
I1010 15:18:06.402954   82070 controller.go:31] Setting up crontab event handlers
I1010 15:18:06.403365   82070 controller.go:57] Starting CronTab control loop
I1010 15:18:06.403377   82070 controller.go:58] Waiting for informer caches to sync
I1010 15:18:06.605476   82070 controller.go:63] Starting workers
I1010 15:18:06.605582   82070 controller.go:68] Started workers

现在我们来创建一个 CronTab 资源对象:

# crontab-demo.yaml
apiVersion: stable.example.com/v1beta1
kind: CronTab
metadata:
  name: crontab-demo
  namespace: default
spec:
  image: "nginx:1.7.9"
  cronSpec: "* * * * */5"
  replicas: 2

直接创建上面的对象,注意观察控制器的日志:

$ kubectl apply -f crontab-demo.yaml
crontab.stable.example.com/crontab-demo created
$ kubectl get crontab
NAME           AGE
crontab-demo   70s

在上面的资源对象创建成功后在控制器的日志中则会出现如下所示的信息:

......
I1010 15:18:06.605476   82070 controller.go:63] Starting workers
I1010 15:18:06.605582   82070 controller.go:68] Started workers
I1010 15:19:55.115241   82070 controller.go:132] [CronTabCRD] try to process crontab: &v1beta1.CronTab{TypeMeta:v1.TypeMeta{Kind:"", APIVersion:""}, 
ObjectMeta:v1.ObjectMeta{Name:"crontab-demo", GenerateName:"", Namespace:"default", 
SelfLink:"/apis/stable.example.com/v1beta1/namespaces/default/crontabs/crontab-demo", UID:"1368bca5-4189-4f1a-9e8c-1d216ff67c24", 
ResourceVersion:"134155788", Generation:1, CreationTimestamp:v1.Time{Time:time.Time{wall:0x0, ext:63737911195, loc:(*time.Location)(0x29d5a20)}}, 
DeletionTimestamp:(*v1.Time)(nil), DeletionGracePeriodSeconds:(*int64)(nil), Labels:map[string]string(nil), 
Annotations:map[string]string{"kubectl.kubernetes.io/last-applied-configuration":"{\\"apiVersion\\":\\"stable.example.com/v1beta1\\",
\\"kind\\":\\"CronTab\\",\\"metadata\\":{\\"annotations\\":{},\\"name\\":\\"crontab-demo\\",\\"namespace\\":\\"default\\"},
\\"spec\\":{\\"cronSpec\\":\\"* * * * */5\\",\\"image\\":\\"nginx:1.7.9\\",\\"replicas\\":2}}\\n"}, 
OwnerReferences:[]v1.OwnerReference(nil), Finalizers:[]string(nil), ClusterName:"", ManagedFields:[]v1.ManagedFieldsEntry(nil)}, 
Spec:v1beta1.CronTabSpec{CronSpec:"* * * * */5", Image:"nginx:1.7.9", Replicas:2}} ...
I1010 15:19:55.116099   82070 controller.go:99] Successfully synced 'default/crontab-demo'

可以看到,我们上面创建 crontab-demo.yaml 的操作,触发了 EventHandler 的添加事件,从而被放进了工作队列。然后控制器的控制循环从队列里拿到这个对象,并且打印出了正在处理这个 CronTab 对象的日志信息。

同样我们删除这个资源的时候,也会有对应的提示:

$ kubectl delete crontab crontab-demo
crontab.stable.example.com "crontab-demo" deleted

当删除成功后对应的控制器出现了如下所示的日志信息:

W1010 15:23:11.738930   82070 controller.go:124] [CronTabCRD] default/crontab-demo does not exist in local cache, will delete it from CronTab ...
I1010 15:23:11.738952   82070 controller.go:126] [CronTabCRD] deleting crontab: default/crontab-demo ...
I1010 15:23:11.738961   82070 controller.go:99] Successfully synced 'default/crontab-demo'

这就是开发自定义 CRD 控制器的基本流程,当然我们还可以在事件处理的业务逻辑中去记录一些 Events 信息,这样我们就可以通过 Event 去了解我们资源的状态了。

未经允许不得转载:江哥架构师笔记 » 控制器:自定义控制器

分享到:更多 ()