需求背景分析
基于 k8s
的二次开发过程中,有些场景我们会定制化的去开发自己的 CRD
+ Controller
,即 Operator
来实现基于k8s
的云原生化的部署与自动化运维功能,暂且称之为底层的基座能力。
如果我们想基于底层能力,并想要将其封装为控制台来供上层业务调用的话,我们需要有机会能够去控制与使用这样的接口能力,基于对Client-go
的使用,也许有胖友会想到dynamic client
的使用,但是作为设计与开发人员,我们应该清醒的认识到对于序列化与反序列化过程,扔一堆map
是多么的头疼(除非恰巧业务开发人员与Operator
设计设计者是同一人>..<)。
我们能不能有机会像使用k8s
中的原生资源如Deployments
、Service
等一样方便的去使用呢?
必备概念与技能
在进行具体分析前,建议胖友先去了解下Kubernetes API
概念,同时具备查阅Kubernetes API
的能力。
为了方便举例,我在本地k8s
集群注册了emqx-operator
中的 CRD
自定义资源,将其视为与 k8s
原生资源同等地位。
emqxbrokers.apps.emqx.io 2021-12-09T03:59:28Z
emqxenterprises.apps.emqx.io 2021-12-09T03:59:28Z
另外可以通过kubectl api-version
查看其API
相关信息:
$kubectl api-versions | grep emqx
apps.emqx.io/v1beta1
通过 kubectl api-resoures
查看相关 Group
,Version
,Kind
信息。
$kubectl api-resources
NAME SHORTNAMES APIVERSION NAMESPACED KIND
emqxbrokers emqx apps.emqx.io/v1beta1 true EmqxBroker
emqxenterprises emqx-ee apps.emqx.io/v1beta1 true EmqxEnterprise
相信胖友应该都掌握如上知识概念及具备如上的基础技能了。^.^
设计实现
关于 client-go
的官方文档描述还是蛮少的,作为设计与开发者,胖友们得具备源码分析能力。下面让我们切入client-go
官方 Repo
中。其中examples
的create-update-delete-deployment
的示例展示了如何使用client-go
库来进行rest
请求的方法。
func main() {
var kubeconfig *string
if home := homedir.HomeDir(); home != "" {
kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
} else {
kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
}
flag.Parse()
// Config 的初始化
config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
...
// clientset 的构造
clientset, err := kubernetes.NewForConfig(config)
// 使用 resource client 进行 resource 资源的操作
deploymentsClient := clientset.AppsV1().Deployments(apiv1.NamespaceDefault)
// Create Deployment
fmt.Println("Creating deployment...")
result, err := deploymentsClient.Create(context.TODO(), deployment, metav1.CreateOptions{})
if err != nil {
panic(err)
}
fmt.Printf("Created deployment %q.\n", result.GetObjectMeta().GetName())
// Update Deployment
prompt()
fmt.Println("Updating deployment...")
// You have two options to Update() this Deployment:
//
// 1. Modify the "deployment" variable and call: Update(deployment).
// This works like the "kubectl replace" command and it overwrites/loses changes
// made by other clients between you Create() and Update() the object.
// 2. Modify the "result" returned by Get() and retry Update(result) until
// you no longer get a conflict error. This way, you can preserve changes made
// by other clients between Create() and Update(). This is implemented below
// using the retry utility package included with client-go. (RECOMMENDED)
//
// More Info:
// https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#concurrency-control-and-consistency
retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
// Retrieve the latest version of Deployment before attempting update
// RetryOnConflict uses exponential backoff to avoid exhausting the apiserver
result, getErr := deploymentsClient.Get(context.TODO(), "demo-deployment", metav1.GetOptions{})
if getErr != nil {
panic(fmt.Errorf("Failed to get latest version of Deployment: %v", getErr))
}
result.Spec.Replicas = int32Ptr(1) // reduce replica count
result.Spec.Template.Spec.Containers[0].Image = "nginx:1.13" // change nginx version
_, updateErr := deploymentsClient.Update(context.TODO(), result, metav1.UpdateOptions{})
return updateErr
})
if retryErr != nil {
panic(fmt.Errorf("Update failed: %v", retryErr))
}
fmt.Println("Updated deployment...")
// List Deployments
...
fmt.Printf("Listing deployments in namespace %q:\n", apiv1.NamespaceDefault)
list, err := deploymentsClient.List(context.TODO(), metav1.ListOptions{})
if err != nil {
panic(err)
}
for _, d := range list.Items {
fmt.Printf(" * %s (%d replicas)\n", d.Name, *d.Spec.Replicas)
}
// Delete Deployment
...
fmt.Println("Deleting deployment...")
deletePolicy := metav1.DeletePropagationForeground
if err := deploymentsClient.Delete(context.TODO(), "demo-deployment", metav1.DeleteOptions{
PropagationPolicy: &deletePolicy,
}); err != nil {
panic(err)
}
fmt.Println("Deleted deployment.")
}
设计的重点就在于如何设计与实现CRD
对应的 resource
的clientset
。
clientset
的设计
通过源码我们发现实际上就是通过 Config
去构造 rest http
的客户端。
NewForConfig
func NewForConfig(c *rest.Config) (*Clientset, error) {
configShallowCopy := *c
// share the transport between all clients
httpClient, err := rest.HTTPClientFor(&configShallowCopy)
if err != nil {
return nil, err
}
return NewForConfigAndClient(&configShallowCopy, httpClient)
}
Clientset
type Interface interface {
Discovery() discovery.DiscoveryInterface
...
AppsV1() appsv1.AppsV1Interface // Deployment 资源调用接口
...
}
type Clientset struct {
*discovery.DiscoveryClient
...
appsV1 *appsv1.AppsV1Client
...
}
到了这里我们发现 Clientset
就是对 AppSV1
下具体资源比如Deployment
的抽象,对外暴露引用,我们最终需要的也是提供这样的一个抽象层面。
实际上AppsV1
也是一个维度的抽象,让我们继续往下看:
type AppsV1Interface interface {
RESTClient() rest.Interface
...
DeploymentsGetter
...
}
// AppsV1Client is used to interact with features provided by the apps group.
type AppsV1Client struct {
restClient rest.Interface
}
...
func (c *AppsV1Client) Deployments(namespace string) DeploymentInterface {
return newDeployments(c, namespace)
}
...
func NewForConfig(c *rest.Config) (*AppsV1Client, error) {
config := *c
// 需要重点留意 setConfigDefaults(&config)
if err := setConfigDefaults(&config); err != nil {
return nil, err
}
httpClient, err := rest.HTTPClientFor(&config)
if err != nil {
return nil, err
}
return NewForConfigAndClient(&config, httpClient)
}
func NewForConfigAndClient(c *rest.Config, h *http.Client) (*AppsV1Client, error) {
...
return &AppsV1Client{client}, nil
}
...
func setConfigDefaults(config *rest.Config) error {
// 需要重点留意 gv
gv := v1.SchemeGroupVersion
config.GroupVersion = &gv
config.APIPath = "/apis"
config.NegotiatedSerializer = scheme.Codecs.WithoutConversion()
if config.UserAgent == "" {
config.UserAgent = rest.DefaultKubernetesUserAgent()
}
return nil
}
func (c *AppsV1Client) RESTClient() rest.Interface {
if c == nil {
return nil
}
return c.restClient
}
相信看到这里我们已经有了清晰的认识了,实际上这里就是去构造对应具体Resource
的rest client
。
Resource
的client
大体构造我们是有了,那么客户端如何知道去请求啥URL
的了?
这里考察我们对概念的理解与掌握,我们应该对SchemeGroupVersion
这样的关键变量有敏锐的捕捉能力,我们去看看这个SchemeGroupVersion
究竟是啥?
// GroupName is the group name use in this package
const GroupName = "apps"
// SchemeGroupVersion is group version used to register these objects
var SchemeGroupVersion = schema.GroupVersion{Group: GroupName, Version: "v1"}
// Resource takes an unqualified resource and returns a Group qualified GroupResource
func Resource(resource string) schema.GroupResource {
return SchemeGroupVersion.WithResource(resource).GroupResource()
}
var (
// TODO: move SchemeBuilder with zz_generated.deepcopy.go to k8s.io/api.
// localSchemeBuilder and AddToScheme will stay in k8s.io/kubernetes.
SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)
localSchemeBuilder = &SchemeBuilder
AddToScheme = localSchemeBuilder.AddToScheme
)
// Adds the list of known types to the given scheme.
func addKnownTypes(scheme *runtime.Scheme) error {
scheme.AddKnownTypes(SchemeGroupVersion,
&Deployment{},
&DeploymentList{},
&StatefulSet{},
&StatefulSetList{},
&DaemonSet{},
&DaemonSetList{},
&ReplicaSet{},
&ReplicaSetList{},
&ControllerRevision{},
&ControllerRevisionList{},
)
metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
return nil
}
相信看到这里一切都柳暗花明了,其实就是告知k8s
我注册了这些schema
,k8s
知道了这些Resource
的存在,那么当我去请求的Resouce
操作的时候能够按照我们的预期达到功能实现。
CRD
关键实现
那么对于自定义的 CRD
实现肯定少不了如上分心的setConfigDefaults
实现,如下为针对与emqx-operator
实现:
var (
emqxbrokerGVR = schema.GroupVersion{Group: "apps.emqx.io", Version: "v1beta1"}
)
func setConfigDefaults(config *rest.Config) error {
gv := emqxbrokerGVR
config.GroupVersion = &gv
config.APIPath = "/apis"
config.NegotiatedSerializer = scheme.Codecs.WithoutConversion()
if config.UserAgent == "" {
config.UserAgent = rest.DefaultKubernetesUserAgent()
}
return nil
}
到了这里对于Client
的构造就算结束了,这其中的彩蛋还包括请求API
的URL
。
CRD
资源操作具体实现
如上我们主要讲述了如何去构造client
,那么对于CRD
的具体操作我们是如何实现的呢?
这里让我们将注意力再返回到AppsV1Interface
:
type AppsV1Interface interface {
RESTClient() rest.Interface
...
DeploymentsGetter
...
}
让我们看看DeploymentGetter
究竟是啥。
...
type DeploymentsGetter interface {
Deployments(namespace string) DeploymentInterface
}
// DeploymentInterface has methods to work with Deployment resources.
type DeploymentInterface interface {
Create(ctx context.Context, deployment *v1.Deployment, opts metav1.CreateOptions) (*v1.Deployment, error)
Update(ctx context.Context, deployment *v1.Deployment, opts metav1.UpdateOptions) (*v1.Deployment, error)
UpdateStatus(ctx context.Context, deployment *v1.Deployment, opts metav1.UpdateOptions) (*v1.Deployment, error)
Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error
DeleteCollection(ctx context.Context, opts metav1.DeleteOptions, listOpts metav1.ListOptions) error
Get(ctx context.Context, name string, opts metav1.GetOptions) (*v1.Deployment, error)
List(ctx context.Context, opts metav1.ListOptions) (*v1.DeploymentList, error)
Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error)
Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (result *v1.Deployment, err error)
Apply(ctx context.Context, deployment *appsv1.DeploymentApplyConfiguration, opts metav1.ApplyOptions) (result *v1.Deployment, err error)
ApplyStatus(ctx context.Context, deployment *appsv1.DeploymentApplyConfiguration, opts metav1.ApplyOptions) (result *v1.Deployment, err error)
GetScale(ctx context.Context, deploymentName string, options metav1.GetOptions) (*autoscalingv1.Scale, error)
UpdateScale(ctx context.Context, deploymentName string, scale *autoscalingv1.Scale, opts metav1.UpdateOptions) (*autoscalingv1.Scale, error)
ApplyScale(ctx context.Context, deploymentName string, scale *applyconfigurationsautoscalingv1.ScaleApplyConfiguration, opts metav1.ApplyOptions) (*autoscalingv1.Scale, error)
DeploymentExpansion
}
...
看到这里实际上一切都已经明了,通过Interface
抽象了对于Deployment
的操作,具体的实现就不展开分析了。
验证
环境准备
运行环境为通过minikube
启动的本地k8s
集群,另外在集群中注册CRD
:
$kubectl get crd | grep emqx
emqxbrokers.apps.emqx.io 2021-12-09T03:59:28Z
emqxenterprises.apps.emqx.io 2021-12-09T03:59:28Z
验证CRD Client
下面让我们验证下Client
实际运行情况,验证对自定义的CRD
实例的Create
,Get
,List
,Delete
的验证。
// Demo 演示
func DemoForEmqxBroker(config *rest.Config, ns string) {
// Create emqxbroker restclient
clientset, err := pkg.NewForConfig(config)
if err != nil {
panic(err)
}
emqxbrokerClient := clientset.EmqxBrokersV1Beta1().EmqxBrokers(ns)
// Create emqxbroker instance
Prompt()
fmt.Println("[> create emqxbroker")
emqxbroker, err := emqxbrokerClient.Create(context.TODO(), resource.GenerateEmqxbroker(ns), metav1.CreateOptions{})
if err != nil {
panic(err)
}
fmt.Printf("create emqxbroker: %+v\n", emqxbroker)
// Get emqxbroker instance
Prompt()
fmt.Println("[> get emqxbroker")
eb, err := emqxbrokerClient.Get(context.TODO(), "emqx", metav1.GetOptions{})
if err != nil {
panic(err)
}
fmt.Printf("emqxbroker found: %+v\n", eb)
// Get emqxbroker list
Prompt()
fmt.Println("[> list emqxbroker")
eblist, err := emqxbrokerClient.List(context.TODO(), metav1.ListOptions{})
if err != nil {
panic(err)
}
fmt.Printf("emqxbroker list: %+v\n", eblist)
// Delete emqxbroker instance
Prompt()
fmt.Println("[> delete emqxbroker")
err = emqxbrokerClient.Delete(context.TODO(), "emqx", metav1.DeleteOptions{})
if err != nil {
panic(err)
}
fmt.Printf("Delete emqxbroker successfully")
}
Create
[> create emqxbroker
create emqxbroker: EmqxBroker instance [emqx],Image [emqx/emqx:4.3.10]
查看下K8s
集群中实例的情况:
$kubectl get emqx emqx
NAME AGE
emqx 2m26s
$kubectl get pods
NAME READY STATUS RESTARTS AGE
emqx-0 1/1 Running 0 2m30s
emqx-1 1/1 Running 0 2m30s
emqx-2 1/1 Running 0 2m30s
Get
[> get emqxbroker
emqxbroker found: EmqxBroker instance [emqx],Image [emqx/emqx:4.3.10]
List
[> list emqxbroker
emqxbroker list: &{TypeMeta:{Kind: APIVersion:} ListMeta:{SelfLink: ResourceVersion:157139 Continue: RemainingItemCount:<nil>} Items:[{TypeMeta:{Kind:EmqxBroker APIVersion:apps.emqx.io/v1beta1} ObjectMeta:{Name:emqx GenerateName: Namespace:default SelfLink: UID:74896493-0134-460a-a04d-d3bdaed21902 ResourceVersion:157121 Generation:1 CreationTimestamp:2021-12-26 20:23:15 +0800 CST DeletionTimestamp:<nil> DeletionGracePeriodSeconds:<nil> Labels:map[] Annotations:map[] OwnerReferences:[] Finalizers:[] ClusterName: ManagedFields:[{Manager:Go-http-client Operation:Update APIVersion:apps.emqx.io/v1beta1 Time:2021-12-26 20:23:15 +0800 CST FieldsType:FieldsV1 FieldsV1:{"f:spec":{".":{},"f:image":{},"f:labels":{".":{},"f:cluster":{}},"f:listener":{".":{},"f:nodePorts":{},"f:ports":{}},"f:replicas":{},"f:resources":{},"f:serviceAccountName":{}}} Subresource:} {Manager:__debug_bin4215542756 Operation:Update APIVersion:apps.emqx.io/v1beta1 Time:2021-12-26 20:23:15 +0800 CST FieldsType:FieldsV1 FieldsV1:{"f:status":{".":{},"f:conditions":{}}} Subresource:}]} Spec:{Replicas:0xc000492ec8 Image:emqx/emqx:4.3.10 ServiceAccountName:emqx Resources:{Limits:map[] Requests:map[]} Storage:<nil> Labels:map[cluster:emqx] Listener:{Type: LoadBalancerIP: LoadBalancerSourceRanges:[] ExternalIPs:[] Ports:{MQTT:0 MQTTS:0 WS:0 WSS:0 Dashboard:0 API:0} NodePorts:{MQTT:0 MQTTS:0 WS:0 WSS:0 Dashboard:0 API:0}} Affinity:nil ToleRations:[] NodeSelector:map[] ImagePullPolicy: ExtraVolumes:[] ExtraVolumeMounts:[] Env:[] ACL:[] Plugins:[] Modules:[]} Status:{Conditions:[{Type:Healthy Status:True LastUpdateTime:2021-12-26T20:27:26+08:00 LastUpdateAt:0001-01-01 00:00:00 +0000 UTC LastTransitionTime:2021-12-26T20:23:16+08:00 Reason:Cluster available Message:Cluster ok} {Type:Creating Status:True LastUpdateTime:2021-12-26T20:23:15+08:00 LastUpdateAt:0001-01-01 00:00:00 +0000 UTC LastTransitionTime:2021-12-26T20:23:15+08:00 Reason:Creating Message:Bootstrap emqx cluster}]}}]}
Delete
[> delete emqxbroker
Delete emqxbroker successfull