介绍
Clientset 是调用 Kubernetes 资源对象最常用的客户端,可以操作所有的资源对象。
前面我们说了在 staging/src/k8s.io/api
下面定义了各种类型资源的规范,然后将这些规范注册到了全局的 Scheme 中,这样就可以在 Clientset
中使用这些资源了。那么我们应该如何使用 Clientset 呢?
示例
首先我们来看下如何通过 Clientset 来获取资源对象,我们这里来创建一个 Clientset 对象,然后通过该对象来获取默认命名空间之下的 Deployments 列表,代码如下所示:
package main import ( "flag" "fmt" "os" "path/filepath" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" ) func main() { var err error var config *rest.Config var kubeconfig *string if home := homeDir(); home != "" { kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file") } else { kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file") } flag.Parse() // 使用 ServiceAccount 创建集群配置(InCluster模式) if config, err = rest.InClusterConfig(); err != nil { // 使用 KubeConfig 文件创建集群配置 if config, err = clientcmd.BuildConfigFromFlags("", *kubeconfig); err != nil { panic(err.Error()) } } // 创建 clientset clientset, err := kubernetes.NewForConfig(config) if err != nil { panic(err.Error()) } // 使用 clientsent 获取 Deployments deployments, err := clientset.AppsV1().Deployments("default").List(metav1.ListOptions{}) if err != nil { panic(err) } for idx, deploy := range deployments.Items { fmt.Printf("%d -> %s\\n", idx+1, deploy.Name) } } func homeDir() string { if h := os.Getenv("HOME"); h != "" { return h } return os.Getenv("USERPROFILE") // windows }
上面的代码运行可以获得 default 命名空间之下的 Pods:
$ go run main.go 1 -> details-v1 2 -> el-gitlab-listener 3 -> nginx 4 -> productpage-v1 5 -> ratings-v1 6 -> reviews-v1 7 -> reviews-v2 8 -> reviews-v3
这是一个非常典型的访问 Kubernetes 集群资源的方式,通过 client-go 提供的 Clientset 对象来获取资源数据,主要有以下三个步骤:
-
使用 kubeconfig 文件或者 ServiceAccount(InCluster 模式)来创建访问 Kubernetes API 的 Restful 配置参数,也就是代码中的
rest.Config
对象 -
使用 rest.Config 参数创建 Clientset 对象,这一步非常简单,直接调用
kubernetes.NewForConfig(config)
即可初始化 -
然后是 Clientset 对象的方法去获取各个 Group 下面的对应资源对象进行 CRUD 操作
Clientset 对象
上面我们了解了如何使用 Clientset 对象来获取集群资源,接下来我们来分析下 Clientset 对象的实现。
上面我们使用的 Clientset 实际上是对各种资源类型的 Clientset 的一次封装:
// staging/src/k8s.io/client-go/kubernetes/clientset.go // NewForConfig 使用给定的 config 创建一个新的 Clientset func NewForConfig(c *rest.Config) (*Clientset, error) { configShallowCopy := *c if configShallowCopy.RateLimiter == nil && configShallowCopy.QPS > 0 { configShallowCopy.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(configShallowCopy.QPS, configShallowCopy.Burst) } var cs Clientset var err error cs.admissionregistrationV1beta1, err = admissionregistrationv1beta1.NewForConfig(&configShallowCopy) if err != nil { return nil, err } // 将其他 Group 和版本的资源的 RESTClient 封装到全局的 Clientset 对象中 cs.appsV1, err = appsv1.NewForConfig(&configShallowCopy) if err != nil { return nil, err } ...... cs.DiscoveryClient, err = discovery.NewDiscoveryClientForConfig(&configShallowCopy) if err != nil { return nil, err } return &cs, nil }
上面的 NewForConfig 函数里面就是将其他的各种资源的 RESTClient 封装到了全局的 Clientset 中,这样当我们需要访问某个资源的时候只需要使用 Clientset 里面包装的属性即可,比如 clientset.CoreV1()
就是访问 Core 这个 Group 下面 v1 这个版本的 RESTClient。这些局部的 RESTClient 都定义在 staging/src/k8s.io/client-go/typed/<group>/<version>/<plug>_client.go
文件中,比如 staging/src/k8s.io/client-go/kubernetes/typed/apps/v1/apps_client.go
这个文件中就是定义的 apps 这个 Group 下面的 v1 版本的 RESTClient,这里同样以 Deployment 为例:
// staging/src/k8s.io/client-go/kubernetes/typed/apps/v1/apps_client.go // NewForConfig 根据 rest.Config 创建一个 AppsV1Client func NewForConfig(c *rest.Config) (*AppsV1Client, error) { config := *c // 为 rest.Config 设置资源对象默认的参数 if err := setConfigDefaults(&config); err != nil { return nil, err } // 实例化 AppsV1Client 的 RestClient client, err := rest.RESTClientFor(&config) if err != nil { return nil, err } return &AppsV1Client{client}, nil } func setConfigDefaults(config *rest.Config) error { // 资源对象的 GroupVersion gv := v1.SchemeGroupVersion config.GroupVersion = &gv // 资源对象的 root path config.APIPath = "/apis" // 使用注册的资源类型 Scheme 对请求和响应进行编解码,Scheme 就是前文中分析的资源类型的规范 config.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: scheme.Codecs} if config.UserAgent == "" { config.UserAgent = rest.DefaultKubernetesUserAgent() } return nil } func (c *AppsV1Client) Deployments(namespace string) DeploymentInterface { return newDeployments(c, namespace) } // staging/src/k8s.io/client-go/kubernetes/typed/apps/v1/deployment.go // deployments 实现了 DeploymentInterface 接口 type deployments struct { client rest.Interface ns string } // newDeployments 实例化 deployments 对象 func newDeployments(c *AppsV1Client, namespace string) *deployments { return &deployments{ client: c.RESTClient(), ns: namespace, } }
通过上面代码我们就可以很清晰的知道可以通过 clientset.AppsV1().Deployments("default")
来获取一个 deployments 对象,然后该对象下面定义了 deployments 对象的 CRUD 操作,比如我们调用的 List()
函数:
// staging/src/k8s.io/client-go/kubernetes/typed/apps/v1/deployment.go func (c *deployments) List(opts metav1.ListOptions) (result *v1.DeploymentList, err error) { var timeout time.Duration if opts.TimeoutSeconds != nil { timeout = time.Duration(*opts.TimeoutSeconds) * time.Second } result = &v1.DeploymentList{} err = c.client.Get(). Namespace(c.ns). Resource("deployments"). VersionedParams(&opts, scheme.ParameterCodec). Timeout(timeout). Do(). Into(result) return }
从上面代码可以看出最终是通过 c.client 去发起的请求,也就是局部的 restClient 初始化的函数中通过 rest.RESTClientFor(&config)
创建的对象,也就是将 rest.Config
对象转换成一个 Restful 的 Client 对象用于网络操作:
// staging/src/k8s.io/client-go/rest/config.go // RESTClientFor 返回一个满足客户端 Config 对象上的属性的 RESTClient 对象。 // 注意在初始化客户端的时候,RESTClient 可能需要一些可选的属性。 func RESTClientFor(config *Config) (*RESTClient, error) { if config.GroupVersion == nil { return nil, fmt.Errorf("GroupVersion is required when initializing a RESTClient") } if config.NegotiatedSerializer == nil { return nil, fmt.Errorf("NegotiatedSerializer is required when initializing a RESTClient") } qps := config.QPS if config.QPS == 0.0 { qps = DefaultQPS } burst := config.Burst if config.Burst == 0 { burst = DefaultBurst } baseURL, versionedAPIPath, err := defaultServerUrlFor(config) if err != nil { return nil, err } transport, err := TransportFor(config) if err != nil { return nil, err } // 初始化一个 HTTP Client 对象 var httpClient *http.Client if transport != http.DefaultTransport { httpClient = &http.Client{Transport: transport} if config.Timeout > 0 { httpClient.Timeout = config.Timeout } } return NewRESTClient(baseURL, versionedAPIPath, config.ContentConfig, qps, burst, config.RateLimiter, httpClient) }
到这里我们就知道了 Clientset 是基于 RESTClient 的,RESTClient 是底层的用于网络请求的对象,可以直接通过 RESTClient 提供的 RESTful 方法如 Get()、Put()、Post()、Delete() 等和 APIServer 进行交互
-
同时支持 JSON 和 protobuf 两种序列化方式
-
支持所有原生资源
当 初始化 RESTClient 过后就可以发起网络请求了,比如对于 Deployments 的 List 操作:
// staging/src/k8s.io/client-go/kubernetes/typed/apps/v1/deployment.go func (c *deployments) List(ctx context.Context, opts metav1.ListOptions) (result *v1.DeploymentList, err error) { var timeout time.Duration if opts.TimeoutSeconds != nil { timeout = time.Duration(*opts.TimeoutSeconds) * time.Second } result = &v1.DeploymentList{} // 调用 RestClient 发起真正的网络请求 err = c.client.Get(). Namespace(c.ns). Resource("deployments"). VersionedParams(&opts, scheme.ParameterCodec). Timeout(timeout). Do(ctx). Into(result) return }
上面通过调用 RestClient 发起网络请求,真正发起网络请求的代码如下所示:
// staging/src/k8s.io/client-go/rest/request.go // request connects to the server and invokes the provided function when a server response is // received. It handles retry behavior and up front validation of requests. It will invoke // fn at most once. It will return an error if a problem occurred prior to connecting to the // server - the provided function is responsible for handling server errors. func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Response)) error { //Metrics for total request latency start := time.Now() defer func() { metrics.RequestLatency.Observe(r.verb, r.finalURLTemplate(), time.Since(start)) }() if r.err != nil { klog.V(4).Infof("Error in request: %v", r.err) return r.err } if err := r.requestPreflightCheck(); err != nil { return err } // 初始化网络客户端 client := r.c.Client if client == nil { client = http.DefaultClient } // Throttle the first try before setting up the timeout configured on the // client. We don't want a throttled client to return timeouts to callers // before it makes a single request. if err := r.tryThrottle(ctx); err != nil { return err } // 超时处理 if r.timeout > 0 { var cancel context.CancelFunc ctx, cancel = context.WithTimeout(ctx, r.timeout) defer cancel() } // 重试机制,最多重试10次 maxRetries := 10 retries := 0 for { url := r.URL().String() // 构造请求对象 req, err := http.NewRequest(r.verb, url, r.body) if err != nil { return err } req = req.WithContext(ctx) req.Header = r.headers r.backoff.Sleep(r.backoff.CalculateBackoff(r.URL())) if retries > 0 { // We are retrying the request that we already send to apiserver // at least once before. // This request should also be throttled with the client-internal rate limiter. if err := r.tryThrottle(ctx); err != nil { return err } } // 发起网络调用 resp, err := client.Do(req) updateURLMetrics(r, resp, err) if err != nil { r.backoff.UpdateBackoff(r.URL(), err, 0) } else { r.backoff.UpdateBackoff(r.URL(), err, resp.StatusCode) } if err != nil { // "Connection reset by peer" or "apiserver is shutting down" are usually a transient errors. // Thus in case of "GET" operations, we simply retry it. // We are not automatically retrying "write" operations, as // they are not idempotent. if r.verb != "GET" { return err } // For connection errors and apiserver shutdown errors retry. if net.IsConnectionReset(err) || net.IsProbableEOF(err) { // For the purpose of retry, we set the artificial "retry-after" response. // TODO: Should we clean the original response if it exists? resp = &http.Response{ StatusCode: http.StatusInternalServerError, Header: http.Header{"Retry-After": []string{"1"}}, Body: ioutil.NopCloser(bytes.NewReader([]byte{})), } } else { return err } } done := func() bool { // Ensure the response body is fully read and closed // before we reconnect, so that we reuse the same TCP // connection. defer func() { const maxBodySlurpSize = 2 << 10 if resp.ContentLength <= maxBodySlurpSize { io.Copy(ioutil.Discard, &io.LimitedReader{R: resp.Body, N: maxBodySlurpSize}) } resp.Body.Close() }() retries++ if seconds, wait := checkWait(resp); wait && retries < maxRetries { if seeker, ok := r.body.(io.Seeker); ok && r.body != nil { _, err := seeker.Seek(0, 0) if err != nil { klog.V(4).Infof("Could not retry request, can't Seek() back to beginning of body for %T", r.body) fn(req, resp) return true } } klog.V(4).Infof("Got a Retry-After %ds response for attempt %d to %v", seconds, retries, url) r.backoff.Sleep(time.Duration(seconds) * time.Second) return false } // 将req和resp回调 fn(req, resp) return true }() if done { return nil } } }
到这里就完成了一次完整的网络请求。
其实 Clientset 对象也就是将 rest.Config 封装成了一个 http.Client
对象而已,最终还是利用 golang 中的 http 库来执行一个正常的网络请求而已。
评论前必须登录!
注册