赞
踩
Clientset是调用kubernetes资源对象最常用的客户端,可以操作所有的资源对象。
前面我们说了在staging/src/k8s.io/api下面定义了各种资源类型的规范,然后将这些规范注册到了全局的Scheme中,这样就可以在Clientset中使用这些资源了。那么我们应该如何使用Clientset呢?
首先我们来看下如何通过clientset来获取资源对象,我们这里来创建一个Clientset对象,然后通过该对象来获取默认命名空间之下的Deployments列表,代码如下所示:
package main import ( "context" "flag" "fmt" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/util/homedir" "path/filepath" ) // 获取defaults命名空间下所有deployment列表 func main() { var err error var config *rest.Config var kubeconfig *string ctx := context.Background() if home := homedir.HomeDir(); home != "" { kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "absolute path to kubeconfig file") } else { kubeconfig = flag.String("kubeconfig", "","absolute path to kubeconfig file") } flag.Parse() // 使用sa创建集群配置(InCluster模式),需要去配置对应的RBAC权限,默认的是default->无权限获取deployments的List权限 if config, err = rest.InClusterConfig(); err != nil { //使用kubeconfig文件创建集群配置 if config, err = clientcmd.BuildConfigFromFlags("", *kubeconfig); err != nil { panic(err.Error()) } } //创建clientset clientset, err := kubernetes.NewForConfig(config) if err != nil { panic(err.Error()) } //使用clientset获取deployments deployments, err := clientset.AppsV1().Deployments("kube-system").List(ctx, metav1.ListOptions{}) if err != nil { panic(err.Error()) } for idx, item := range deployments.Items { fmt.Printf("%d->%s\n", idx+1, item.Name) } pods, err:= clientset.CoreV1().Pods("kube-system").List(ctx, metav1.ListOptions{}) if err != nil { panic(err.Error()) } for idx, item := range pods.Items { fmt.Printf("%d->%s\n", idx+1, item.Name) } }
上面的代码运行可以获得kube-system命名空间之下的Pods:
这是一个非常典型的访问kubernetes集群资源的方式,通过client-go提供的ClientSet对象来获取资源数据,主要有以下三个步骤:
ClientSet实际上是对各种资源类的Clientset的一次封装,下面对Clientset对象的实现进行分析:
// staging/src/k8s.io/client-go/kubernetes/clientset.go //NewForConfig使用给定的config创建一个新的clientset,clientset包含所有组的clientS,clientset中每个group有具体的版本 func NewForConfig(c *rest.Config) (*Clientset, error) { configShallowCopy := *c if configShallowCopy.RateLimiter == nil && configShallowCopy.QPS > 0 { if configShallowCopy.Burst <= 0 { return nil, fmt.Errorf("burst is required to be greater than 0 when RateLimiter is not set and QPS is set to greater than 0") } configShallowCopy.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(configShallowCopy.QPS, configShallowCopy.Burst) } var cs Clientset var err error // 将其他Group和版本的资源RESTClient封装到全局的Clientset对象中 cs.admissionregistrationV1, err = admissionregistrationv1.NewForConfig(&configShallowCopy) if err != nil { return nil, err } cs.admissionregistrationV1beta1, err = admissionregistrationv1beta1.NewForConfig(&configShallowCopy) if err != nil { return nil, err } cs.appsV1, err = appsv1.NewForConfig(&configShallowCopy) if err != nil { return nil, err } // 使用configShallowCopy初始化各个版本的对象 ........... ........... ........... cs.appsV1beta1, err = appsv1beta1.NewForConfig(&configShallowCopy) if err != nil { return nil, err cs.DiscoveryClient, err = discovery.NewDiscoveryClientForConfig(&configShallowCopy) if err != nil { return nil, err } return &cs, nil }
上面的NewForConfig函数就是将其他的各种资源的RESTClient封装到了全局的clientset中,当我们需要访问某个资源的时候只需要使用clientset里面包装的属性即可,比如clientset.CoreV1()就是访问Core下面v1这个版本的RestClient。这些局部的RESTClient都定义在staging/src/k8s.io/client-go/kubernetes/typed/<group>/<version>/<plug>_client.go文件中,比如staging/src/k8s.io/client-go/kubernetes/typed/apps/v1/apps_client.go,这里以Deployment为例:
//staging/src/k8s.io/client-go/kubernetes/typed/apps/v1/apps_client.go // NewForConfig根据rest.Config创建一个AppV1Client func NewForConfig(c *rest.Config) (*AppsV1Client, error) { config := *c // 为rest_config设置资源对象默认的参数 if err := setConfigDefaults(&config); err != nil { return nil, err } // 实例化AppV1Client的RestClient client, err := rest.RESTClientFor(&config) if err != nil { return nil, err } return &AppsV1Client{client}, nil } func setConfigDefaults(config *rest.Config) error { // 设置资源对象的GroupVersion gv := v1.SchemeGroupVersion config.GroupVersion = &gv // 资源对象的root path config.APIPath = "/apis" // 使用注册的资源类型Scheme对请求和响应进行编解码 config.NegotiatedSerializer = scheme.Codecs.WithoutConversion() if config.UserAgent == "" { config.UserAgent = rest.DefaultKubernetesUserAgent() } return nil } // 根据config创建初始化并返回一个 RESTClient,其满足客户端Config对象上的属性的RESTClient对象。注意 // 在初始化客户端时,RESTClient可能需要一些可选的属性。 func RESTClientFor(config *Config) (*RESTClient, error) { if config.GroupVersion == nil { return nil, fmt.Errorf("GroupVersion is required when initializing a RESTClient") } if config.NegotiatedSerializer == nil { return nil, fmt.Errorf("NegotiatedSerializer is required when initializing a RESTClient") } baseURL, versionedAPIPath, err := defaultServerUrlFor(config) if err != nil { return nil, err } transport, err := TransportFor(config) if err != nil { return nil, err } var httpClient *http.Client if transport != http.DefaultTransport { httpClient = &http.Client{Transport: transport} if config.Timeout > 0 { httpClient.Timeout = config.Timeout } } rateLimiter := config.RateLimiter if rateLimiter == nil { qps := config.QPS if config.QPS == 0.0 { qps = DefaultQPS } burst := config.Burst if config.Burst == 0 { burst = DefaultBurst } if qps > 0 { rateLimiter = flowcontrol.NewTokenBucketRateLimiter(qps, burst) } } var gv schema.GroupVersion if config.GroupVersion != nil { gv = *config.GroupVersion } clientContent := ClientContentConfig{ AcceptContentTypes: config.AcceptContentTypes, ContentType: config.ContentType, GroupVersion: gv, Negotiator: runtime.NewClientNegotiator(config.NegotiatedSerializer, gv), } return NewRESTClient(baseURL, versionedAPIPath, clientContent, rateLimiter, httpClient) } // staging/src/k8s.io/client-go/kubernetes/typed/apps/v1/deployment.go //deployments实现了DeploymentInterface接口 type deployments struct { client rest.Interface ns string } // newDeployments返回一个Deployments func newDeployments(c *AppsV1Client, namespace string) *deployments { return &deployments{ client: c.RESTClient(), ns: namespace, } }
根据上面代码可以clientset.AppV1.Deployment(“default”)来获取deployments对象,然后该对象下面定义了deployments对象的CRUD操作,比如List()函数
下面以Deployment为例说明相关接口实现关系


//staging/src/k8s.io/client-go/kubernetes/typed/apps/v1/deployment.go // 根据标签和selector域列出并返回匹配的deployments列表 func (c *deployments) List(ctx context.Context, opts metav1.ListOptions) (result *v1.DeploymentList, err error) { var timeout time.Duration if opts.TimeoutSeconds != nil { timeout = time.Duration(*opts.TimeoutSeconds) * time.Second } result = &v1.DeploymentList{} err = c.client.Get(). Namespace(c.ns). Resource("deployments"). VersionedParams(&opts, scheme.ParameterCodec). Timeout(timeout). Do(ctx). Into(result) return }
根据上面可以看出最终是通过c.client去发起请求,也就是局部restClient初始化函数中通过rest.RESTClient(&config)创建的对象,也就是将rest.Config对象转换成一个Restful的Client对象用于网络操作。
ClientSet是基于RESTClient的,RESTClient是底层的用于网络请求的对象,可以直接通过RESTClient提供的RESTful方法入Get()、Put()、等和APIServer进行交互。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。