当前位置:   article > 正文

client-go之clientset原理(四)_clientset.corev1().events("default").list

clientset.corev1().events("default").list

1.介绍

Clientset是调用kubernetes资源对象最常用的客户端,可以操作所有的资源对象。
前面我们说了在staging/src/k8s.io/api下面定义了各种资源类型的规范,然后将这些规范注册到了全局的Scheme中,这样就可以在Clientset中使用这些资源了。那么我们应该如何使用Clientset呢?

2.示例

首先我们来看下如何通过clientset来获取资源对象,我们这里来创建一个Clientset对象,然后通过该对象来获取默认命名空间之下的Deployments列表,代码如下所示:

package main

import (
	"context"
	"flag"
	"fmt"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/util/homedir"
	"path/filepath"
)

// 获取defaults命名空间下所有deployment列表
func main() {
	var err error
	var config *rest.Config
	var kubeconfig *string
	ctx := context.Background()

	if home := homedir.HomeDir(); home != "" {
		kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "absolute path to kubeconfig file")
	} else {
		kubeconfig = flag.String("kubeconfig", "","absolute path to kubeconfig file")
	}

	flag.Parse()

	// 使用sa创建集群配置(InCluster模式),需要去配置对应的RBAC权限,默认的是default->无权限获取deployments的List权限
	if config, err = rest.InClusterConfig(); err != nil {
		//使用kubeconfig文件创建集群配置
		if config, err = clientcmd.BuildConfigFromFlags("", *kubeconfig); err != nil {
			panic(err.Error())
		}
	}

	//创建clientset
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err.Error())
	}
	//使用clientset获取deployments
	deployments, err := clientset.AppsV1().Deployments("kube-system").List(ctx, metav1.ListOptions{})
	if err != nil {
		panic(err.Error())
	}

	for idx, item := range deployments.Items {
		fmt.Printf("%d->%s\n", idx+1, item.Name)
	}

	pods, err:= clientset.CoreV1().Pods("kube-system").List(ctx, metav1.ListOptions{})
	if err != nil {
		panic(err.Error())
	}
	for idx, item := range pods.Items {
		fmt.Printf("%d->%s\n", idx+1, item.Name)
	}

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62

上面的代码运行可以获得kube-system命名空间之下的Pods:
这是一个非常典型的访问kubernetes集群资源的方式,通过client-go提供的ClientSet对象来获取资源数据,主要有以下三个步骤:

  • 1.使用kubeconfig文件或者ServiceAccount(InCluster模式)来创建访问Kubernetes API的Restful配置参数,也就是代码中的rest.Config对象
  • 2.使用reset.Config参数创建ClientSet对象,直接调用Kubernetes.NewForConfig(config)初始化
    1. 然后是Clientset对象的方法去获取各个Group下面对应资源对象进行CRUD操作

3.ClientSet对象

ClientSet实际上是对各种资源类的Clientset的一次封装,下面对Clientset对象的实现进行分析:

// staging/src/k8s.io/client-go/kubernetes/clientset.go
//NewForConfig使用给定的config创建一个新的clientset,clientset包含所有组的clientS,clientset中每个group有具体的版本
func NewForConfig(c *rest.Config) (*Clientset, error) {
	configShallowCopy := *c
	if configShallowCopy.RateLimiter == nil && configShallowCopy.QPS > 0 {
		if configShallowCopy.Burst <= 0 {
			return nil, fmt.Errorf("burst is required to be greater than 0 when RateLimiter is not set and QPS is set to greater than 0")
		}
		configShallowCopy.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(configShallowCopy.QPS, configShallowCopy.Burst)
	}
	var cs Clientset
	var err error
	// 将其他Group和版本的资源RESTClient封装到全局的Clientset对象中
	cs.admissionregistrationV1, err = admissionregistrationv1.NewForConfig(&configShallowCopy)
	if err != nil {
		return nil, err
	}
	cs.admissionregistrationV1beta1, err = admissionregistrationv1beta1.NewForConfig(&configShallowCopy)
	if err != nil {
		return nil, err
	}
	cs.appsV1, err = appsv1.NewForConfig(&configShallowCopy)
	if err != nil {
		return nil, err
	}
	// 使用configShallowCopy初始化各个版本的对象
	...........
	...........
	...........

	cs.appsV1beta1, err = appsv1beta1.NewForConfig(&configShallowCopy)
	if err != nil {
		return nil, err
 
	cs.DiscoveryClient, err = discovery.NewDiscoveryClientForConfig(&configShallowCopy)
	if err != nil {
		return nil, err
	}
	return &cs, nil
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40

上面的NewForConfig函数就是将其他的各种资源的RESTClient封装到了全局的clientset中,当我们需要访问某个资源的时候只需要使用clientset里面包装的属性即可,比如clientset.CoreV1()就是访问Core下面v1这个版本的RestClient。这些局部的RESTClient都定义在staging/src/k8s.io/client-go/kubernetes/typed/<group>/<version>/<plug>_client.go文件中,比如staging/src/k8s.io/client-go/kubernetes/typed/apps/v1/apps_client.go,这里以Deployment为例:

//staging/src/k8s.io/client-go/kubernetes/typed/apps/v1/apps_client.go
// NewForConfig根据rest.Config创建一个AppV1Client
func NewForConfig(c *rest.Config) (*AppsV1Client, error) {
	config := *c
	// 为rest_config设置资源对象默认的参数
	if err := setConfigDefaults(&config); err != nil {
		return nil, err
	}
	// 实例化AppV1Client的RestClient
	client, err := rest.RESTClientFor(&config)
	if err != nil {
		return nil, err
	}
	return &AppsV1Client{client}, nil
}

func setConfigDefaults(config *rest.Config) error {
  // 设置资源对象的GroupVersion
	gv := v1.SchemeGroupVersion
	config.GroupVersion = &gv
 // 资源对象的root path
	config.APIPath = "/apis"
 // 使用注册的资源类型Scheme对请求和响应进行编解码
	config.NegotiatedSerializer = scheme.Codecs.WithoutConversion()

	if config.UserAgent == "" {
		config.UserAgent = rest.DefaultKubernetesUserAgent()
	}

	return nil
}

// 根据config创建初始化并返回一个 RESTClient,其满足客户端Config对象上的属性的RESTClient对象。注意
// 在初始化客户端时,RESTClient可能需要一些可选的属性。
func RESTClientFor(config *Config) (*RESTClient, error) {
	if config.GroupVersion == nil {
		return nil, fmt.Errorf("GroupVersion is required when initializing a RESTClient")
	}
	if config.NegotiatedSerializer == nil {
		return nil, fmt.Errorf("NegotiatedSerializer is required when initializing a RESTClient")
	}

	baseURL, versionedAPIPath, err := defaultServerUrlFor(config)
	if err != nil {
		return nil, err
	}

	transport, err := TransportFor(config)
	if err != nil {
		return nil, err
	}

	var httpClient *http.Client
	if transport != http.DefaultTransport {
		httpClient = &http.Client{Transport: transport}
		if config.Timeout > 0 {
			httpClient.Timeout = config.Timeout
		}
	}

	rateLimiter := config.RateLimiter
	if rateLimiter == nil {
		qps := config.QPS
		if config.QPS == 0.0 {
			qps = DefaultQPS
		}
		burst := config.Burst
		if config.Burst == 0 {
			burst = DefaultBurst
		}
		if qps > 0 {
			rateLimiter = flowcontrol.NewTokenBucketRateLimiter(qps, burst)
		}
	}

	var gv schema.GroupVersion
	if config.GroupVersion != nil {
		gv = *config.GroupVersion
	}
	clientContent := ClientContentConfig{
		AcceptContentTypes: config.AcceptContentTypes,
		ContentType:        config.ContentType,
		GroupVersion:       gv,
		Negotiator:         runtime.NewClientNegotiator(config.NegotiatedSerializer, gv),
	}

	return NewRESTClient(baseURL, versionedAPIPath, clientContent, rateLimiter, httpClient)
}

// staging/src/k8s.io/client-go/kubernetes/typed/apps/v1/deployment.go
//deployments实现了DeploymentInterface接口
type deployments struct {
	client rest.Interface
	ns     string
}
// newDeployments返回一个Deployments
func newDeployments(c *AppsV1Client, namespace string) *deployments {
	return &deployments{
		client: c.RESTClient(),
		ns:     namespace,
	}
}


  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104

根据上面代码可以clientset.AppV1.Deployment(“default”)来获取deployments对象,然后该对象下面定义了deployments对象的CRUD操作,比如List()函数
下面以Deployment为例说明相关接口实现关系
在这里插入图片描述

在这里插入图片描述

//staging/src/k8s.io/client-go/kubernetes/typed/apps/v1/deployment.go
// 根据标签和selector域列出并返回匹配的deployments列表
func (c *deployments) List(ctx context.Context, opts metav1.ListOptions) (result *v1.DeploymentList, err error) {
	var timeout time.Duration
	if opts.TimeoutSeconds != nil {
		timeout = time.Duration(*opts.TimeoutSeconds) * time.Second
	}
	result = &v1.DeploymentList{}
	err = c.client.Get().
		Namespace(c.ns).
		Resource("deployments").
		VersionedParams(&opts, scheme.ParameterCodec).
		Timeout(timeout).
		Do(ctx).
		Into(result)
	return
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

根据上面可以看出最终是通过c.client去发起请求,也就是局部restClient初始化函数中通过rest.RESTClient(&config)创建的对象,也就是将rest.Config对象转换成一个Restful的Client对象用于网络操作。

ClientSet是基于RESTClient的,RESTClient是底层的用于网络请求的对象,可以直接通过RESTClient提供的RESTful方法入Get()、Put()、等和APIServer进行交互。

  • 同时支持JSON和protobuf两种序列化方式
  • 支持所有原生化的资源
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/article/detail/40850
推荐阅读
相关标签
  

闽ICP备14008679号