client-go by Hand: Designing and Implementing a CRD Client

Background and Requirements

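When building on top of k8s, some scenarios call for developing our own CRD + Controller, that is, an Operator, to provide cloud-native deployment and automated operations on k8s. Let's call this the underlying base capability for now.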


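If we want to wrap that base capability in a console that upper-layer business code can call, we need a way to drive and consume such an API. Folks who have used client-go may think of the dynamic client, but as designers and developers we should be clear-eyed about how painful it is to push a pile of maps through serialization and deserialization (unless the business developer and the Operator designer happen to be the same person >..<).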
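To make that pain concrete, here is a minimal sketch that uses only the standard library, not client-go itself. The `EmqxBroker` struct is a hypothetical, heavily trimmed stand-in for the real CRD type (only `replicas` and `image` are modeled), and `imageFromMap` / `imageFromStruct` are illustrative helper names.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// EmqxBroker is a hypothetical, trimmed-down stand-in for the real CRD type.
type EmqxBroker struct {
	Spec struct {
		Replicas int32  `json:"replicas"`
		Image    string `json:"image"`
	} `json:"spec"`
}

const payload = `{"spec":{"replicas":3,"image":"emqx/emqx:4.3.10"}}`

// imageFromMap mimics working with untyped data: every level needs a
// type assertion that can only fail at runtime.
func imageFromMap(raw []byte) (string, error) {
	var obj map[string]interface{}
	if err := json.Unmarshal(raw, &obj); err != nil {
		return "", err
	}
	spec, ok := obj["spec"].(map[string]interface{})
	if !ok {
		return "", fmt.Errorf("spec is missing or not an object")
	}
	image, ok := spec["image"].(string)
	if !ok {
		return "", fmt.Errorf("spec.image is missing or not a string")
	}
	return image, nil
}

// imageFromStruct decodes into a typed struct, so field access is
// checked by the compiler instead of by hand-written assertions.
func imageFromStruct(raw []byte) (string, error) {
	var eb EmqxBroker
	if err := json.Unmarshal(raw, &eb); err != nil {
		return "", err
	}
	return eb.Spec.Image, nil
}

func main() {
	fromMap, err := imageFromMap([]byte(payload))
	if err != nil {
		panic(err)
	}
	fromStruct, err := imageFromStruct([]byte(payload))
	if err != nil {
		panic(err)
	}
	fmt.Println(fromMap, fromStruct)
}
```

Both helpers return the same value; the difference is how much defensive code the caller has to write and how late mistakes surface.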


Can we use our CRD as conveniently as the native k8s resources such as Deployments and Services?

Prerequisite Concepts and Skills

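Before diving into the analysis, I recommend that you first get familiar with the Kubernetes API concepts and be able to look things up in the Kubernetes API reference.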


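To keep the examples concrete, I registered the CRDs from emqx-operator in my local k8s cluster and treat them as first-class resources on par with the native k8s ones.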

emqxbrokers.apps.emqx.io                    2021-12-09T03:59:28Z
emqxenterprises.apps.emqx.io                2021-12-09T03:59:28Z

You can also check the API information with kubectl api-versions:

$kubectl api-versions | grep emqx
apps.emqx.io/v1beta1

Use kubectl api-resources to see the Group, Version and Kind information.

$kubectl api-resources              
NAME                              SHORTNAMES   APIVERSION                             NAMESPACED   KIND
emqxbrokers                       emqx         apps.emqx.io/v1beta1                   true         EmqxBroker
emqxenterprises                   emqx-ee      apps.emqx.io/v1beta1                   true         EmqxEnterprise
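The APIVERSION column packs the group and the version into a single string. As a small illustration of how that string breaks down, the sketch below splits it by hand; `GroupVersion` here is a local stand-in for the apimachinery type of the same name and `parseAPIVersion` is a hypothetical helper, so treat it as an assumption-laden sketch rather than the library's parser.

```go
package main

import (
	"fmt"
	"strings"
)

// GroupVersion is a local stand-in for schema.GroupVersion in apimachinery.
type GroupVersion struct {
	Group   string
	Version string
}

// parseAPIVersion (hypothetical helper) splits an APIVERSION value into
// group and version. Core resources such as Pods use a bare version ("v1")
// with an empty group.
func parseAPIVersion(apiVersion string) (GroupVersion, error) {
	parts := strings.Split(apiVersion, "/")
	switch {
	case len(parts) == 1 && parts[0] != "":
		return GroupVersion{Version: parts[0]}, nil
	case len(parts) == 2 && parts[0] != "" && parts[1] != "":
		return GroupVersion{Group: parts[0], Version: parts[1]}, nil
	default:
		return GroupVersion{}, fmt.Errorf("unexpected apiVersion %q", apiVersion)
	}
}

func main() {
	for _, s := range []string{"apps.emqx.io/v1beta1", "v1"} {
		gv, err := parseAPIVersion(s)
		if err != nil {
			panic(err)
		}
		fmt.Printf("group=%q version=%q\n", gv.Group, gv.Version)
	}
}
```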

I trust you now have the concepts and basic skills above. ^.^

Design and Implementation

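The official documentation for client-go is rather thin, so as designers and developers we need to be able to read the source. Let's dive into the official client-go repo. The example under examples/create-update-delete-deployment shows how to issue REST requests with the client-go library.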

func main() {
	var kubeconfig *string
	if home := homedir.HomeDir(); home != "" {
		kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
	} else {
		kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
	}
	flag.Parse()

	// Initialize the Config
	config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
	...
	// Construct the clientset
	clientset, err := kubernetes.NewForConfig(config)

	// Use the resource client to operate on the resource
	deploymentsClient := clientset.AppsV1().Deployments(apiv1.NamespaceDefault)

	// Create Deployment
	fmt.Println("Creating deployment...")
	result, err := deploymentsClient.Create(context.TODO(), deployment, metav1.CreateOptions{})
	if err != nil {
		panic(err)
	}
	fmt.Printf("Created deployment %q.\n", result.GetObjectMeta().GetName())

	// Update Deployment
	prompt()
	fmt.Println("Updating deployment...")
	//    You have two options to Update() this Deployment:
	//
	//    1. Modify the "deployment" variable and call: Update(deployment).
	//       This works like the "kubectl replace" command and it overwrites/loses changes
	//       made by other clients between you Create() and Update() the object.
	//    2. Modify the "result" returned by Get() and retry Update(result) until
	//       you no longer get a conflict error. This way, you can preserve changes made
	//       by other clients between Create() and Update(). This is implemented below
	//			 using the retry utility package included with client-go. (RECOMMENDED)
	//
	// More Info:
	// https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#concurrency-control-and-consistency

	retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
		// Retrieve the latest version of Deployment before attempting update
		// RetryOnConflict uses exponential backoff to avoid exhausting the apiserver
		result, getErr := deploymentsClient.Get(context.TODO(), "demo-deployment", metav1.GetOptions{})
		if getErr != nil {
			panic(fmt.Errorf("Failed to get latest version of Deployment: %v", getErr))
		}

		result.Spec.Replicas = int32Ptr(1)                           // reduce replica count
		result.Spec.Template.Spec.Containers[0].Image = "nginx:1.13" // change nginx version
		_, updateErr := deploymentsClient.Update(context.TODO(), result, metav1.UpdateOptions{})
		return updateErr
	})
	if retryErr != nil {
		panic(fmt.Errorf("Update failed: %v", retryErr))
	}
	fmt.Println("Updated deployment...")

	// List Deployments
    ...
    fmt.Printf("Listing deployments in namespace %q:\n", apiv1.NamespaceDefault)
	list, err := deploymentsClient.List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		panic(err)
	}
	for _, d := range list.Items {
		fmt.Printf(" * %s (%d replicas)\n", d.Name, *d.Spec.Replicas)
	}

	// Delete Deployment
    ...
	fmt.Println("Deleting deployment...")
	deletePolicy := metav1.DeletePropagationForeground
	if err := deploymentsClient.Delete(context.TODO(), "demo-deployment", metav1.DeleteOptions{
		PropagationPolicy: &deletePolicy,
	}); err != nil {
		panic(err)
	}
	fmt.Println("Deleted deployment.")
}

The key question is how to design and implement the resource client and the clientset for the CRD.

Designing the clientset

Reading the source, we find that it simply uses the Config to construct a REST HTTP client.

NewForConfig

func NewForConfig(c *rest.Config) (*Clientset, error) {
	configShallowCopy := *c

	// share the transport between all clients
	httpClient, err := rest.HTTPClientFor(&configShallowCopy)
	if err != nil {
		return nil, err
	}

	return NewForConfigAndClient(&configShallowCopy, httpClient)
}

Clientset

type Interface interface {
	Discovery() discovery.DiscoveryInterface
    ...
	AppsV1() appsv1.AppsV1Interface // entry point for Deployment resources
    ...
}

type Clientset struct {
    *discovery.DiscoveryClient
    ...
    appsV1                       *appsv1.AppsV1Client
    ...
}

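At this point we can see that the Clientset is an abstraction over the concrete resources under AppsV1, such as Deployment, and exposes accessors for them. That is exactly the kind of abstraction layer we ultimately need to provide.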


In fact, AppsV1 is itself another level of abstraction. Let's keep reading:

type AppsV1Interface interface {
	RESTClient() rest.Interface
    ...
	DeploymentsGetter
    ...
}

// AppsV1Client is used to interact with features provided by the apps group.
type AppsV1Client struct {
	restClient rest.Interface
}

...
func (c *AppsV1Client) Deployments(namespace string) DeploymentInterface {
	return newDeployments(c, namespace)
}
...
func NewForConfig(c *rest.Config) (*AppsV1Client, error) {
	config := *c
	// Pay close attention to setConfigDefaults(&config)
	if err := setConfigDefaults(&config); err != nil {
		return nil, err
	}
	httpClient, err := rest.HTTPClientFor(&config)
	if err != nil {
		return nil, err
	}
	return NewForConfigAndClient(&config, httpClient)
}

func NewForConfigAndClient(c *rest.Config, h *http.Client) (*AppsV1Client, error) {
    ...
	return &AppsV1Client{client}, nil
}

...
func setConfigDefaults(config *rest.Config) error {
	// Pay close attention to gv
	gv := v1.SchemeGroupVersion
	config.GroupVersion = &gv
	config.APIPath = "/apis"
	config.NegotiatedSerializer = scheme.Codecs.WithoutConversion()

	if config.UserAgent == "" {
		config.UserAgent = rest.DefaultKubernetesUserAgent()
	}

	return nil
}

func (c *AppsV1Client) RESTClient() rest.Interface {
	if c == nil {
		return nil
	}
	return c.restClient
}

By now the picture should be clear: what happens here is the construction of a rest client for a specific Resource.
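The layering seen so far (clientset, then group/version client, then namespaced resource client) can be sketched with plain structs. This is only an illustration of the shape, assuming a single shared HTTP client; `restClient`, `NewForHTTPClient` and the other type names are hypothetical and are not client-go APIs.

```go
package main

import (
	"fmt"
	"net/http"
)

// restClient is a simplified stand-in for rest.Interface: it only keeps
// the shared HTTP client and the API prefix of one group/version.
type restClient struct {
	httpClient *http.Client
	basePath   string
}

// emqxBrokers is the namespaced resource client (the Deployments analogue).
type emqxBrokers struct {
	rest *restClient
	ns   string
}

// EmqxBrokersV1Beta1Client plays the role AppsV1Client plays for the apps group.
type EmqxBrokersV1Beta1Client struct {
	rest *restClient
}

// EmqxBrokers returns a resource client scoped to one namespace.
func (c *EmqxBrokersV1Beta1Client) EmqxBrokers(namespace string) *emqxBrokers {
	return &emqxBrokers{rest: c.rest, ns: namespace}
}

// Clientset groups the per-group/version clients behind one entry point.
type Clientset struct {
	emqxBrokersV1Beta1 *EmqxBrokersV1Beta1Client
}

// EmqxBrokersV1Beta1 exposes the group/version client, like AppsV1() does.
func (c *Clientset) EmqxBrokersV1Beta1() *EmqxBrokersV1Beta1Client {
	return c.emqxBrokersV1Beta1
}

// NewForHTTPClient (hypothetical) wires the layers around one HTTP client.
func NewForHTTPClient(h *http.Client) *Clientset {
	rc := &restClient{httpClient: h, basePath: "/apis/apps.emqx.io/v1beta1"}
	return &Clientset{emqxBrokersV1Beta1: &EmqxBrokersV1Beta1Client{rest: rc}}
}

func main() {
	cs := NewForHTTPClient(http.DefaultClient)
	brokers := cs.EmqxBrokersV1Beta1().EmqxBrokers("default")
	fmt.Println(brokers.rest.basePath, brokers.ns)
}
```

Every resource client handed out by the same clientset points at the same `restClient`, which is the sketch's version of sharing one transport between all clients.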


We now have the rough shape of the Resource client, but how does the client know which URL to request?


This tests how well we understand the concepts: a key variable like SchemeGroupVersion should catch our eye. Let's see what SchemeGroupVersion actually is.

// GroupName is the group name use in this package
const GroupName = "apps"

// SchemeGroupVersion is group version used to register these objects
var SchemeGroupVersion = schema.GroupVersion{Group: GroupName, Version: "v1"}

// Resource takes an unqualified resource and returns a Group qualified GroupResource
func Resource(resource string) schema.GroupResource {
	return SchemeGroupVersion.WithResource(resource).GroupResource()
}

var (
	// TODO: move SchemeBuilder with zz_generated.deepcopy.go to k8s.io/api.
	// localSchemeBuilder and AddToScheme will stay in k8s.io/kubernetes.
	SchemeBuilder      = runtime.NewSchemeBuilder(addKnownTypes)
	localSchemeBuilder = &SchemeBuilder
	AddToScheme        = localSchemeBuilder.AddToScheme
)

// Adds the list of known types to the given scheme.
func addKnownTypes(scheme *runtime.Scheme) error {
	scheme.AddKnownTypes(SchemeGroupVersion,
		&Deployment{},
		&DeploymentList{},
		&StatefulSet{},
		&StatefulSetList{},
		&DaemonSet{},
		&DaemonSetList{},
		&ReplicaSet{},
		&ReplicaSetList{},
		&ControllerRevision{},
		&ControllerRevisionList{},
	)
	metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
	return nil
}

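At this point everything falls into place: this code registers the types with the scheme, so the client knows these Resources exist and which group and version they belong to, and requests against those Resources then behave as we expect.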
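To get a feel for what registering known types buys us, here is a toy registry written against the standard library only. `miniScheme` is a hypothetical, drastically simplified analogue of `runtime.Scheme`, and the empty `EmqxBroker` / `EmqxBrokerList` structs are placeholders for the real CRD types.

```go
package main

import (
	"fmt"
	"reflect"
)

// GroupVersionKind is a local stand-in for schema.GroupVersionKind.
type GroupVersionKind struct {
	Group, Version, Kind string
}

// miniScheme is a toy analogue of runtime.Scheme: it maps a GVK to a Go type.
type miniScheme struct {
	types map[GroupVersionKind]reflect.Type
}

func newMiniScheme() *miniScheme {
	return &miniScheme{types: make(map[GroupVersionKind]reflect.Type)}
}

// AddKnownTypes registers each object under group/version, using the
// struct name as the Kind.
func (s *miniScheme) AddKnownTypes(group, version string, objs ...interface{}) {
	for _, obj := range objs {
		t := reflect.TypeOf(obj)
		if t.Kind() == reflect.Ptr {
			t = t.Elem()
		}
		s.types[GroupVersionKind{Group: group, Version: version, Kind: t.Name()}] = t
	}
}

// New returns a pointer to a fresh zero value of a registered kind,
// which is what a decoder needs before it can fill in a typed object.
func (s *miniScheme) New(gvk GroupVersionKind) (interface{}, error) {
	t, ok := s.types[gvk]
	if !ok {
		return nil, fmt.Errorf("kind %q is not registered in %s/%s", gvk.Kind, gvk.Group, gvk.Version)
	}
	return reflect.New(t).Interface(), nil
}

// Placeholder types standing in for the real CRD structs.
type EmqxBroker struct{}
type EmqxBrokerList struct{}

func main() {
	s := newMiniScheme()
	s.AddKnownTypes("apps.emqx.io", "v1beta1", &EmqxBroker{}, &EmqxBrokerList{})

	obj, err := s.New(GroupVersionKind{Group: "apps.emqx.io", Version: "v1beta1", Kind: "EmqxBroker"})
	if err != nil {
		panic(err)
	}
	fmt.Printf("%T\n", obj)
}
```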

Key CRD Implementation

So a custom CRD client needs its own version of the setConfigDefaults analyzed above. Here is the one for emqx-operator:

var (
	// Despite the GVR suffix, this is a GroupVersion (group and version only, no resource).
	emqxbrokerGVR = schema.GroupVersion{Group: "apps.emqx.io", Version: "v1beta1"}
)

func setConfigDefaults(config *rest.Config) error {
	gv := emqxbrokerGVR
	config.GroupVersion = &gv
	config.APIPath = "/apis"
	config.NegotiatedSerializer = scheme.Codecs.WithoutConversion()

	if config.UserAgent == "" {
		config.UserAgent = rest.DefaultKubernetesUserAgent()
	}

	return nil
}

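That completes the construction of the Client. The easter egg hidden in here is that this configuration also determines the API URL of each request.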
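The APIPath and GroupVersion set above are what end up at the front of the request URL. The sketch below assembles such a path by hand, following the Kubernetes convention /apis/GROUP/VERSION/namespaces/NAMESPACE/RESOURCE/NAME. `resourcePath` is a hypothetical helper written for illustration; client-go builds the URL internally through its request builder.

```go
package main

import (
	"fmt"
	"path"
)

// GroupVersion is a local stand-in for schema.GroupVersion.
type GroupVersion struct {
	Group, Version string
}

// resourcePath (hypothetical helper) joins the pieces of a resource URL path.
// An empty namespace yields the all-namespaces path and an empty name
// yields the collection path, because path.Join drops empty elements.
func resourcePath(apiPath string, gv GroupVersion, namespace, resource, name string) string {
	segments := []string{"/", apiPath, gv.Group, gv.Version}
	if namespace != "" {
		segments = append(segments, "namespaces", namespace)
	}
	segments = append(segments, resource, name)
	return path.Join(segments...)
}

func main() {
	gv := GroupVersion{Group: "apps.emqx.io", Version: "v1beta1"}
	fmt.Println(resourcePath("/apis", gv, "default", "emqxbrokers", "emqx"))
	fmt.Println(resourcePath("/apis", gv, "default", "emqxbrokers", ""))
}
```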

Implementing CRD Resource Operations

So far we have mainly covered how to construct the client. How, then, are the concrete operations on the CRD implemented?


Let's turn our attention back to AppsV1Interface:

type AppsV1Interface interface {
	RESTClient() rest.Interface
    ...
	DeploymentsGetter
    ...
}

Let's see what DeploymentsGetter really is.

...
type DeploymentsGetter interface {
	Deployments(namespace string) DeploymentInterface
}

// DeploymentInterface has methods to work with Deployment resources.
type DeploymentInterface interface {
	Create(ctx context.Context, deployment *v1.Deployment, opts metav1.CreateOptions) (*v1.Deployment, error)
	Update(ctx context.Context, deployment *v1.Deployment, opts metav1.UpdateOptions) (*v1.Deployment, error)
	UpdateStatus(ctx context.Context, deployment *v1.Deployment, opts metav1.UpdateOptions) (*v1.Deployment, error)
	Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error
	DeleteCollection(ctx context.Context, opts metav1.DeleteOptions, listOpts metav1.ListOptions) error
	Get(ctx context.Context, name string, opts metav1.GetOptions) (*v1.Deployment, error)
	List(ctx context.Context, opts metav1.ListOptions) (*v1.DeploymentList, error)
	Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error)
	Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (result *v1.Deployment, err error)
	Apply(ctx context.Context, deployment *appsv1.DeploymentApplyConfiguration, opts metav1.ApplyOptions) (result *v1.Deployment, err error)
	ApplyStatus(ctx context.Context, deployment *appsv1.DeploymentApplyConfiguration, opts metav1.ApplyOptions) (result *v1.Deployment, err error)
	GetScale(ctx context.Context, deploymentName string, options metav1.GetOptions) (*autoscalingv1.Scale, error)
	UpdateScale(ctx context.Context, deploymentName string, scale *autoscalingv1.Scale, opts metav1.UpdateOptions) (*autoscalingv1.Scale, error)
	ApplyScale(ctx context.Context, deploymentName string, scale *applyconfigurationsautoscalingv1.ScaleApplyConfiguration, opts metav1.ApplyOptions) (*autoscalingv1.Scale, error)

	DeploymentExpansion
}
...

At this point everything is clear: the operations on Deployment are abstracted behind an Interface. I won't walk through the concrete implementation here.
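For readers who want a feel for what sits behind such an interface, here is a rough sketch of a single typed Get built on net/http. It is not how client-go implements it (client-go goes through its REST request builder and serializers). The `EmqxBroker` struct is a trimmed, hypothetical stand-in, and `fakeAPIServer`, `newFakeClient` and `demoGet` are made-up helpers that answer in-process so the sketch runs without a cluster.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// EmqxBroker is a hypothetical, trimmed stand-in for the CRD's Go type.
type EmqxBroker struct {
	Metadata struct {
		Name string `json:"name"`
	} `json:"metadata"`
	Spec struct {
		Image string `json:"image"`
	} `json:"spec"`
}

// EmqxBrokerInterface mirrors a small slice of DeploymentInterface.
type EmqxBrokerInterface interface {
	Get(ctx context.Context, name string) (*EmqxBroker, error)
}

type emqxBrokers struct {
	httpClient *http.Client
	host       string
	ns         string
}

// Get fetches one object and decodes it into the typed struct.
func (c *emqxBrokers) Get(ctx context.Context, name string) (*EmqxBroker, error) {
	target := fmt.Sprintf("%s/apis/apps.emqx.io/v1beta1/namespaces/%s/emqxbrokers/%s", c.host, c.ns, name)
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, target, nil)
	if err != nil {
		return nil, err
	}
	resp, err := c.httpClient.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("unexpected status %s", resp.Status)
	}
	var eb EmqxBroker
	if err := json.NewDecoder(resp.Body).Decode(&eb); err != nil {
		return nil, err
	}
	return &eb, nil
}

// fakeAPIServer is a made-up RoundTripper that answers in-process.
type fakeAPIServer struct{}

func (fakeAPIServer) RoundTrip(r *http.Request) (*http.Response, error) {
	rec := httptest.NewRecorder()
	if r.URL.Path == "/apis/apps.emqx.io/v1beta1/namespaces/default/emqxbrokers/emqx" {
		rec.Header().Set("Content-Type", "application/json")
		fmt.Fprint(rec, `{"metadata":{"name":"emqx"},"spec":{"image":"emqx/emqx:4.3.10"}}`)
	} else {
		rec.WriteHeader(http.StatusNotFound)
	}
	return rec.Result(), nil
}

func newFakeClient(ns string) EmqxBrokerInterface {
	return &emqxBrokers{
		httpClient: &http.Client{Transport: fakeAPIServer{}},
		host:       "http://fake-apiserver",
		ns:         ns,
	}
}

// demoGet runs a Get against the fake server.
func demoGet(ns, name string) (*EmqxBroker, error) {
	return newFakeClient(ns).Get(context.Background(), name)
}

func main() {
	eb, err := demoGet("default", "emqx")
	if err != nil {
		panic(err)
	}
	fmt.Println(eb.Metadata.Name, eb.Spec.Image)
}
```

Callers depend only on `EmqxBrokerInterface`, so the same code can run against a fake in tests and against a real cluster in production.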

Verification

Environment Setup

The runtime environment is a local k8s cluster started with minikube, with the CRDs registered in the cluster:

$kubectl get crd | grep emqx 
emqxbrokers.apps.emqx.io                    2021-12-09T03:59:28Z
emqxenterprises.apps.emqx.io                2021-12-09T03:59:28Z

Verifying the CRD Client

Now let's verify how the Client actually behaves by running Create, Get, List and Delete against an instance of the custom CRD.

// Demo

func DemoForEmqxBroker(config *rest.Config, ns string) {
	// Create emqxbroker restclient
	clientset, err := pkg.NewForConfig(config)
	if err != nil {
		panic(err)
	}

	emqxbrokerClient := clientset.EmqxBrokersV1Beta1().EmqxBrokers(ns)

	// Create emqxbroker instance
	Prompt()
	fmt.Println("[> create emqxbroker")
	emqxbroker, err := emqxbrokerClient.Create(context.TODO(), resource.GenerateEmqxbroker(ns), metav1.CreateOptions{})
	if err != nil {
		panic(err)
	}
	fmt.Printf("create emqxbroker: %+v\n", emqxbroker)

	// Get emqxbroker instance
	Prompt()
	fmt.Println("[> get emqxbroker")
	eb, err := emqxbrokerClient.Get(context.TODO(), "emqx", metav1.GetOptions{})
	if err != nil {
		panic(err)
	}
	fmt.Printf("emqxbroker found: %+v\n", eb)

	// Get emqxbroker list
	Prompt()
	fmt.Println("[> list emqxbroker")
	eblist, err := emqxbrokerClient.List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		panic(err)
	}
	fmt.Printf("emqxbroker list: %+v\n", eblist)

	// Delete emqxbroker instance
	Prompt()
	fmt.Println("[> delete emqxbroker")
	err = emqxbrokerClient.Delete(context.TODO(), "emqx", metav1.DeleteOptions{})
	if err != nil {
		panic(err)
	}
	fmt.Println("Delete emqxbroker successfully")
}
  • Create

[> create emqxbroker
create emqxbroker: EmqxBroker instance [emqx],Image [emqx/emqx:4.3.10]

Check the instances in the K8s cluster:

$kubectl get emqx emqx         
NAME   AGE
emqx   2m26s
$kubectl get pods      
NAME     READY   STATUS    RESTARTS   AGE
emqx-0   1/1     Running   0          2m30s
emqx-1   1/1     Running   0          2m30s
emqx-2   1/1     Running   0          2m30s
  • Get

[> get emqxbroker
emqxbroker found: EmqxBroker instance [emqx],Image [emqx/emqx:4.3.10]
  • List

[> list emqxbroker
emqxbroker list: &{TypeMeta:{Kind: APIVersion:} ListMeta:{SelfLink: ResourceVersion:157139 Continue: RemainingItemCount:<nil>} Items:[{TypeMeta:{Kind:EmqxBroker APIVersion:apps.emqx.io/v1beta1} ObjectMeta:{Name:emqx GenerateName: Namespace:default SelfLink: UID:74896493-0134-460a-a04d-d3bdaed21902 ResourceVersion:157121 Generation:1 CreationTimestamp:2021-12-26 20:23:15 +0800 CST DeletionTimestamp:<nil> DeletionGracePeriodSeconds:<nil> Labels:map[] Annotations:map[] OwnerReferences:[] Finalizers:[] ClusterName: ManagedFields:[{Manager:Go-http-client Operation:Update APIVersion:apps.emqx.io/v1beta1 Time:2021-12-26 20:23:15 +0800 CST FieldsType:FieldsV1 FieldsV1:{"f:spec":{".":{},"f:image":{},"f:labels":{".":{},"f:cluster":{}},"f:listener":{".":{},"f:nodePorts":{},"f:ports":{}},"f:replicas":{},"f:resources":{},"f:serviceAccountName":{}}} Subresource:} {Manager:__debug_bin4215542756 Operation:Update APIVersion:apps.emqx.io/v1beta1 Time:2021-12-26 20:23:15 +0800 CST FieldsType:FieldsV1 FieldsV1:{"f:status":{".":{},"f:conditions":{}}} Subresource:}]} Spec:{Replicas:0xc000492ec8 Image:emqx/emqx:4.3.10 ServiceAccountName:emqx Resources:{Limits:map[] Requests:map[]} Storage:<nil> Labels:map[cluster:emqx] Listener:{Type: LoadBalancerIP: LoadBalancerSourceRanges:[] ExternalIPs:[] Ports:{MQTT:0 MQTTS:0 WS:0 WSS:0 Dashboard:0 API:0} NodePorts:{MQTT:0 MQTTS:0 WS:0 WSS:0 Dashboard:0 API:0}} Affinity:nil ToleRations:[] NodeSelector:map[] ImagePullPolicy: ExtraVolumes:[] ExtraVolumeMounts:[] Env:[] ACL:[] Plugins:[] Modules:[]} Status:{Conditions:[{Type:Healthy Status:True LastUpdateTime:2021-12-26T20:27:26+08:00 LastUpdateAt:0001-01-01 00:00:00 +0000 UTC LastTransitionTime:2021-12-26T20:23:16+08:00 Reason:Cluster available Message:Cluster ok} {Type:Creating Status:True LastUpdateTime:2021-12-26T20:23:15+08:00 LastUpdateAt:0001-01-01 00:00:00 +0000 UTC LastTransitionTime:2021-12-26T20:23:15+08:00 Reason:Creating Message:Bootstrap emqx cluster}]}}]}
  • Delete

[> delete emqxbroker
Delete emqxbroker successfully

About the Author

Kirago