文章
问答
冒泡
kubernetes源码分析系列之Statefulset Controller

简单写下 Sts Controller 的处理流程与细节。

Sts Controller 源码分析

本文选用 StatefulSet Controller 来分析 Controller 的源码;其它控制器的源码分析也是同一个套路,可以作为参考。

StatefulSet 简介

本文默认你已经具备熟练使用 StatefulSet 的基础知识,所以常规介绍及使用 Demo 不再赘述,具体可参考《StatefulSet 基础》。

StatefulSet Controller 启动分析

kube-controller-manager 入口调用链分析

对于 Kubernetes 的源码组织结构不做过多介绍,希望你有一定的了解。

至于 k8s 是如何启动 kube-controller-manager 的,可以在 kube-controller-manager 的官方文档中查到如下内容:

--controllers strings     默认值:[*]
要启用的控制器列表。* 表示启用所有默认启用的控制器;foo 表示启用名为 foo 的控制器;-foo 表示禁用名为 foo 的控制器。
控制器的全集:attachdetach、bootstrapsigner、cloud-node-lifecycle、clusterrole-aggregation、cronjob、csrapproving、csrcleaner、csrsigning、daemonset、deployment、disruption、endpoint、endpointslice、endpointslicemirroring、ephemeral-volume、garbagecollector、horizontalpodautoscaling、job、namespace、nodeipam、nodelifecycle、persistentvolume-binder、persistentvolume-expander、podgc、pv-protection、pvc-protection、replicaset、replicationcontroller、resourcequota、root-ca-cert-publisher、route、service、serviceaccount、serviceaccount-token、statefulset、tokencleaner、ttl、ttl-after-finished
默认禁用的控制器有:bootstrapsigner 和 tokencleaner。

这里可以发现,默认值([*])已经包含了 statefulset 控制器的初始化,那么这在代码中是在哪里体现的呢?继续往下看。

进入 cmd/kube-controller-manager 的 main 函数,它实际上只做了两件事:

// main is the kube-controller-manager entry point: it builds the cobra command,
// runs it, and exits the process with the code returned by cli.Run.
func main() {
	command := app.NewControllerManagerCommand()    // build (initialize) the command
	code := cli.Run(command)    // actually execute it
	os.Exit(code)
}

顺着函数调用关系,我们进入 cmd/kube-controller-manager/app/controllermanager.go,查看 Run 的执行究竟做了什么。

...
// Excerpt of the cobra command's Run callback; surrounding code is elided with "...".
Run: func(cmd *cobra.Command, args []string) {
			verflag.PrintAndExitIfRequested()
			cliflag.PrintFlags(cmd.Flags())

			err := checkNonZeroInsecurePort(cmd.Flags())
			if err != nil {
				fmt.Fprintf(os.Stderr, "%v\n", err)
				os.Exit(1)
			}

			// Point of interest: KnownControllers() returns the names of every controller the
			// manager knows how to initialize; ControllersDisabledByDefault lists the ones that
			// stay off unless explicitly enabled.
			c, err := s.Config(KnownControllers(), ControllersDisabledByDefault.List())     
			if err != nil {
				fmt.Fprintf(os.Stderr, "%v\n", err)
				os.Exit(1)
			}

			if err := Run(c.Complete(), wait.NeverStop); err != nil {
				fmt.Fprintf(os.Stderr, "%v\n", err)
				os.Exit(1)
			}
		},
...

深入分析 KnownControllers() 函数:

// KnownControllers returns all known controllers's name
// These are the controller names the --controllers flag refers to: every key registered by
// NewControllerInitializers(IncludeCloudLoops), plus the special SA token controller.
func KnownControllers() []string {
	ret := sets.StringKeySet(NewControllerInitializers(IncludeCloudLoops))

	// add "special" controllers that aren't initialized normally.  These controllers cannot be initialized
	// using a normal function.  The only known special case is the SA token controller which *must* be started
	// first to ensure that the SA tokens for future controllers will exist.  Think very carefully before adding
	// to this list.
	ret.Insert(
		saTokenControllerName,
	)

	return ret.List()
}

从 NewControllerInitializers 可以看出,真正为 controller-manager 注册各个控制器初始化函数的地方就在这里。

// NewControllerInitializers is a public map of named controller groups (you can start more than one in an init func)
// paired to their InitFunc.  This allows for structured downstream composition and subdivision.
// NOTE: this listing is abridged in the article; controllers such as deployment or replicaset,
// which appear in the --controllers list, are registered in upstream code not shown here.
func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {
	controllers := map[string]InitFunc{}
	
	// This is where the "statefulset" entry is registered in the map.
	controllers["statefulset"] = startStatefulSetController  
	
	// Cloud-provider-dependent controllers are registered only when cloud loops are included.
	if loopMode == IncludeCloudLoops {
		controllers["service"] = startServiceController
		controllers["route"] = startRouteController
		controllers["cloud-node-lifecycle"] = startCloudNodeLifecycleController
		// TODO: volume controller into the IncludeCloudLoops only set.
	}
	controllers["persistentvolume-binder"] = startPersistentVolumeBinderController
	controllers["attachdetach"] = startAttachDetachController
	controllers["persistentvolume-expander"] = startVolumeExpandController
	controllers["clusterrole-aggregation"] = startClusterRoleAggregrationController
	controllers["pvc-protection"] = startPVCProtectionController
	controllers["pv-protection"] = startPVProtectionController
	controllers["ttl-after-finished"] = startTTLAfterFinishedController
	controllers["root-ca-cert-publisher"] = startRootCACertPublisher
	controllers["ephemeral-volume"] = startEphemeralVolumeController
	// storage-version-gc is only registered when both feature gates are enabled.
	if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerIdentity) &&
		utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StorageVersionAPI) {
		controllers["storage-version-gc"] = startStorageVersionGCController
	}

	return controllers
}

通过如上可以得知 StatefulSet 是如何被初始化到 kube-controller-manager 中的。

Statefulset Controller 启动过程

分析 cmd/kube-controller-manager/app/controllermanager.go 中的 Run 函数,其中:

...
// The same cobra Run callback as above, this time focusing on the call to Run.
Run: func(cmd *cobra.Command, args []string) {
			verflag.PrintAndExitIfRequested()
			cliflag.PrintFlags(cmd.Flags())

			err := checkNonZeroInsecurePort(cmd.Flags())
			if err != nil {
				fmt.Fprintf(os.Stderr, "%v\n", err)
				os.Exit(1)
			}

			c, err := s.Config(KnownControllers(), ControllersDisabledByDefault.List()) 
			if err != nil {
				fmt.Fprintf(os.Stderr, "%v\n", err)
				os.Exit(1)
			}

			// Complete the controller-manager config, then Run it; Run starts the controllers it manages.
			if err := Run(c.Complete(), wait.NeverStop); err != nil { 
				fmt.Fprintf(os.Stderr, "%v\n", err)
				os.Exit(1)
			}
		},
...

进入 Run 函数进行分析:

// Run runs the KubeControllerManagerOptions.  This should never exit.
// (Excerpt: "..." marks elided code.)
func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
	// To help debugging, immediately log version
	klog.Infof("Version: %+v", version.Get())
...
	clientBuilder, rootClientBuilder := createClientBuilders(c)

	saTokenControllerInitFunc := serviceAccountTokenControllerStarter{rootClientBuilder: rootClientBuilder}.startServiceAccountTokenController

	run := func(ctx context.Context, startSATokenController InitFunc, initializersFunc ControllerInitializersFunc) { // This closure is the controller initialization we care about.

		controllerContext, err := CreateControllerContext(c, rootClientBuilder, clientBuilder, ctx.Done())
		if err != nil {
			klog.Fatalf("error building controller context: %v", err)
		}
		controllerInitializers := initializersFunc(controllerContext.LoopMode)
        // This is where the managed controllers are actually started.
		if err := StartControllers(ctx, controllerContext, startSATokenController, controllerInitializers, unsecuredMux, healthzHandler); err != nil {
			klog.Fatalf("error starting controllers: %v", err)
		}

		// The informer factories are started only after StartControllers returns — presumably so
		// every controller has already requested its informers from the shared factories.
		controllerContext.InformerFactory.Start(stopCh)
		controllerContext.ObjectOrMetadataInformerFactory.Start(stopCh)
		close(controllerContext.InformersStarted)

		// Block forever; this closure is not expected to return.
		select {}
	}
...
	select {}
}

查看 StartControllers 函数:

// StartControllers starts a set of controllers with a specified ControllerContext
// For each enabled controller it calls the controller's InitFunc; the first InitFunc error aborts
// start-up and is returned. Controllers that report started == false are skipped.
func StartControllers(ctx context.Context, controllerCtx ControllerContext, startSATokenController InitFunc, controllers map[string]InitFunc,
	unsecuredMux *mux.PathRecorderMux, healthzHandler *controllerhealthz.MutableHealthzHandler) error {
    ...
    // Iterate over the registered controllers and start each one; what matters is what initFn does.
    // Go map iteration order is unspecified, so controllers start in no particular order.
	for controllerName, initFn := range controllers {
		if !controllerCtx.IsControllerEnabled(controllerName) {
			klog.Warningf("%q is disabled", controllerName)
			continue
		}

		// Stagger start-up: wait ControllerStartInterval (with jitter) before each controller.
		time.Sleep(wait.Jitter(controllerCtx.ComponentConfig.Generic.ControllerStartInterval.Duration, ControllerStartJitter))

		klog.V(1).Infof("Starting %q", controllerName)
		ctrl, started, err := initFn(ctx, controllerCtx)
		if err != nil {
			klog.Errorf("Error starting %q", controllerName)
			return err
		}
		if !started {
			klog.Warningf("Skipping %q", controllerName)
			continue
		}
    ...
		klog.Infof("Started %q", controllerName)
	}

	healthzHandler.AddHealthChecker(controllerChecks...)

	return nil
}
// InitFunc starts a single controller. It returns the controller (which may implement additional
// interfaces that the controller manager can make use of), whether it was actually started, and an error.
type InitFunc func(ctx context.Context, controllerCtx ControllerContext) (controller controller.Interface, enabled bool, err error)

看到这里我们似乎还是没看到 StatefulSet Controller 真正执行的地方,请再次回顾下我们之前 NewControllerInitializers 中的内容:

// Excerpt of NewControllerInitializers: the "statefulset" key maps to startStatefulSetController.
func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {
    ...
    controllers["statefulset"] = startStatefulSetController
    ...
}

感觉看到了一丝丝曙光,继续往下看 startStatefulSetController 的具体实现。

// startStatefulSetController is the InitFunc registered under "statefulset". It builds the
// StatefulSet controller from the shared informers and a dedicated client, then runs it with
// ConcurrentStatefulSetSyncs workers until ctx is done.
func startStatefulSetController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
	// In a `go` statement the function value and its arguments are evaluated in the calling
	// goroutine, so NewStatefulSetController(...) runs synchronously here; only Run executes in
	// the new goroutine.
	go statefulset.NewStatefulSetController(
		// The informers below are the resource types the StatefulSet controller works with.
		controllerContext.InformerFactory.Core().V1().Pods(),
		controllerContext.InformerFactory.Apps().V1().StatefulSets(),
		controllerContext.InformerFactory.Core().V1().PersistentVolumeClaims(),
		controllerContext.InformerFactory.Apps().V1().ControllerRevisions(),
		controllerContext.ClientBuilder.ClientOrDie("statefulset-controller"),
	).Run(int(controllerContext.ComponentConfig.StatefulSetController.ConcurrentStatefulSetSyncs), ctx.Done())
	// Report the controller as started (StartControllers skips InitFuncs that return false);
	// no controller.Interface value is returned.
	return nil, true, nil
}

看到这里我们终于知道了 StatefulSet Controller 真正意义上是被如何启动的了。

StatefulSet Controller 明细分析

通过如上的分析,下面就到了 StatefulSet Controller 具体的范畴了。

StatefulSetController

// StatefulSetController holds what is needed to sync StatefulSets: the API client, the sync
// logic (control), Pod patching, listers and their HasSynced checks, and the work queue of
// StatefulSets waiting to be synced.
type StatefulSetController struct {
	// client interface
	kubeClient clientset.Interface
	// control returns an interface capable of syncing a stateful set.
	// Abstracted out for testing.
	control StatefulSetControlInterface
	// podControl is used for patching pods.
	podControl controller.PodControlInterface
	// podLister is able to list/get pods from a shared informer's store
	podLister corelisters.PodLister
	// podListerSynced returns true if the pod shared informer has synced at least once
	podListerSynced cache.InformerSynced
	// setLister is able to list/get stateful sets from a shared informer's store
	setLister appslisters.StatefulSetLister
	// setListerSynced returns true if the stateful set shared informer has synced at least once
	setListerSynced cache.InformerSynced
	// pvcListerSynced returns true if the pvc shared informer has synced at least once
	pvcListerSynced cache.InformerSynced
	// revListerSynced returns true if the rev shared informer has synced at least once
	revListerSynced cache.InformerSynced
	// StatefulSets that need to be synced.
	queue workqueue.RateLimitingInterface
}

通过 StatefulSetController 结构体的定义,可以了解它的大致结构(下文将 StatefulSetController 的实例简称为 ssc)。

NewStatefulSetController

对于 ssc 的构造函数分析:

// NewStatefulSetController builds a StatefulSetController from the Pod, StatefulSet, PVC and
// ControllerRevision informers plus an API client. It sets up an event recorder, the default
// sync logic (NewDefaultStatefulSetControl), a rate-limited work queue named "statefulset",
// and informer event handlers that enqueue StatefulSets for syncing.
func NewStatefulSetController(
	// These are the resources ssc cares about: Pod / StatefulSet / PVC / ControllerRevision.
	podInformer coreinformers.PodInformer,
	setInformer appsinformers.StatefulSetInformer,
	pvcInformer coreinformers.PersistentVolumeClaimInformer,
	revInformer appsinformers.ControllerRevisionInformer,
	kubeClient clientset.Interface,
) *StatefulSetController {
	// Events are logged and also written to the API server via the client's Events sink.
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartStructuredLogging(0)
	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "statefulset-controller"})
	ssc := &StatefulSetController{
		kubeClient: kubeClient,
		control: NewDefaultStatefulSetControl(
			NewRealStatefulPodControl(
				kubeClient,
				setInformer.Lister(),
				podInformer.Lister(),
				pvcInformer.Lister(),
				recorder),
			NewRealStatefulSetStatusUpdater(kubeClient, setInformer.Lister()),
			history.NewHistory(kubeClient, revInformer.Lister()),
			recorder,
		),
		pvcListerSynced: pvcInformer.Informer().HasSynced,
		queue:           workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "statefulset"),
		podControl:      controller.RealPodControl{KubeClient: kubeClient, Recorder: recorder},

		revListerSynced: revInformer.Informer().HasSynced,
	}

	// Handlers for create / update / delete of Pods managed by a StatefulSet.
	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		// lookup the statefulset and enqueue
		AddFunc: ssc.addPod,
		// lookup current and old statefulset if labels changed
		UpdateFunc: ssc.updatePod,
		// lookup statefulset accounting for deletion tombstones
		DeleteFunc: ssc.deletePod,
	})
	ssc.podLister = podInformer.Lister()
	ssc.podListerSynced = podInformer.Informer().HasSynced

	// Handlers for StatefulSet create / update / delete: every event enqueues the set.
	setInformer.Informer().AddEventHandler(
		cache.ResourceEventHandlerFuncs{
			AddFunc: ssc.enqueueStatefulSet,
			UpdateFunc: func(old, cur interface{}) {
				oldPS := old.(*apps.StatefulSet)
				curPS := cur.(*apps.StatefulSet)
				// Only logged; the set is enqueued regardless of what changed.
				if oldPS.Status.Replicas != curPS.Status.Replicas {
					klog.V(4).Infof("Observed updated replica count for StatefulSet: %v, %d->%d", curPS.Name, oldPS.Status.Replicas, curPS.Status.Replicas)
				}
				ssc.enqueueStatefulSet(cur)
			},
			DeleteFunc: ssc.enqueueStatefulSet,
		},
	)
	ssc.setLister = setInformer.Lister()
	ssc.setListerSynced = setInformer.Informer().HasSynced

	// TODO: Watch volumes
	return ssc
}

ControllerRevision

这里了解下 ControllerRevision 究竟是啥,为啥需要关注。

// ControllerRevision implements an immutable snapshot of state data. Clients
// are responsible for serializing and deserializing the objects that contain
// their internal state.
// Once a ControllerRevision has been successfully created, it can not be updated.
// The API Server will fail validation of all requests that attempt to mutate
// the Data field. ControllerRevisions may, however, be deleted. Note that, due to its use by both
// the DaemonSet and StatefulSet controllers for update and rollback, this object is beta. However,
// it may be subject to name and representation changes in future releases, and clients should not
// depend on its stability. It is primarily for internal use by controllers.
type ControllerRevision struct {
	// TypeMeta (apiVersion/kind) is inlined into the JSON representation.
	metav1.TypeMeta `json:",inline"`
	// Standard object's metadata.
	// More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#metadata
	// +optional
	metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`

	// Data is the serialized representation of the state.
	Data runtime.RawExtension `json:"data,omitempty" protobuf:"bytes,2,opt,name=data"`

	// Revision indicates the revision of the state represented by Data.
	Revision int64 `json:"revision" protobuf:"varint,3,opt,name=revision"`
}

由对应的注释可以知道:
ControllerRevision 提供给 DaemonSet 和 StatefulSet 用作更新和回滚。ControllerRevision 存放的是状态数据的快照,创建之后其内容不可修改(API Server 会拒绝修改 Data 字段的请求,但允许删除),由调用端负责序列化写入和反序列化读取。其中 Revision(int64)字段相当于 ControllerRevision 的版本号,Data 字段则存放序列化后的数据。
所以 Sts 的更新以及回滚是基于新旧 ControllerRevision 的对比来进行的。

NewDefaultStatefulSetControl

深入看下 NewDefaultStatefulSetControl 定义:

// NewDefaultStatefulSetControl returns a new instance of the default implementation StatefulSetControlInterface that
// implements the documented semantics for StatefulSets. podControl is the PodControlInterface used to create, update,
// and delete Pods and to create PersistentVolumeClaims. statusUpdater is the StatefulSetStatusUpdaterInterface used
// to update the status of StatefulSets. You should use an instance returned from NewRealStatefulPodControl() for any
// scenario other than testing.
func NewDefaultStatefulSetControl(
	// Interface for managing the Pods that belong to a StatefulSet.
	podControl StatefulPodControlInterface,
	// Interface for updating a StatefulSet's Status.
	statusUpdater StatefulSetStatusUpdaterInterface,
	// Interface for managing ControllerRevisions (revision history).
	controllerHistory history.Interface,
	// Event recorder.
	recorder record.EventRecorder) StatefulSetControlInterface {
	return &defaultStatefulSetControl{podControl, statusUpdater, controllerHistory, recorder}
}

Run 函数执行过程

// Run runs the statefulset controller.
// It waits until the Pod, StatefulSet, PVC and ControllerRevision caches have synced, starts
// `workers` goroutines that each run ssc.worker via wait.Until (re-invoked after a one-second
// period if it returns), and blocks until stopCh is closed. The work queue is shut down when
// Run returns.
func (ssc *StatefulSetController) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer ssc.queue.ShutDown()

	klog.Infof("Starting stateful set controller")
	defer klog.Infof("Shutting down statefulset controller")

	// Don't process anything until every informer cache has been populated at least once.
	if !cache.WaitForNamedCacheSync("stateful set", stopCh, ssc.podListerSynced, ssc.setListerSynced, ssc.pvcListerSynced, ssc.revListerSynced) {
		return
	}

	for i := 0; i < workers; i++ {
		go wait.Until(ssc.worker, time.Second, stopCh)
	}

	<-stopCh
}

此处关注下 wait.Until 工具:

// Until loops until stop channel is closed, running f every period.
//
// Until is syntactic sugar on top of JitterUntil with zero jitter factor and
// with sliding = true (which means the timer for period starts after the f
// completes).
//
// f is the function to run, period the interval between runs, and closing
// stopCh ends the loop.
func Until(f func(), period time.Duration, stopCh <-chan struct{}) {
	JitterUntil(f, period, 0.0, true, stopCh)
}

通过注释可以知道,Until 会每隔 period 周期性地执行函数 f,直到 stopCh 被关闭;由于 sliding 为 true,每个周期的计时在 f 执行完成之后才开始。
主要解决的是当我们执行完某些操作后,还需要等待其他资源执行的情况,例如对于有依赖条件的资源释放的时候,A 依赖于 B,那么对 A 资源释放的时候还需要对 B 资源的释放进行观望。这在 k8s 的资源操作场景是常见的。

继续关注 wait.Until 所包装的函数 ssc.worker:

// worker runs a worker goroutine that invokes processNextWorkItem until the controller's queue is closed
func (ssc *StatefulSetController) worker() {
	// Intentionally empty body: all work happens in processNextWorkItem, which returns false
	// once the queue has been shut down.
	for ssc.processNextWorkItem() {
	}
}

worker 会在循环中不断调用 processNextWorkItem,直到 controller 的 queue 被关闭(此时 processNextWorkItem 返回 false)。Run 为每个 worker 各启动了一个 goroutine。

毫无疑问,需要分析 processNextWorkItem() 对应函数:

// processNextWorkItem dequeues items, processes them, and marks them done. It enforces that the syncHandler is never
// invoked concurrently with the same key.
// It returns false only when the queue is shutting down, which stops the calling worker loop.
func (ssc *StatefulSetController) processNextWorkItem() bool {
	key, quit := ssc.queue.Get()
	if quit {
		return false
	}
	defer ssc.queue.Done(key)
	// The rest is straightforward; the interesting part is ssc.sync().
	if err := ssc.sync(key.(string)); err != nil {
		// On failure, retry the key later with rate-limited back-off.
		utilruntime.HandleError(fmt.Errorf("error syncing StatefulSet %v, requeuing: %v", key.(string), err))
		ssc.queue.AddRateLimited(key)
	} else {
		// On success, clear the key's rate-limiting history.
		ssc.queue.Forget(key)
	}
	return true
}

processNextWorkItem() 从 queue 中取出一个 key 交给 ssc.sync() 处理,并在结束后调用 Done 标记处理完成。sync 出错时以限速的方式将 key 重新入队;成功时调用 Forget,清除该 key 的限速重试记录。

// sync syncs the given statefulset.
// key is a "namespace/name" queue key. A StatefulSet that no longer exists, or whose selector
// cannot be converted, is treated as done (nil error, so it is not retried); other errors are
// returned so the caller requeues the key.
func (ssc *StatefulSetController) sync(key string) error {
	startTime := time.Now()
	defer func() {
		klog.V(4).Infof("Finished syncing statefulset %q (%v)", key, time.Since(startTime))
	}()

	// Split the queue key into namespace and name.
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}

	// Get the StatefulSet from the lister cache by namespace and name.
	set, err := ssc.setLister.StatefulSets(namespace).Get(name)
	if errors.IsNotFound(err) {
		klog.Infof("StatefulSet has been deleted %v", key)
		return nil
	}
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("unable to retrieve StatefulSet %v from store: %v", key, err))
		return err
	}

	// Convert the StatefulSet's label selector.
	selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("error converting StatefulSet %v selector: %v", key, err))
		// This is a non-transient error, so don't retry.
		return nil
	}

	// adoptOrphanRevisions: adopt orphaned ControllerRevisions that match the selector by adding
	// an ownerReference. NOTE(review): the original annotation also says already-owned revisions
	// whose labels no longer match are released — confirm against adoptOrphanRevisions itself.
	if err := ssc.adoptOrphanRevisions(set); err != nil {
		return err
	}

	// getPodsForStatefulSet: use the selector to collect the set's Pods, adopting orphan Pods whose
	// labels match and releasing owned Pods whose labels no longer match.
	pods, err := ssc.getPodsForStatefulSet(set, selector)
	if err != nil {
		return err
	}

	// Perform the actual sync.
	return ssc.syncStatefulSet(set, pods)
}

syncStatefulSet

// syncStatefulSet syncs a tuple of (statefulset, []*v1.Pod).
func (ssc *StatefulSetController) syncStatefulSet(set *apps.StatefulSet, pods []*v1.Pod) error {
	klog.V(4).Infof("Syncing StatefulSet %v/%v with %d pods", set.Namespace, set.Name, len(pods))
	var status *apps.StatefulSetStatus
	var err error
	// TODO: investigate where we mutate the set during the update as it is not obvious.
	// All of the real work is delegated to ssc.control.UpdateStatefulSet. A deep copy is passed so
	// that any mutation during the update does not touch the object returned by the lister.
	status, err = ssc.control.UpdateStatefulSet(set.DeepCopy(), pods)
	if err != nil {
		return err
	}
	klog.V(4).Infof("Successfully synced StatefulSet %s/%s successful", set.Namespace, set.Name)
	// One more sync to handle the clock skew. This is also helping in requeuing right after status update
	if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetMinReadySeconds) && set.Spec.MinReadySeconds > 0 && status != nil && status.AvailableReplicas != *set.Spec.Replicas {
		ssc.enqueueSSAfter(set, time.Duration(set.Spec.MinReadySeconds)*time.Second)
	}

	return nil
}

UpdateStatefulSet

  1. 获取历史 revisions

2. 计算 currentRevision 与 updateRevision,若 sts 处于更新过程中,则 currentRevision 与 updateRevision 的值不同;

  3. 调用 ssc.performUpdate 执行实际的 sync 操作;

  4. 调用 ssc.updateStatefulSetStatus 更新 status subResource

5. 根据 sts 的 spec.revisionHistoryLimit 字段清理过期的 controllerrevision。

// UpdateStatefulSet lists and sorts the set's ControllerRevisions, runs performUpdate, and then
// truncates old revision history. History truncation is attempted even when performUpdate fails;
// in that case both errors are returned together as an aggregate.
func (ssc *defaultStatefulSetControl) UpdateStatefulSet(set *apps.StatefulSet, pods []*v1.Pod) (*apps.StatefulSetStatus, error) {
	// List the set's revisions and sort them.
	revisions, err := ssc.ListRevisions(set)
	if err != nil {
		return nil, err
	}
	history.SortControllerRevisions(revisions)

	// Compute the current/update revisions and perform the update (see performUpdate).
	currentRevision, updateRevision, status, err := ssc.performUpdate(set, pods, revisions)
	if err != nil {
		return nil, utilerrors.NewAggregate([]error{err, ssc.truncateHistory(set, pods, revisions, currentRevision, updateRevision)})
	}

	// Clean up revision history that is no longer needed.
	return status, ssc.truncateHistory(set, pods, revisions, currentRevision, updateRevision)
}

// performUpdate resolves the current and update revisions, reconciles the set's Pods via
// updateStatefulSet, and persists the resulting status. It always returns the revisions it
// resolved (even on error) so the caller can still truncate history.
func (ssc *defaultStatefulSetControl) performUpdate(
	set *apps.StatefulSet, pods []*v1.Pod, revisions []*apps.ControllerRevision) (*apps.ControllerRevision, *apps.ControllerRevision, *apps.StatefulSetStatus, error) {
	var currentStatus *apps.StatefulSetStatus
	// get the current, and update revisions
	currentRevision, updateRevision, collisionCount, err := ssc.getStatefulSetRevisions(set, revisions)
	if err != nil {
		return currentRevision, updateRevision, currentStatus, err
	}

	// Perform the actual update of the set's Pods.
	currentStatus, err = ssc.updateStatefulSet(set, currentRevision, updateRevision, collisionCount, pods)
	if err != nil {
		return currentRevision, updateRevision, currentStatus, err
	}
	// Write the computed status back to the StatefulSet.
	err = ssc.updateStatefulSetStatus(set, currentStatus)
	if err != nil {
		return currentRevision, updateRevision, currentStatus, err
	}
	klog.V(4).Infof("StatefulSet %s/%s pod status replicas=%d ready=%d current=%d updated=%d",
		set.Namespace,
		set.Name,
		currentStatus.Replicas,
		currentStatus.ReadyReplicas,
		currentStatus.CurrentReplicas,
		currentStatus.UpdatedReplicas)

	klog.V(4).Infof("StatefulSet %s/%s revisions current=%s update=%s",
		set.Namespace,
		set.Name,
		currentStatus.CurrentRevision,
		currentStatus.UpdateRevision)

	return currentRevision, updateRevision, currentStatus, nil
}

updateStatefulSet

// updateStatefulSet is the core of the StatefulSet sync: it drives the set's Pods toward the
// desired state (being re-run on later syncs until that state is reached) and returns the
// status it observed. apps/v1 defines two update strategy types:
//  1. RollingUpdateStatefulSetStrategyType (optionally limited by RollingUpdate.Partition)
//  2. OnDeleteStatefulSetStrategyType
// Partition is a field of the RollingUpdate strategy, not a strategy type of its own.
//
// This is an excerpt; "..." marks code elided from the upstream function.
func (ssc *defaultStatefulSetControl) updateStatefulSet(
	set *apps.StatefulSet,
	currentRevision *apps.ControllerRevision,
	updateRevision *apps.ControllerRevision,
	collisionCount int32,
	pods []*v1.Pod) (*apps.StatefulSetStatus, error) {
	...
	// Build the StatefulSet as it looks at the current revision and at the update revision
	// (error handling after each ApplyRevision call is elided).
	currentSet, err := ApplyRevision(set, currentRevision)
	...
	updateSet, err := ApplyRevision(set, updateRevision)
	...
	// Record generation, revisions and collision count in the returned status.
	status := apps.StatefulSetStatus{}
	status.ObservedGeneration = set.Generation
	status.CurrentRevision = currentRevision.Name
	status.UpdateRevision = updateRevision.Name
	status.CollisionCount = new(int32)
	*status.CollisionCount = collisionCount

	// replicaCount is the desired number of replicas.
	replicaCount := int(*set.Spec.Replicas)
	// replicas holds Pods such that 0 <= getOrdinal(pod) < set.Spec.Replicas, indexed by ordinal.
	replicas := make([]*v1.Pod, replicaCount)
	// condemned holds Pods such that set.Spec.Replicas <= getOrdinal(pod).
	condemned := make([]*v1.Pod, 0, len(pods))
	// unhealthy counts unhealthy Pods; firstUnhealthyPod is the lowest-ordinal one.
	unhealthy := 0
	var firstUnhealthyPod *v1.Pod

	// Partition the Pods into replicas and condemned while accumulating status counters.
	for i := range pods {
		status.Replicas++

		// Count running-and-ready replicas.
		if isRunningAndReady(pods[i]) {
			status.ReadyReplicas++
			// Availability depends on whether the StatefulSetMinReadySeconds feature gate is on.
			if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetMinReadySeconds) {
				if isRunningAndAvailable(pods[i], set.Spec.MinReadySeconds) {
					status.AvailableReplicas++
				}
			} else {
				// With the feature gate off, every ready replica counts as available.
				status.AvailableReplicas = status.ReadyReplicas
			}
		}

		// Count current and updated replicas among created, non-terminating Pods.
		if isCreated(pods[i]) && !isTerminating(pods[i]) {
			if getPodRevision(pods[i]) == currentRevision.Name {
				status.CurrentReplicas++
			}
			if getPodRevision(pods[i]) == updateRevision.Name {
				status.UpdatedReplicas++
			}
		}

		if ord := getOrdinal(pods[i]); 0 <= ord && ord < replicaCount {
			// In-range ordinal: slot the Pod into replicas.
			replicas[ord] = pods[i]

		} else if ord >= replicaCount {
			// Ordinal beyond the desired count: the Pod is condemned.
			condemned = append(condemned, pods[i])
		}
	}
	// Fill any missing ordinal in [0, set.Spec.Replicas) with a new Pod object.
	// newVersionedStatefulSetPod decides whether to build it from currentSet or updateSet.
	for ord := 0; ord < replicaCount; ord++ {
		if replicas[ord] == nil {
			replicas[ord] = newVersionedStatefulSetPod(
				currentSet,
				updateSet,
				currentRevision.Name,
				updateRevision.Name, ord)
		}
	}

	// Sort condemned Pods by ascending ordinal.
	sort.Sort(ascendingOrdinal(condemned))

	// Find the first unhealthy Pod. replicas is indexed by ordinal and condemned is sorted
	// ascending with every ordinal >= replicaCount, so the first unhealthy Pod encountered is
	// the one with the lowest ordinal.
	for i := range replicas {
		if !isHealthy(replicas[i]) {
			unhealthy++
			if firstUnhealthyPod == nil {
				firstUnhealthyPod = replicas[i]
			}
		}
	}

	for i := range condemned {
		if !isHealthy(condemned[i]) {
			unhealthy++
			if firstUnhealthyPod == nil {
				firstUnhealthyPod = condemned[i]
			}
		}
	}

	if unhealthy > 0 {
		klog.V(4).Infof("StatefulSet %s/%s has %d unhealthy Pods starting with %s",
			set.Namespace,
			set.Name,
			unhealthy,
			firstUnhealthyPod.Name)
	}

	// If the set is being deleted, create or update nothing; just report the status.
	if set.DeletionTimestamp != nil {
		return &status, nil
	}

	// monotonic means ordered, one-Pod-at-a-time processing (the default per the original
	// annotation); it is false only when allowsBurst(set) permits bursting.
	monotonic := !allowsBurst(set)

	// Make sure every Pod in replicas is running.
	for i := range replicas {
		// Delete and recreate failed Pods.
		if isFailed(replicas[i]) {
			ssc.recorder.Eventf(set, v1.EventTypeWarning, "RecreatingFailedPod",
				"StatefulSet %s/%s is recreating failed Pod %s",
				set.Namespace,
				set.Name,
				replicas[i].Name)
			if err := ssc.podControl.DeleteStatefulPod(set, replicas[i]); err != nil {
				return &status, err
			}
			// Undo the counters contributed by the deleted Pod.
			if getPodRevision(replicas[i]) == currentRevision.Name {
				status.CurrentReplicas--
			}
			if getPodRevision(replicas[i]) == updateRevision.Name {
				status.UpdatedReplicas--
			}
			status.Replicas--
			replicas[i] = newVersionedStatefulSetPod(
				currentSet,
				updateSet,
				currentRevision.Name,
				updateRevision.Name,
				i)
		}

		// Create the Pod if it does not exist yet.
		if !isCreated(replicas[i]) {
			if err := ssc.podControl.CreateStatefulPod(set, replicas[i]); err != nil {
				return &status, err
			}
			status.Replicas++
			if getPodRevision(replicas[i]) == currentRevision.Name {
				status.CurrentReplicas++
			}
			if getPodRevision(replicas[i]) == updateRevision.Name {
				status.UpdatedReplicas++
			}

			// if the set does not allow bursting, return immediately
			if monotonic {
				return &status, nil
			}
			// pod created, no more work possible for this round
			continue
		}

		// In monotonic mode, wait for a terminating Pod to finish deleting.
		if isTerminating(replicas[i]) && monotonic {
			klog.V(4).Infof(
				"StatefulSet %s/%s is waiting for Pod %s to Terminate",
				set.Namespace,
				set.Name,
				replicas[i].Name)
			return &status, nil
		}

		// In monotonic mode, wait for a created Pod to become Running and Ready.
		if !isRunningAndReady(replicas[i]) && monotonic {
			klog.V(4).Infof(
				"StatefulSet %s/%s is waiting for Pod %s to be Running and Ready",
				set.Namespace,
				set.Name,
				replicas[i].Name)
			return &status, nil
		}

		// In monotonic mode with StatefulSetMinReadySeconds on, wait until a ready Pod is also Available.
		if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetMinReadySeconds) && !isRunningAndAvailable(replicas[i], set.Spec.MinReadySeconds) && monotonic {
			klog.V(4).Infof(
				"StatefulSet %s/%s is waiting for Pod %s to be Available",
				set.Namespace,
				set.Name,
				replicas[i].Name)
			return &status, nil
		}

		// Pod identity and storage already match the set: nothing more to do for this Pod.
		if identityMatches(set, replicas[i]) && storageMatches(set, replicas[i]) {
			continue
		}
		...
	}
	// The rest of the function (which also uses condemned and returns the status) is elided.
	...
}

至此结束。


关于作者

Kirago
个人站点 https://kiragoo.github.io/
获得点赞
文章被阅读