路漫漫其修远兮
吾将上下而求索

operator:备份 etcd operator 开发

本章我们来实现编写一个用于 etcd 备份的 operator。

介绍

要实现 etcd 集群的完整运维,备份和恢复肯定也是必不可少的,本文主要和大家介绍如何编写一个用与 etcd 备份的 Operator。

首先当然需要了解 etcd 的备份逻辑。etcd 的数据默认会存放在我们的命令工作目录中,数据所在的目录会被分为两个文件夹中:

  • snap: 存放快照数据,etcd 防止 WAL 文件过多而设置的快照,存储 etcd 数据状态。

  • wal: 存放预写式日志,最大的作用是记录了整个数据变化的全部历程,在 etcd 中,所有数据的修改在提交前,都要先写入到WAL中。

ETCD 不同的版本的 etcdctl 命令不一样,但大致差不多,备份我们可以直接使用 snapshot save ,由于 etcd 集群本身就是分布式的,所以每次备份一个节点就行。

# 备份命令
$ ETCDCTL_API=3 etcdctl --endpoints=${ENDPOINTS} snapshot save /data/etcd_backup_dir/etcd-snapshot.db

恢复时会覆盖 snapshot 的元数据(member ID 和 cluster ID),使用 snapshot restore 命令指定备份的数据目录即可。

从上面我们可以看出要备份 etcd 集群是很简单的,只需要用一条命令指定备份的节点和备份的数据目录即可,所以如果是我们要编写一个 Operator 来完成这个动作在 CR 资源里面至少要提供备份的 etcd 节点地址,以及备份的数据存放目录,对于备份数据这种一般我们用对象存储来保存,比如 S3、OSS 等,这里我们测试的时候使用兼容 S3 接口的 minio,但是为了扩展我们需要在 CR 资源里面明确告诉控制器我们希望把数据备份到什么类型的什么路径上去,比如这里我们提前设计一个 CR 资源如下所示:

apiVersion: etcd.ydzs.io/v1alpha1
kind: EtcdBackup
metadata:
  name: etcdbackup-sample
spec:
  etcdUrl: <etcd-cluster-endpoints>  # 备份的节点 etcd 地址
  storageType: s3  # 指定备份类型
  s3:
    path: "foo-bucket/snapshot.db"  # 数据存放目录
    s3Secret: "secret"   # 包含 accessKeyID 与 secretAccessKey
  oss:
    path: "foo-bucket/snapshot.db"
    ossSecret: "secret"

设计好了 CR 资源过后,接下来我们只需要去创建这个 API 资源,然后实现对应的控制器就可以了。

添加接口

同样直接在项目目录下面执行创建 API 的命令:

$ kubebuilder create api --group etcd --version v1alpha1 --kind EtcdBackup
Create Resource [y/n]
y
Create Controller [y/n]
y
Writing scaffold for you to edit...
api/v1alpha1/etcdbackup_types.go
controllers/etcdbackup_controller.go
Running make:
$ make
/Users/ych/devs/projects/go/bin/controller-gen object:headerFile="hack/boilerplate.go.txt" paths="./..."
go fmt ./...
go vet ./...
go build -o bin/manager main.go

创建完成后,在项目中会新增 EtcdBackup 相关的 API 和对应的控制器,我们可以用上面设计的 CR 资源覆盖 samples 目录中的 EtcdBackup 对象。

https://bxdc-static.oss-cn-beijing.aliyuncs.com/images/20201130153825.png

接下来同样是根据我们预设计的 CR 资源去更改 EtcdBackup 的结构体,修改里面的 EtcdBackupSpec 结构体:

// api/v1alpha1/etcdbackup_types.go

type BackupStorageType string

// EtcdBackupSpec defines the desired state of EtcdBackup
type EtcdBackupSpec struct {
	// Specific Backup Etcd Endpoints.
	EtcdUrl string `json:"etcdUrl"`
	// Storage Type:s3 OR oss
	StorageType BackupStorageType `json:"storageType"`
	// Backup Source
	BackupSource `json:",inline"`
}

// BackupSource contains the supported backup sources.
type BackupSource struct {
	// S3 defines the S3 backup source spec.
	S3 *S3BackupSource `json:"s3,omitempty"`
	// OSS defines the OSS backup source spec.
	OSS *OSSBackupSource `json:"oss,omitempty"`
}

// S3BackupSource provides the spec how to store backups on S3.
type S3BackupSource struct {
	// Path is the full s3 path where the backup is saved.
	// The format of the path must be: "<s3-bucket-name>/<path-to-backup-file>"
	// e.g: "mybucket/etcd.backup"
	Path string `json:"path"`

	// The name of the secret object that stores the credential which will be used
	// to access S3
	//
	// The secret must contain the following keys/fields:
	//     accessKeyID
	//     accessKeySecret
	S3Secret string `json:"s3Secret"`

	// Endpoint if blank points to aws. If specified, can point to s3 compatible object
	// stores.
	Endpoint string `json:"endpoint,omitempty"`
}

// OSSBackupSource provides the spec how to store backups on OSS.
type OSSBackupSource struct {
	// Path is the full abs path where the backup is saved.
	// The format of the path must be: "<oss-bucket-name>/<path-to-backup-file>"
	// e.g: "mybucket/etcd.backup"
	Path string `json:"path"`

	// The name of the secret object that stores the credential which will be used
	// to access Alibaba Cloud OSS.
	//
	// The secret must contain the following keys/fields:
	//     accessKeyID
	//     accessKeySecret
	//
	// The format of secret:
	//
	//   apiVersion: v1
	//   kind: Secret
	//   metadata:
	//     name: <my-credential-name>
	//   type: Opaque
	//   data:
	//     accessKeyID: <base64 of my-access-key-id>
	//     accessKeySecret: <base64 of my-access-key-secret>
	//
	OSSSecret string `json:"ossSecret"`

	// Endpoint is the OSS service endpoint on alibaba cloud, defaults to
	// "<http://oss-cn-hangzhou.aliyuncs.com>".
	//
	// Details about regions and endpoints, see:
	//  <https://www.alibabacloud.com/help/doc-detail/31837.htm>
	Endpoint string `json:"endpoint,omitempty"`
}

我们根据设计的 CR 来修改 EtcdBackup 的结构体,由于我们这里是一个备份任务,所以我们在这个基础上增加上 EtcdBackup 状态,我们需要关心的备份的操作状态、备份开始时间以及备份完成的时间,所以修改 EtcdBackupStatus 结构体如下所示:

// api/v1alpha1/etcdbackup_types.go

type EtcdBackupPhase string

var (
	EtcdBackupPhaseBackingUp EtcdBackupPhase = "BackingUp"
	EtcdBackupPhaseCompleted EtcdBackupPhase = "Completed"
	EtcdBackupPhaseFailed    EtcdBackupPhase = "Failed"
)

// EtcdBackupStatus defines the observed state of EtcdBackup
type EtcdBackupStatus struct {
	// Phase defines the current operation that the backup process is taking.
	Phase EtcdBackupPhase `json:"phase,omitempty"`
	// StartTime is the times that this backup entered the `BackingUp' phase.
	// +optional
	StartTime *metav1.Time `json:"startTime,omitempty"`
	// CompletionTime is the time that this backup entered the `Completed' phase.
	// +optional
	CompletionTime *metav1.Time `json:"completionTime,omitempty"`
}

API 结构定义完成后,接下来就是真正的控制器逻辑实现了。

业务逻辑

上面我们定义了 API 结构,下面就可以来实现控制的业务逻辑了。可能大家会疑惑,我们之前已经实现了一个 EtcdCluster 的控制器,现在又要实现一个 EtcdBackup 的控制器,那么我们这个 Operator 项目是如何来管理多个控制器的呢?

我们可以直接去看下 main.go 文件中的代码,其中有一段代码就是分别将这两个控制器注册到了 Manager 中,如下所示:

// main.go
if err = (&controllers.EtcdClusterReconciler{
		Client: mgr.GetClient(),
		Log:    ctrl.Log.WithName("controllers").WithName("EtcdCluster"),
		Scheme: mgr.GetScheme(),
	}).SetupWithManager(mgr); err != nil {
		setupLog.Error(err, "unable to create controller", "controller", "EtcdCluster")
		os.Exit(1)
	}
	if err = (&controllers.EtcdBackupReconciler{
		Client: mgr.GetClient(),
		Log:    ctrl.Log.WithName("controllers").WithName("EtcdBackup"),
		Scheme: mgr.GetScheme(),
	}).SetupWithManager(mgr); err != nil {
		setupLog.Error(err, "unable to create controller", "controller", "EtcdBackup")
		os.Exit(1)
	}
	// +kubebuilder:scaffold:builder

从这里我们可以看到一个 Manager 就可以管理多个控制器,所以即使以后再添加一些其他的控制器也没什么问题,我们要去实现的部分同样只是修改 API 结构体,然后在调谐函数 Reconcile 中实现业务逻辑即可。

接下来我们就来具体实现下 Reconcile 函数中的逻辑。

调谐实现

备份控制器的结构体定义完成了,也将控制器注册到了 Manager 中,接下来我们只需要在 Reconcile 函数中来实现调谐的逻辑即可。由于备份相当于就是一个 Job 任务,所以我们其实只需要根据我们期望的状态和实际的状态进行对比,然后判断下一步应该做什么操作,当然最终是启动一个 Pod 去执行备份任务,真正实现备份任务的就是这个 Pod 的镜像了,这个我们在后面去实现,这里我们首先用一个任意的镜像代替即可。

首先我们定义一个结构体,用来简单封装包含 EtcdBackup 对象本身以及真实和期望的状态:

// controllers/etcdbackup_controller.go

// backupState 包含 EtcdBackup 真实和期望的状态(这里的状态并不是说status)
type backupState struct {
	backup  *etcdv1alpha1.EtcdBackup // EtcdBackup 对象本身
	actual  *backupStateContainer    // 真实的状态
	desired *backupStateContainer    // 期望的状态
}

// backupStateContainer 包含 EtcdBackup 的状态
type backupStateContainer struct {
	pod *corev1.Pod
}

然后我们通过 Reconcile 函数来获取 backupState 对象,根据对象的状态来判断下一步的执行动作,定义如下所示几个函数来获取状态对象,期望的状态当然就包括我们要去构造的一个执行任务的 Pod 了:

// controllers/etcdbackup_controller.go

// setStateActual 用于设置 backupState 的真实状态
func (r *EtcdBackupReconciler) setStateActual(ctx context.Context, state *backupState) error {
	var actual backupStateContainer

	key := client.ObjectKey{
		Name:      state.backup.Name,
		Namespace: state.backup.Namespace,
	}

	// 获取对应的 Pod
	actual.pod = &corev1.Pod{}
	if err := r.Get(ctx, key, actual.pod); err != nil {
		if client.IgnoreNotFound(err) != nil {
			return fmt.Errorf("getting pod error: %s", err)
		}
		actual.pod = nil
	}

	// 填充当前真实的状态
	state.actual = &actual
	return nil
}

// setStateDesired 用于设置 backupState 的期望状态(根据 EtcdBackup 对象)
func (r *EtcdBackupReconciler) setStateDesired(state *backupState) error {
	var desired backupStateContainer

	// 创建一个管理的 Pod 用于执行备份操作
	pod, err := podForBackup(state.backup, r.BackupImage)
	if err != nil {
		return fmt.Errorf("computing pod for backup error: %q", err)
	}
	// 配置 controller reference
	if err := controllerutil.SetControllerReference(state.backup, pod, r.Scheme); err != nil {
		return fmt.Errorf("setting pod controller reference error : %s", err)
	}
	desired.pod = pod
	// 获得期望的对象
	state.desired = &desired
	return nil
}

// getState 用来获取当前应用的整个状态,然后才方便判断下一步动作
func (r EtcdBackupReconciler) getState(ctx context.Context, req ctrl.Request) (*backupState, error) {
	var state backupState

	// 获取 EtcdBackup 对象
	state.backup = &etcdv1alpha1.EtcdBackup{}
	if err := r.Get(ctx, req.NamespacedName, state.backup); err != nil {
		if client.IgnoreNotFound(err) != nil {
			return nil, fmt.Errorf("getting backup error: %s", err)
		}
		// 被删除了则直接忽略
		state.backup = nil
		return &state, nil
	}

	// 获取当前备份的真实状态
	if err := r.setStateActual(ctx, &state); err != nil {
		return nil, fmt.Errorf("setting actual state error: %s", err)
	}

	// 获取当前期望的状态
	if err := r.setStateDesired(&state); err != nil {
		return nil, fmt.Errorf("setting desired state error: %s", err)
	}

	return &state, nil
}

// podForBackup 创建一个 Pod 运行备份任务
func podForBackup(backup *etcdv1alpha1.EtcdBackup, image string) (*corev1.Pod, error) {
	// 构造一个全新的备份 Pod
	return &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      backup.Name,
			Namespace: backup.Namespace,
		},
		Spec: corev1.PodSpec{
			Containers: []corev1.Container{
				{
					Name:  "backup-agent",
					Image: image,  // todo,执行备份的镜像
					Resources: corev1.ResourceRequirements{
						Requests: corev1.ResourceList{
							corev1.ResourceCPU:    resource.MustParse("100m"),
							corev1.ResourceMemory: resource.MustParse("50Mi"),
						},
						Limits: corev1.ResourceList{
							corev1.ResourceCPU:    resource.MustParse("100m"),
							corev1.ResourceMemory: resource.MustParse("50Mi"),
						},
					},
				},
			},
			RestartPolicy: corev1.RestartPolicyNever,
		},
	}, nil
}

当我们获取到 backupState 对象后,需要根据对象的状态或者里面包含的执行任务的 Pod 状态来决定下一步的动作,由于执行动作有多个,所以我们可以定义一个接口来接收不同的动作。在 controllers 包下面新建 action.go 文件,文件内容如下所示:

// controllers/action.go

package controllers

import (
	"context"
	"fmt"
	"reflect"

	"k8s.io/apimachinery/pkg/runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// 定义的执行动作接口
type Action interface {
	Execute(context.Context) error
}

// PatchStatus 用户更新对象 status 状态
type PatchStatus struct {
	client   client.Client
	original runtime.Object
	new      runtime.Object
}

func (o *PatchStatus) Execute(ctx context.Context) error {
	if reflect.DeepEqual(o.original, o.new) {
		return nil
	}
	// 更新状态
	if err := o.client.Status().Patch(ctx, o.new, client.MergeFrom(o.original)); err != nil {
		return fmt.Errorf("while patching status error %q", err)
	}

	return nil
}

// CreateObject 创建一个新的资源对象
type CreateObject struct {
	client client.Client
	obj    runtime.Object
}

func (o *CreateObject) Execute(ctx context.Context) error {
	if err := o.client.Create(ctx, o.obj); err != nil {
		return fmt.Errorf("error %q while creating object ", err)
	}
	return nil
}

我们在 Reconcile 函数中主要要执行的动作就是更新备份对象的状态或者创建一个备份的 Pod,接下来看看完整的 Reconcile 函数实现:

// controllers/etcdbackup_controller.go

// +kubebuilder:rbac:groups=etcd.ydzs.io,resources=etcdbackups,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=etcd.ydzs.io,resources=etcdbackups/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch;create

func (r *EtcdBackupReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
	ctx := context.Background()
	log := r.Log.WithValues("etcdbackup", req.NamespacedName)

	// get backup state
	state, err := r.getState(ctx, req)
	if err != nil {
		return ctrl.Result{}, err
	}

	// 根据状态来判断下一步要执行的动作
	var action Action

	switch {
	case state.backup == nil: // 被删除了
		log.Info("Backup Object not found. Ignoring.")
	case !state.backup.DeletionTimestamp.IsZero(): // 标记为了删除
		log.Info("Backup Object has been deleted. Ignoring.")
	case state.backup.Status.Phase == "": // 开始备份,更新状态
		log.Info("Backup Staring. Updating status.")
		newBackup := state.backup.DeepCopy()                                            // 深拷贝一份
		newBackup.Status.Phase = etcdv1alpha1.EtcdBackupPhaseBackingUp                  // 更新状态为备份中
		action = &PatchStatus{client: r.Client, original: state.backup, new: newBackup} // 下一步要执行的动作
	case state.backup.Status.Phase == etcdv1alpha1.EtcdBackupPhaseFailed: // 备份失败
		log.Info("Backup has failed. Ignoring.")
	case state.backup.Status.Phase == etcdv1alpha1.EtcdBackupPhaseCompleted: // 备份完成
		log.Info("Backup has completed. Ignoring.")
	case state.actual.pod == nil: // 当前还没有备份的 Pod
		log.Info("Backup Pod does not exists. Creating.")
		action = &CreateObject{client: r.Client, obj: state.desired.pod} // 下一步要执行的动作
	case state.actual.pod.Status.Phase == corev1.PodFailed: // 备份Pod执行失败
		log.Info("Backup Pod failed. Updating status.")
		newBackup := state.backup.DeepCopy()
		newBackup.Status.Phase = etcdv1alpha1.EtcdBackupPhaseFailed
		action = &PatchStatus{client: r.Client, original: state.backup, new: newBackup} // 下一步更新状态为失败
	case state.actual.pod.Status.Phase == corev1.PodSucceeded: // 备份Pod执行完成
		log.Info("Backup Pod succeeded. Updating status.")
		newBackup := state.backup.DeepCopy()
		newBackup.Status.Phase = etcdv1alpha1.EtcdBackupPhaseCompleted
		action = &PatchStatus{client: r.Client, original: state.backup, new: newBackup} // 下一步更新状态为完成
	}

	// 执行动作
	if action != nil {
		if err := action.Execute(ctx); err != nil {
			return ctrl.Result{}, fmt.Errorf("executing action error: %s", err)
		}
	}

	return ctrl.Result{}, nil
}

到这里我们就基本上完成了备份的 CRD 的调谐过程开发,当然还有最最核心的是备份的实际逻辑还没实现,这个我们只需要单独写一个命令行工具,然后打包成镜像即可。

此外还有一个地方需要注意,上面我们使用了 client.Status() 来更新 EtcdBackup 的状态,直接这样更新会触发 panic,提示 could not find the requested resource,要解决这个问题需要我们将 Status 设置为 EtcdBackup 的 subResource,在 EtcdBackup 的结构体上方增加一行 // +kubebuilder:subresource:status 这样的注释信息:

// api/v1alpha1/etcdbackup_types.go

// +kubebuilder:object:root=true
// +kubebuilder:subresource:status

// EtcdBackup is the Schema for the etcdbackups API
type EtcdBackup struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`

	Spec   EtcdBackupSpec   `json:"spec,omitempty"`
	Status EtcdBackupStatus `json:"status,omitempty"`
}

更改完成后,记得执行 make install 命令重新安装 CRD,这样就可以正常去更新 Status 状态了。

实现备份操作

上面我们通过运行一个 Pod 来执行备份任务,那么自然真正实现备份的就是这个 Pod 里面包含的镜像了,所以备份的操作逻辑需要我们单独实现,然后打包成一个独立的镜像替换掉上面创建的 Pod 镜像即可。

在项目根目录下面创建一个新的文件 cmd/backup/main.go 用于实现备份 etcd 集群功能,代码如下所示:

// cmd/backup/main.go

package main

import (
	"context"
	"flag"
	"fmt"
	"os"
	"path/filepath"
	"time"

	"github.com/cnych/etcd-operator/pkg/file"
	"github.com/go-logr/logr"
	"github.com/go-logr/zapr"
	"go.etcd.io/etcd/clientv3"
	"go.etcd.io/etcd/clientv3/snapshot"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/log/zap"
)

func loggedError(log logr.Logger, err error, message string) error {
	log.Error(err, message)
	return fmt.Errorf("%s: %s", message, err)
}

func main() {
	var (
		backupTempDir          string
		etcdURL                string
		etcdDialTimeoutSeconds int64
		timeoutSeconds         int64
	)

	flag.StringVar(&backupTempDir, "backup-tmp-dir", os.TempDir(), 
	"The directory to temporarily place backups before they are uploaded to their destination.")
	flag.StringVar(&etcdURL, "etcd-url", "<http://localhost:2379>", "URL for etcd.")
	flag.Int64Var(&etcdDialTimeoutSeconds, "etcd-dial-timeout-seconds", 5, "Timeout, in seconds, for dialing the Etcd API.")
	flag.Int64Var(&timeoutSeconds, "timeout-seconds", 60, "Timeout, in seconds, of the whole restore operation.")
	flag.Parse()

	zapLogger := zap.NewRaw(zap.UseDevMode(true))
	ctrl.SetLogger(zapr.NewLogger(zapLogger))

	log := ctrl.Log.WithName("backup-agent")
	ctx, ctxCancel := context.WithTimeout(context.Background(), time.Second*time.Duration(timeoutSeconds))
	defer ctxCancel()

	log.Info("Connecting to Etcd and getting snapshot")
	localPath := filepath.Join(backupTempDir, "snapshot.db")
	etcdClient := snapshot.NewV3(zapLogger.Named("etcd-client"))
	err := etcdClient.Save(
		ctx,
		clientv3.Config{
			Endpoints:   []string{etcdURL},
			DialTimeout: time.Second * time.Duration(etcdDialTimeoutSeconds),
		},
		localPath,
	)
	if err != nil {
		panic(loggedError(log, err, "failed to get etcd snapshot"))
	}
	
  // 临时测试
	endpoint := "play.min.io"
	accessKeyID := "Q3AM3UQ867SPQQA43P2F"
	secretAccessKey := "zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG"
	s3Uploader := uploader.NewS3Uploader(endpoint, accessKeyID, secretAccessKey)

	log.Info("Uploading snapshot")
	size, err := s3Uploader.Upload(ctx, localPath)
	if err != nil {
		panic(loggedError(log, err, "failed to upload backup"))
	}
	log.WithValues("upload-size", size).Info("Backup complete")
}

这里我们暂时只是测试下备份功能,直接使用 minio 的 play.min.io 环境,所以将 endpoint、acessKey、secretKey 这些暂时硬编码。通过 --etcd-url 参数传递的备份 Etcd 集群的地址,将 snapshot 数据保存到临时目录中,然后通过 uploader 进行上传,这里正常应该通过我们的 EtcdBackup 传递的 CR 属性来判断实例化 S3 还是 OSS,暂时我们先只使用 S3,后面再优化代码即可。

新建 pkg/file/s3.go 文件,在文件中实现 minio 上传功能,直接使用 minio-go 官方的 SDK 即可:

package file

import (
	"context"

	"github.com/minio/minio-go/v7"
	"github.com/minio/minio-go/v7/pkg/credentials"
)

type s3Uploader struct {
	Endpoint        string
	AccessKeyID     string
	SecretAccessKey string
}

func NewS3Uploader(Endpoint, AK, SK string) *s3Uploader {
	return &s3Uploader{
		Endpoint:        Endpoint,
		AccessKeyID:     AK,
		SecretAccessKey: SK,
	}
}

// 初使化 minio client 对象
func (su *s3Uploader) InitClient() (*minio.Client, error) {
	return minio.New(su.Endpoint, &minio.Options{
		Creds:  credentials.NewStaticV4(su.AccessKeyID, su.SecretAccessKey, ""),
		Secure: true,
	})
}

func (su *s3Uploader) Upload(ctx context.Context, filePath string) (int64, error) {
	client, err := su.InitClient()
	if err != nil {
		return 0, err
	}
	bucketName := "testback"  // todo 
	objectName := "etcd-snapshot.db"  // todo
	uploadInfo, err := client.FPutObject(ctx, bucketName, objectName, filePath, minio.PutObjectOptions{})
	if err != nil {
		return 0, err
	}
	return uploadInfo.Size, nil
}

然后将上面的备份任务打包成一个新的 Docker 镜像,修改项目根目录下面的 Dockerfile 文件,如下所示:

# Build the manager binary
FROM golang:1.13 as builder

RUN apt-get -y update && apt-get -y install upx

WORKDIR /workspace
# Copy the Go Modules manifests
COPY go.mod go.mod
COPY go.sum go.sum

# Copy the go source
COPY main.go main.go
COPY api/ api/
COPY controllers/ controllers/
COPY cmd/ cmd/
COPY pkg/ pkg/

ENV CGO_ENABLED=0
ENV GOOS=linux
ENV GOARCH=amd64
ENV GO111MODULE=on
ENV GOPROXY="<https://goproxy.cn>"

# Build
RUN go mod download && \\
		go build -mod=readonly -o manager main.go && \\
		go build -mod=readonly -o backup cmd/backup/main.go && \\
		upx manager backup

# Use distroless as minimal base image to package the manager binary
# Refer to <https://github.com/GoogleContainerTools/distroless> for more details
FROM gcr.io/distroless/static:nonroot
WORKDIR /
COPY --from=builder /workspace/manager .
USER nonroot:nonroot
ENTRYPOINT ["/manager"]

FROM gcr.io/distroless/static:nonroot as backup
WORKDIR /
COPY --from=builder /workspace/backup .
USER nonroot:nonroot
ENTRYPOINT ["/backup"]

这里我们利用 Docker 的多阶段构建来分别构建 Operator 以及备份的镜像,执行下面的命令即可构建备份的镜像:

$ docker build --target backup -t cnych/etcd-operator-backup:v0.0.4 -f Dockerfile .
$ docker push cnych/etcd-operator-backup:v0.0.4

然后记得修改 EtcdBackup 控制器生成的 Pod 结构:

// controllers/etcdbackup_controller.go

func podForBackup(backup *etcdv1alpha1.EtcdBackup, image string) *corev1.Pod {
	return &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      backup.Name,
			Namespace: backup.Namespace,
		},
		Spec: corev1.PodSpec{
			Containers: []corev1.Container{
				{
					Name:  "etcd-backup",
					Image: image, // todo
					Args: []string{
						"--etcd-url", backup.Spec.EtcdUrl,  
					},
	......

项目根目录下面的 main.go 文件修改默认备份镜像地址:

// main.go

var (
	scheme             = runtime.NewScheme()
	setupLog           = ctrl.Log.WithName("setup")
	defaultBackupImage = "cnych/etcd-operator-backup:v0.0.4"  // 修改默认备份镜像地址
)
......

测试备份

功能实现完成后,接下来来测试下备份功能,执行下面的命令启动控制器:

$ make install
$ make run

启动完成后,创建备份的示例 CR 资源:

$ kubectl apply -f config/samples/etcd_v1alpha1_etcdbackup.yaml
$ kubectl get etcdbackup
NAME                AGE
etcdbackup-sample   18h
$ kubectl get pods
NAME                READY   STATUS      RESTARTS   AGE
etcd-demo-0         1/1     Running     0          6d21h
etcd-demo-1         1/1     Running     0          6d21h
etcd-demo-2         1/1     Running     0          6d21h
etcdbackup-sample   0/1     Completed   0          18h

查看备份的 Pod 日志也可以看出来备份成功了,如下图所示:

https://bxdc-static.oss-cn-beijing.aliyuncs.com/images/20201208145146.png

基本流程跑通了,接下来我们只需要去优化代码增加不同的备份类型即可。

代码重构

上面我们只是实现了备份的功能,接下来我们就来完成剩余的工作,比如支持 S3 或者 OSS 的相关配置,首先我们使用一个 Secret 对象来创建 MINIO 的相关配置:

$ kubectl create secret generic minio-access-secret --from-literal=MINIO_ACCESS_KEY=Q3AM3UQ867SPQQA43P2F 
--from-literal=MINIO_SECRET_KEY=zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG

然后把这个 minio-access-secret 对象传给 EtcdBackup 对象,比如我们先来配置 S3,对应的 CR 测试资源如下所示:

apiVersion: etcd.ydzs.io/v1alpha1
kind: EtcdBackup
metadata:
  name: etcdbackup-sample
spec:
  etcdUrl: "<http://etcd-demo-0.etcd-demo:2379>"  # 备份的节点 etcd 地址
  storageType: s3  # 指定备份类型
  s3:
    path: "ydzsio/etcd-snapshot.db"  # 数据存放目录,格式:<bucket-name>/<object-path>
    endpoint: "play.min.io"  # MINIO 地址
    secret: "minio-access-secret"   # 包含 accessKeyID 与 secretAccessKey
#  oss:
#    path: "foo-bucket/snapshot.db"
#    endpiont: "oss.aliyuncs.xxx.com"
#    secret: "secret"

#kubectl create secret generic minio-access-secret --from-literal=MINIO_ACCESS_KEY=Q3AM3UQ867SPQQA43P2F 
--from-literal=MINIO_SECRET_KEY=zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG

当然我们需要重新更新下 EtcdBack 的结构体:

// api/v1alpha1/etcdbackup_types.go

var (
	BackupStorageTypeS3  BackupStorageType = "s3"
	BackupStorageTypeOSS BackupStorageType = "oss"
)

type BackupStorageType string

type S3BackupSource struct {
	Path     string `json:"path"`
	Endpoint string `json:"endpoint"`
	// Secret Object: AccessKey AcessSecryt
	Secret string `json:"secret"`
}

修改完成后,记得重新执行 make 命令。

接下来我们需要重新来补充完成控制器中构造 Pod 的函数

// controllers/etcdbackup_controller.go

func podForBackup(backup *etcdv1alpha1.EtcdBackup, image string) *corev1.Pod {
	var secretRef *corev1.SecretEnvSource
	var backupURL, backupEndpoint string
	if backup.Spec.StorageType == etcdv1alpha1.BackupStorageTypeS3 {
		backupURL = fmt.Sprintf("%s://%s", backup.Spec.StorageType, backup.Spec.S3.Path)
		backupEndpoint = backup.Spec.S3.Endpoint
		secretRef = &corev1.SecretEnvSource{
			LocalObjectReference: corev1.LocalObjectReference{
				Name: backup.Spec.S3.Secret,
			},
		}
	} else {
		backupURL = fmt.Sprintf("%s://%s", backup.Spec.StorageType, backup.Spec.OSS.Path)
		backupEndpoint = backup.Spec.OSS.Endpoint
		secretRef = &corev1.SecretEnvSource{
			LocalObjectReference: corev1.LocalObjectReference{
				Name: backup.Spec.OSS.Secret,
			},
		}
	}
	return &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      backup.Name,
			Namespace: backup.Namespace,
		},
		Spec: corev1.PodSpec{
			Containers: []corev1.Container{
				{
					Name:  "etcd-backup",
					Image: image,
					Args: []string{
						"--etcd-url", backup.Spec.EtcdUrl,
						"--backup-url", backupURL,
					},
					Env: []corev1.EnvVar{
						{
							Name:  "ENDPOINT",
							Value: backupEndpoint,
						},
					},
					EnvFrom: []corev1.EnvFromSource{
						{
							SecretRef: secretRef,
						},
					},
					Resources: corev1.ResourceRequirements{
						Requests: corev1.ResourceList{
							corev1.ResourceCPU:    resource.MustParse("100m"),
							corev1.ResourceMemory: resource.MustParse("100Mi"),
						},
						Limits: corev1.ResourceList{
							corev1.ResourceCPU:    resource.MustParse("100m"),
							corev1.ResourceMemory: resource.MustParse("100Mi"),
						},
					},
				},
			},
			RestartPolicy: corev1.RestartPolicyNever,
		},
	}
}

这里可以看到我们新传递了一个 --backup-url 的参数,格式为 s3://my-bucket/my-dir/my-obj.db ,这样我们就可以根据这个地址的协议来判断将 etcd 备份数据上传到什么地方,后面就是对应的 bucket 名称和对象名称,这里我们可以用下面的函数来解析得到:

// pkg/file/utils.go

package file

import (
	"net/url"
)

// 解析 backupUrl(格式:s3://my-bucket/my-dir/my-obj.db)
// 得到 类型、bucket、对象名称
func ParseBackupURL(backupUrl string) (string, string, string, error) {
	u, err := url.Parse(backupUrl)
	if err != nil {
		return "", "", "", err
	}
	return u.Scheme, u.Host, u.Path[1:], nil
}

然后通过环境变量的形式讲对象存储的 Endpoint 以及访问的私钥通过 Secret 注入到 Pod 去了,接下来我们只需要在备份的实现中去读取这些环境变量和参数即可,完整代码如下所示:

// cmd/backup/main.go

package main

import (
	"context"
	"flag"
	"fmt"
	"os"
	"path/filepath"
	"time"

	"github.com/cnych/etcd-operator/api/v1alpha1"
	"github.com/cnych/etcd-operator/pkg/file"
	"github.com/go-logr/logr"
	"github.com/go-logr/zapr"
	"go.etcd.io/etcd/clientv3"
	"go.etcd.io/etcd/clientv3/snapshot"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/log/zap"
)

func logErr(log logr.Logger, err error, message string) error {
	log.Error(err, message)
	return fmt.Errorf("%s: %s", message, err)
}

func main() {

	var (
		backupTempDir      string
		etcdURL            string
		dialTimeoutSeconds int64
		timeoutSeconds     int64
		backupURL          string
	)

	flag.StringVar(&backupTempDir, "backup-tmp-dir", os.TempDir(), "The directory to temp place backup etcd cluster.")
	flag.StringVar(&etcdURL, "etcd-url", "", "URL for backup etcd.")
	flag.StringVar(&backupURL, "backup-url", "", "URL for the backup storage.")
	flag.Int64Var(&dialTimeoutSeconds, "dial-timeout-seconds", 5, "Timeout for dialing the Etcd.")
	flag.Int64Var(&timeoutSeconds, "timeout-seconds", 60, "Timeout for Backup the Etcd.")
	flag.Parse() // 一定要加上

	zapLogger := zap.NewRaw(zap.UseDevMode(true))
	ctrl.SetLogger(zapr.NewLogger(zapLogger))

	ctx, ctxCancel := context.WithTimeout(context.Background(), time.Second*time.Duration(timeoutSeconds))
	defer ctxCancel()

	log := ctrl.Log.WithName("backup")

	// 解析备份上传对象存储参数
	storageType, bucketName, objectName, err := file.ParseBackupURL(backupURL)
	if err != nil {
		panic(logErr(log, err, "failed to parse etcd backup url"))
	}

	log.Info("Connecting to Etcd and getting Snapshot data")
	// 定义一个本地的数据目录
	localPath := filepath.Join(backupTempDir, "snapshot.db")
	// 创建etcd snapshot manager
	etcdManager := snapshot.NewV3(zapLogger)
	// 保存etcd snapshot数据到localPath
	err = etcdManager.Save(ctx, clientv3.Config{
		Endpoints:   []string{etcdURL},
		DialTimeout: time.Second * time.Duration(dialTimeoutSeconds),
	}, localPath)
	if err != nil {
		panic(logErr(log, err, "failed to get etcd snapshot data"))
	}

	log.Info("Uploading snapshot...")
	switch storageType {
	case string(v1alpha1.BackupStorageTypeS3): // s3
		// 数据保存到本地成功
		// 上传到S3
		size, err := handleS3(ctx, localPath, bucketName, objectName)
		if err != nil {
			panic(logErr(log, err, "failed to upload backup etcd"))
		}
		log.WithValues("upload-size", size).Info("Backup completed")
	case string(v1alpha1.BackupStorageTypeOSS): // oss
	default:
		panic(logErr(log, fmt.Errorf("storage type error"), fmt.Sprintf("unknown StorageType: %v", storageType)))
	}

}

func handleS3(ctx context.Context, localPath, bucket, objectName string) (int64, error) {
	// 根据传递进来的参数(环境变量)获取s3配置信息
	endpoint := os.Getenv("ENDPOINT")
	accessKeyID := os.Getenv("MINIO_ACCESS_KEY")
	secretAccessKey := os.Getenv("MINIO_SECRET_KEY")
	//endpoint := "play.min.io"
	//accessKeyID := "Q3AM3UQ867SPQQA43P2F"
	//secretAccessKey := "zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG"
	//	bucketName := "ydzsio"
	//	objectName := "etcd-snapshot.db"
	s3Uploader := file.NewS3Uploader(endpoint, accessKeyID, secretAccessKey)
	return s3Uploader.Upload(ctx, localPath, bucket, objectName)
}

这里我们解析 --backup-url 参数,得到存储类型,根据存储类型来决定应该将数据上传到什么地方,然后分别进行处理,目前我们暂时只对 S3 进行了支持,更多的存储可以根据需求自行扩展即可。

最后同样重新构建镜像,重新测试即可,到这里我们就基本上完成了 Etcd 备份 Operator 的开发工作了,后续就可以根据我们的实际使用进行优化即可。

事件记录

现在我们可以正常的使用 EtcdBackup 这个 CRD 资源对象来进行 Etcd 的备份了,但是当我们 describe 查看这个对象的时候 Events 信息还为空,我们知道 Events 信息是我们排错非常重要的信息,所以在开发 Operator 的时候给资源对象添加上合理的 Events 信息是非常重要的。

首先在 EtcdBackupReconciler 结构体中新增一个 record.EventRecorder 属性,用于事件记录:

// controllers/etcdbackup_controller.go

import "k8s.io/client-go/tools/record"

// EtcdBackupReconciler reconciles a EtcdBackup object
type EtcdBackupReconciler struct {
	client.Client
	Log         logr.Logger
	Scheme      *runtime.Scheme
	Recorder    record.EventRecorder
	BackupImage string
}

接下来我们就需要在调谐函数 Reconcile 中新增事件记录信息:

// controllers/etcdbackup_controller.go

// +kubebuilder:rbac:groups=etcd.ydzs.io,resources=etcdbackups,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=etcd.ydzs.io,resources=etcdbackups/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=pods/status,verbs=get;update;patch

func (r *EtcdBackupReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
	ctx := context.Background()
	log := r.Log.WithValues("etcdbackup", req.NamespacedName)

	// 获取backupState
	state, err := r.getState(ctx, req)
	if err != nil {
		return ctrl.Result{}, err
	}

	// 根据状态来判断下一步要执行的动作
	var action Action

	// 开始判断状态
	switch {
	case state.backup == nil: // 被删除了
		log.Info("Backup Object not found")
	case !state.backup.DeletionTimestamp.IsZero(): // 被标记为删除了
		log.Info("Backup Object has been deleted")
	case state.backup.Status.Phase == "": // 要开始备份了,先标记状态为备份中
		log.Info("Backup starting...")
		newBackup := state.backup.DeepCopy()
		newBackup.Status.Phase = etcdv1alpha1.EtcdBackupPhaseBackingUp // 更改成备份中...
		action = &PatchStatus{client: r.Client, original: state.backup, new: newBackup}
	case state.backup.Status.Phase == etcdv1alpha1.EtcdBackupPhaseFailed: // 失败了
		log.Info("Backup has failed. Ignoring...")
	case state.backup.Status.Phase == etcdv1alpha1.EtcdBackupPhaseCompleted: // 完成了
		log.Info("Backup has completed. Ignoring...")
	case state.actual.pod == nil: // 当前还没有执行任务的Pod
		log.Info("Backup Pod does not exists. Creating...")
		action = &CreateObject{client: r.Client, obj: state.desired.pod}
		r.Recorder.Event(state.backup, corev1.EventTypeNormal, "SuccessfulCreate", fmt.Sprintf("Created Pod: %s", state.desired.pod.Name))
	case state.actual.pod.Status.Phase == corev1.PodFailed: // Pod执行失败
		log.Info("Backup Pod failed.")
		newBackup := state.backup.DeepCopy()
		newBackup.Status.Phase = etcdv1alpha1.EtcdBackupPhaseFailed // 更改成备份失败
		action = &PatchStatus{client: r.Client, original: state.backup, new: newBackup}
		r.Recorder.Event(state.backup, corev1.EventTypeWarning, "BackupFailed", "Backup failed. See backup pod logs for details.")
	case state.actual.pod.Status.Phase == corev1.PodSucceeded: // Pod 执行成功
		log.Info("Backup Pod success.")
		newBackup := state.backup.DeepCopy()
		newBackup.Status.Phase = etcdv1alpha1.EtcdBackupPhaseCompleted // 更改成备份成功
		action = &PatchStatus{client: r.Client, original: state.backup, new: newBackup}
		r.Recorder.Event(state.backup, corev1.EventTypeNormal, "BackupSucceeded", "Backup completed successfully")
	}

	if action != nil {
		if err := action.Execute(ctx); err != nil {
			return ctrl.Result{}, err
		}
	}

	return ctrl.Result{}, nil
}

在创建 Pod、Pod 失败和执行成功的场景下我们都使用 Recorder 进行了事件信息记录。当然我们在 EtcdBackupReconciler 结构体中新增了 Recorder 属性,那么自然需要在 main.go 中进行初始化:

// main.go

func main() {
  ......
	if err = (&controllers.EtcdBackupReconciler{
		Client:      mgr.GetClient(),
		Log:         ctrl.Log.WithName("controllers").WithName("EtcdBackup"),
		Scheme:      mgr.GetScheme(),
		Recorder:    mgr.GetEventRecorderFor("etcdbackup-controller"),  // 初始化 Recorder
		BackupImage: backupImage,
	}).SetupWithManager(mgr); err != nil {
		setupLog.Error(err, "unable to create controller", "controller", "EtcdBackup")
		os.Exit(1)
	}
  ......
}

备份目录模板化

目前我们在备份 Etcd 的时候通过 EtcdBackup 这个 CRD 来指定对象存储中的备份目录,这个目录是硬编码的,这样就不够灵活,我们可以通过将其模板化来增加灵活性,比如我们可以使用 EtcdBackup 中的某些属性字段来拼接目录,比如 my-bucket/{{ .Namespace }}/{{ .Name }}/{{ .CreationTimestamp }}/snapshot.db 或者 my-bucket/snapshot-{{ .UID }}.db 这样的路径,显然就比硬编码要灵活许多了,要支持备份路径的模板化也非常简单,只需要在控制器中构造 Pod 的时候传入 backupURL 参数的时候通过 go-template 将其模板解析替换即可:

// 

func podForBackup(backup *etcdv1alpha1.EtcdBackup, image string) (*corev1.Pod, error) {
	// 构造一个全新的备份 Pod
	var secretRef *corev1.SecretEnvSource
	var backupEndpoint, backupURL string
	// TODO,validate yaml
	if backup.Spec.StorageType == etcdv1alpha1.BackupStorageTypeS3 {
		backupEndpoint = backup.Spec.S3.Endpoint
		// format:
				// s3://my-bucket/my-dir/my-object.db
				// s3://my-bucket/{{ .Namespace }}/{{ .Name }}/{{ .CreationTimestamp }}/snapshot.db
				// s3://my-bucket/snapshot-{{ .UID }}.db
		// 备份的目的地址支持 go-template
		tmpl, err := template.New("template").Parse(backup.Spec.S3.Path)
		if err != nil {
			return nil, fmt.Errorf("error %q parsing object URL template", err)
		}
		// 解析成备份地址
		var objectURL strings.Builder
		if err := tmpl.Execute(&objectURL, backup); err != nil {
			return nil, fmt.Errorf("error %q executing template", err)
		}
		backupURL = fmt.Sprintf("%s://%s", backup.Spec.StorageType, objectURL)
		secretRef = &corev1.SecretEnvSource{
			LocalObjectReference: corev1.LocalObjectReference{
				Name: backup.Spec.S3.Secret,
			},
		}
	} else { // oss
		//backupURL = ?
		//backupEndpoint = ?
		secretRef = &corev1.SecretEnvSource{
			LocalObjectReference: corev1.LocalObjectReference{
				Name: backup.Spec.OSS.Secret,
			},
		}
	}
	......
}

这样解析过后我们的备份地址就可以使用 EtcdBackup CRD 的属性来进行模板化了,大大提高了我们的灵活性,当然最后我们还需要进行测试来验证代码是否有效。

未经允许不得转载:江哥架构师笔记 » operator:备份 etcd operator 开发

分享到:更多 ()