Sts Controller 源码分析:简单记录 StatefulSet Controller 的处理流程与细节
分析 Controller 源码时,本文选用 StatefulSet Controller 作为示例;其它控制器的源码分析思路基本一致,可以作为参考。
StatefulSet
简介
此篇文章默认你已经具备熟练使用 StatefulSet 的基础知识,所以常规介绍及使用 Demo 不再赘述,具体可参考 StatefulSet 基础。
StatefulSet Controller
启动分析
kube-controller-manager 入口调用链分析
对于 Kubernetes
的源码组织结构不做过多介绍,希望你有一定的了解。
关于 k8s 是如何启动 kube-controller-manager 的,可以在 kube-controller-manager 的命令行参考文档中查到如下内容:
--controllers strings 默认值:[*]
要启用的控制器列表。* 表示启用所有默认启用的控制器; foo 启用名为 foo 的控制器; -foo 表示禁用名为 foo 的控制器。
控制器的全集:attachdetach、bootstrapsigner、cloud-node-lifecycle、clusterrole-aggregation、cronjob、csrapproving、csrcleaner、csrsigning、daemonset、deployment、disruption、endpoint、endpointslice、endpointslicemirroring、ephemeral-volume、garbagecollector、horizontalpodautoscaling、job、namespace、nodeipam、nodelifecycle、persistentvolume-binder、persistentvolume-expander、podgc、pv-protection、pvc-protection、replicaset、replicationcontroller、resourcequota、root-ca-cert-publisher、route、service、serviceaccount、serviceaccount-token、statefulset、tokencleaner、ttl、ttl-after-finished
默认禁用的控制器有:bootstrapsigner 和 tokencleaner。
可以看到,默认值 [*] 已经包含了 statefulset 控制器的启用,那么这在代码中是在哪里体现的呢?继续往下看。
进入 cmd/kube-controller-manager 的 main 函数,它实际上只做了两件事:
func main() {
command := app.NewControllerManagerCommand() // 初始化
code := cli.Run(command) //真正执行
os.Exit(code)
}
顺着函数调用关系,进入 cmd/kube-controller-manager/app/controllermanager.go,查看 NewControllerManagerCommand 中注册的 Run 究竟做了什么。
...
Run: func(cmd *cobra.Command, args []string) {
// Handle a version-flag request (verflag may exit here), then print the parsed flags.
verflag.PrintAndExitIfRequested()
cliflag.PrintFlags(cmd.Flags())
// Fail fast if checkNonZeroInsecurePort reports a problem with the flags.
err := checkNonZeroInsecurePort(cmd.Flags())
if err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
// Key point: KnownControllers() returns the names of every controller this binary
// knows how to start (including "statefulset"). It is passed to s.Config together
// with the list of controllers that are disabled by default.
c, err := s.Config(KnownControllers(), ControllersDisabledByDefault.List())
if err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
// Complete the config and run the controller manager with wait.NeverStop as the
// stop channel; any error terminates the process with exit code 1.
if err := Run(c.Complete(), wait.NeverStop); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
},
...
深入 KnownControllers() 函数分析:
// KnownControllers returns the names of all controllers the controller manager
// knows about: every key of NewControllerInitializers (with cloud loops
// included) plus the service-account token controller. This list is what the
// --controllers flag is resolved against when the config is built.
//
// The SA token controller is "special": it cannot be initialized through a
// normal InitFunc because it must be started first, so that the SA tokens for
// the controllers started after it already exist. Think very carefully before
// adding more special cases here.
func KnownControllers() []string {
	names := sets.StringKeySet(NewControllerInitializers(IncludeCloudLoops))
	names.Insert(saTokenControllerName)
	return names.List()
}
通过 NewControllerInitializers 可以看到,“控制器名称 → 初始化函数(InitFunc)”的映射就是在这里建立的;之后真正启动各个控制器时,调用的就是这里注册的函数。
// NewControllerInitializers returns the public map from controller name to the
// InitFunc that starts it (one init func may start more than one controller).
// This allows structured downstream composition and subdivision.
//
// Controllers that are always registered are listed in the literal below. The
// cloud-dependent ones ("service", "route", "cloud-node-lifecycle") are added
// only when loopMode is IncludeCloudLoops. "storage-version-gc" is added only
// when both the APIServerIdentity and StorageVersionAPI feature gates are enabled.
func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {
	controllers := map[string]InitFunc{
		"statefulset":               startStatefulSetController,
		"persistentvolume-binder":   startPersistentVolumeBinderController,
		"attachdetach":              startAttachDetachController,
		"persistentvolume-expander": startVolumeExpandController,
		"clusterrole-aggregation":   startClusterRoleAggregrationController,
		"pvc-protection":            startPVCProtectionController,
		"pv-protection":             startPVProtectionController,
		"ttl-after-finished":        startTTLAfterFinishedController,
		"root-ca-cert-publisher":    startRootCACertPublisher,
		"ephemeral-volume":          startEphemeralVolumeController,
	}

	if loopMode == IncludeCloudLoops {
		controllers["service"] = startServiceController
		controllers["route"] = startRouteController
		controllers["cloud-node-lifecycle"] = startCloudNodeLifecycleController
		// TODO: volume controller into the IncludeCloudLoops only set.
	}

	gates := utilfeature.DefaultFeatureGate
	if gates.Enabled(genericfeatures.APIServerIdentity) && gates.Enabled(genericfeatures.StorageVersionAPI) {
		controllers["storage-version-gc"] = startStorageVersionGCController
	}
	return controllers
}
通过如上可以得知 StatefulSet
是如何被初始化到 kube-controller-manager
中的。
Statefulset Controller
启动过程
回到 cmd/kube-controller-manager/app/controllermanager.go 中的 Run 函数,其中:
...
Run: func(cmd *cobra.Command, args []string) {
// Handle a version-flag request (verflag may exit here), then print the parsed flags.
verflag.PrintAndExitIfRequested()
cliflag.PrintFlags(cmd.Flags())
// Fail fast if checkNonZeroInsecurePort reports a problem with the flags.
err := checkNonZeroInsecurePort(cmd.Flags())
if err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
// Build the controller-manager config from the known and default-disabled controllers.
c, err := s.Config(KnownControllers(), ControllersDisabledByDefault.List())
if err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
// Complete the controller-manager config and Run it; Run in turn starts the
// controllers it manages (StatefulSet among them).
if err := Run(c.Complete(), wait.NeverStop); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
},
...
进入 Run
函数进行分析:
// Run runs the KubeControllerManagerOptions. This should never exit.
func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
// To help debugging, immediately log version
klog.Infof("Version: %+v", version.Get())
...
clientBuilder, rootClientBuilder := createClientBuilders(c)
// Starter for the service-account token controller; presumably passed to run as
// startSATokenController (the call site is elided in this excerpt).
saTokenControllerInitFunc := serviceAccountTokenControllerStarter{rootClientBuilder: rootClientBuilder}.startServiceAccountTokenController
run := func(ctx context.Context, startSATokenController InitFunc, initializersFunc ControllerInitializersFunc) { // run is where the managed controllers are initialized and started.
controllerContext, err := CreateControllerContext(c, rootClientBuilder, clientBuilder, ctx.Done())
if err != nil {
klog.Fatalf("error building controller context: %v", err)
}
// Build the name -> InitFunc map (e.g. NewControllerInitializers) for this loop mode.
controllerInitializers := initializersFunc(controllerContext.LoopMode)
// This is where the managed controllers are actually started; any error is fatal.
if err := StartControllers(ctx, controllerContext, startSATokenController, controllerInitializers, unsecuredMux, healthzHandler); err != nil {
klog.Fatalf("error starting controllers: %v", err)
}
// The shared informer factories are started only after StartControllers has run
// every InitFunc (which request their informers from these factories), and then
// InformersStarted is closed to signal that informers are running.
controllerContext.InformerFactory.Start(stopCh)
controllerContext.ObjectOrMetadataInformerFactory.Start(stopCh)
close(controllerContext.InformersStarted)
// Block forever; the controllers run in their own goroutines.
select {}
}
...
select {}
}
查看 StartControllers
函数:
// StartControllers starts a set of controllers with a specified ControllerContext
func StartControllers(ctx context.Context, controllerCtx ControllerContext, startSATokenController InitFunc, controllers map[string]InitFunc,
unsecuredMux *mux.PathRecorderMux, healthzHandler *controllerhealthz.MutableHealthzHandler) error {
...
// Iterate over the name -> InitFunc map and call each initFn; for "statefulset"
// initFn is startStatefulSetController. Go map iteration order is unspecified,
// so controllers are started in no particular order.
for controllerName, initFn := range controllers {
// Skip controllers that are not enabled (presumably per the --controllers flag).
if !controllerCtx.IsControllerEnabled(controllerName) {
klog.Warningf("%q is disabled", controllerName)
continue
}
// Stagger start-up by a jittered ControllerStartInterval between controllers.
time.Sleep(wait.Jitter(controllerCtx.ComponentConfig.Generic.ControllerStartInterval.Duration, ControllerStartJitter))
klog.V(1).Infof("Starting %q", controllerName)
ctrl, started, err := initFn(ctx, controllerCtx)
// Any init error aborts start-up of the remaining controllers; the caller treats it as fatal.
if err != nil {
klog.Errorf("Error starting %q", controllerName)
return err
}
// An InitFunc may decline to start (started == false); that is only logged.
if !started {
klog.Warningf("Skipping %q", controllerName)
continue
}
...
klog.Infof("Started %q", controllerName)
}
healthzHandler.AddHealthChecker(controllerChecks...)
return nil
}
// InitFunc starts one controller. When StartControllers gets a non-nil error from
// it, StartControllers returns that error immediately and Run treats it as fatal.
// enabled == false means the controller was skipped. The returned
// controller.Interface may be nil, which is what startStatefulSetController returns.
type InitFunc func(ctx context.Context, controllerCtx ControllerContext) (controller controller.Interface, enabled bool, err error)
看到这里我们似乎还是没看到 StatefulSet Controller
真正执行的地方,请再次回顾下我们之前 NewControllerInitializers
中的内容:
// Excerpt: the "statefulset" key maps to startStatefulSetController, which
// StartControllers later invokes as that controller's InitFunc.
func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {
...
controllers["statefulset"] = startStatefulSetController
...
}
感觉看到了一丝丝曙光,继续往下看 startStatefulSetController
的具体实现。
// startStatefulSetController is the InitFunc registered under "statefulset".
//
// It builds a StatefulSetController from the shared informers for the resource
// types a StatefulSet depends on (Pods, StatefulSets, PersistentVolumeClaims,
// ControllerRevisions) plus a client named "statefulset-controller". Construction
// happens synchronously. Only Run executes in a new goroutine, with
// ConcurrentStatefulSetSyncs workers, until ctx is cancelled.
// It reports the controller as started and returns no controller.Interface.
func startStatefulSetController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
	factory := controllerContext.InformerFactory
	ssc := statefulset.NewStatefulSetController(
		factory.Core().V1().Pods(),
		factory.Apps().V1().StatefulSets(),
		factory.Core().V1().PersistentVolumeClaims(),
		factory.Apps().V1().ControllerRevisions(),
		controllerContext.ClientBuilder.ClientOrDie("statefulset-controller"),
	)
	workers := int(controllerContext.ComponentConfig.StatefulSetController.ConcurrentStatefulSetSyncs)
	go ssc.Run(workers, ctx.Done())
	return nil, true, nil
}
看到这里我们终于知道了 StatefulSet Controller
真正意义上是被如何启动的了。
StatefulSet Controller
明细分析
通过如上的分析,下面就到了 StatefulSet Controller
具体的范畴了。
StatefulSetController
// StatefulSetController reconciles StatefulSets. Informer event handlers push
// StatefulSet keys onto queue, and worker goroutines sync each queued set
// through control.
type StatefulSetController struct {
// client interface
kubeClient clientset.Interface
// control returns an interface capable of syncing a stateful set.
// Abstracted out for testing.
control StatefulSetControlInterface
// podControl is used for patching pods.
podControl controller.PodControlInterface
// podLister is able to list/get pods from a shared informer's store
podLister corelisters.PodLister
// podListerSynced returns true if the pod shared informer has synced at least once
podListerSynced cache.InformerSynced
// setLister is able to list/get stateful sets from a shared informer's store
setLister appslisters.StatefulSetLister
// setListerSynced returns true if the stateful set shared informer has synced at least once
setListerSynced cache.InformerSynced
// pvcListerSynced returns true if the pvc shared informer has synced at least once
pvcListerSynced cache.InformerSynced
// revListerSynced returns true if the rev shared informer has synced at least once
revListerSynced cache.InformerSynced
// StatefulSets that need to be synced.
queue workqueue.RateLimitingInterface
}
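通过上面 StatefulSetController 结构体的定义,可以了解它的大致组成:kube client、用于同步 StatefulSet 的 control、用于 patch Pod 的 podControl、各资源的 Lister 及其缓存同步状态(HasSynced),以及存放待同步 StatefulSet 的限速工作队列 queue。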
NewStatefulSetController
对于 ssc
的构造函数分析:
// NewStatefulSetController creates a StatefulSetController for the resources a
// StatefulSet depends on: Pods, StatefulSets, PVCs and ControllerRevisions.
//
// It sets up an event recorder that writes Events through kubeClient, and a
// default StatefulSetControlInterface assembled from real pod, status and
// history helpers. It also creates a rate-limited work queue named "statefulset"
// and registers the informer event handlers:
//   - Pod add/update/delete go through addPod, updatePod and deletePod;
//   - every StatefulSet add/update/delete enqueues that set, logging at V(4)
//     when an update changed status.replicas.
func NewStatefulSetController(
	podInformer coreinformers.PodInformer,
	setInformer appsinformers.StatefulSetInformer,
	pvcInformer coreinformers.PersistentVolumeClaimInformer,
	revInformer appsinformers.ControllerRevisionInformer,
	kubeClient clientset.Interface,
) *StatefulSetController {
	broadcaster := record.NewBroadcaster()
	broadcaster.StartStructuredLogging(0)
	broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
	recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "statefulset-controller"})

	statefulPodControl := NewRealStatefulPodControl(
		kubeClient,
		setInformer.Lister(),
		podInformer.Lister(),
		pvcInformer.Lister(),
		recorder,
	)
	setControl := NewDefaultStatefulSetControl(
		statefulPodControl,
		NewRealStatefulSetStatusUpdater(kubeClient, setInformer.Lister()),
		history.NewHistory(kubeClient, revInformer.Lister()),
		recorder,
	)

	ssc := &StatefulSetController{
		kubeClient:      kubeClient,
		control:         setControl,
		pvcListerSynced: pvcInformer.Informer().HasSynced,
		queue:           workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "statefulset"),
		podControl:      controller.RealPodControl{KubeClient: kubeClient, Recorder: recorder},
		revListerSynced: revInformer.Informer().HasSynced,
	}

	// Pod events: resolve the owning StatefulSet(s) and enqueue them.
	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    ssc.addPod,
		UpdateFunc: ssc.updatePod,
		DeleteFunc: ssc.deletePod,
	})
	ssc.podLister = podInformer.Lister()
	ssc.podListerSynced = podInformer.Informer().HasSynced

	// StatefulSet events: always enqueue the set itself.
	onSetUpdate := func(old, cur interface{}) {
		prev, next := old.(*apps.StatefulSet), cur.(*apps.StatefulSet)
		if prev.Status.Replicas != next.Status.Replicas {
			klog.V(4).Infof("Observed updated replica count for StatefulSet: %v, %d->%d", next.Name, prev.Status.Replicas, next.Status.Replicas)
		}
		ssc.enqueueStatefulSet(cur)
	}
	setInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    ssc.enqueueStatefulSet,
		UpdateFunc: onSetUpdate,
		DeleteFunc: ssc.enqueueStatefulSet,
	})
	ssc.setLister = setInformer.Lister()
	ssc.setListerSynced = setInformer.Informer().HasSynced

	// TODO: Watch volumes
	return ssc
}
ControllerRevision
这里了解下 ControllerRevision
究竟是啥,为啥需要关注。
// ControllerRevision implements an immutable snapshot of state data. Clients
// are responsible for serializing and deserializing the objects that contain
// their internal state.
// Once a ControllerRevision has been successfully created, it can not be updated.
// The API Server will fail validation of all requests that attempt to mutate
// the Data field. ControllerRevisions may, however, be deleted. Note that, due to its use by both
// the DaemonSet and StatefulSet controllers for update and rollback, this object is beta. However,
// it may be subject to name and representation changes in future releases, and clients should not
// depend on its stability. It is primarily for internal use by controllers.
type ControllerRevision struct {
// TypeMeta is inlined into the JSON representation (json:",inline").
metav1.TypeMeta `json:",inline"`
// Standard object's metadata.
// More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#metadata
// +optional
metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`
// Data is the serialized representation of the state.
Data runtime.RawExtension `json:"data,omitempty" protobuf:"bytes,2,opt,name=data"`
// Revision indicates the revision of the state represented by Data.
// Unlike metadata and data, it has no omitempty in its JSON tag, so it is always serialized.
Revision int64 `json:"revision" protobuf:"varint,3,opt,name=revision"`
}
由对应的注释可以知道:
ControllerRevision
提供给 DaemonSet
和 StatefulSet
用作更新和回滚,ControllerRevision
存放的是数据的快照,ControllerRevision
生成之后内容是不可修改的,由调用端来负责序列化写入和反序列化读取。其中 Revision(int64)
字段相当于 ControllerRevision
的版本 id
号,Data字段则存放序列化后的数据。
所以 Sts
的更新以及回滚是基于新旧 ControllerRevision
的对比来进行的。
NewDefaultStatefulSetControl
深入看下 NewDefaultStatefulSetControl
定义:
// NewDefaultStatefulSetControl returns the default StatefulSetControlInterface,
// which implements the documented semantics for StatefulSets.
//
// Parameters:
//   - podControl: creates, updates and deletes the set's Pods and creates their
//     PersistentVolumeClaims.
//   - statusUpdater: writes the StatefulSet status.
//   - controllerHistory: lists and manages the set's ControllerRevisions.
//   - recorder: emits Events on behalf of the controller.
//
// Outside of tests, podControl should come from NewRealStatefulPodControl().
func NewDefaultStatefulSetControl(
	podControl StatefulPodControlInterface,
	statusUpdater StatefulSetStatusUpdaterInterface,
	controllerHistory history.Interface,
	recorder record.EventRecorder) StatefulSetControlInterface {
	ctl := defaultStatefulSetControl{podControl, statusUpdater, controllerHistory, recorder}
	return &ctl
}
Run
函数执行过程
// Run runs the statefulset controller and blocks until stopCh is closed.
//
// It first waits for the Pod, StatefulSet, PVC and ControllerRevision caches to
// sync, and returns early if they do not. It then launches `workers`
// goroutines, each running ssc.worker under wait.Until with a one-second period.
// When Run returns, the "Shutting down" message is logged, the work queue is
// shut down, and any panic is handled by utilruntime.HandleCrash.
func (ssc *StatefulSetController) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer ssc.queue.ShutDown()

	klog.Infof("Starting stateful set controller")
	defer klog.Infof("Shutting down statefulset controller")

	cachesSynced := []cache.InformerSynced{
		ssc.podListerSynced,
		ssc.setListerSynced,
		ssc.pvcListerSynced,
		ssc.revListerSynced,
	}
	if synced := cache.WaitForNamedCacheSync("stateful set", stopCh, cachesSynced...); !synced {
		return
	}

	for remaining := workers; remaining > 0; remaining-- {
		go wait.Until(ssc.worker, time.Second, stopCh)
	}
	<-stopCh
}
此处关注下 wait.Until
工具:
// Until calls f repeatedly until stopCh is closed, waiting period between calls.
//
// It is JitterUntil with a zero jitter factor and sliding enabled. Sliding means
// the timer for each period starts only after f has completed.
func Until(f func(), period time.Duration, stopCh <-chan struct{}) {
	const (
		noJitter = 0.0
		sliding  = true
	)
	JitterUntil(f, period, noJitter, sliding, stopCh)
}
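通过注释可以知道,Until 会每隔 period 周期性地执行函数 f,直到 stopCh 被关闭;由于 sliding 为 true,周期是在 f 执行结束之后才开始计时的。
这类工具适用于需要周期性执行或反复重试的场景,例如有依赖条件的资源释放:A 依赖于 B,那么释放 A 时还需要持续观察 B 的释放情况,这在 k8s 的资源操作场景中很常见。在 Sts Controller 中,它的作用是让每个 worker 保持运行:worker 返回后,会在 1 秒(time.Second)后被再次调用,直到 stopCh 关闭。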
继续关注 wait.Until 中包装的函数 ssc.worker。
// worker drains the work queue. It keeps calling processNextWorkItem and
// returns only once that reports the queue has been shut down.
func (ssc *StatefulSetController) worker() {
	for {
		if !ssc.processNextWorkItem() {
			return
		}
	}
}
worker 运行在 Run 所启动的 goroutine 中,它循环调用 processNextWorkItem,直到 controller 的 queue 被关闭。
毫无疑问,需要分析 processNextWorkItem()
对应函数:
// processNextWorkItem takes one key off the queue and syncs the StatefulSet it
// names. A failed sync is reported and the key is re-added with rate limiting.
// A successful sync makes the rate limiter forget the key. Done is always called
// for the key, which ensures sync is never invoked concurrently for the same key.
// It returns false only when the queue has been shut down.
func (ssc *StatefulSetController) processNextWorkItem() bool {
	item, shutdown := ssc.queue.Get()
	if shutdown {
		return false
	}
	defer ssc.queue.Done(item)

	key := item.(string)
	if err := ssc.sync(key); err != nil {
		utilruntime.HandleError(fmt.Errorf("error syncing StatefulSet %v, requeuing: %v", key, err))
		ssc.queue.AddRateLimited(item)
		return true
	}
	ssc.queue.Forget(item)
	return true
}
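processNextWorkItem() 负责从 queue 中取出一个 key 并调用 ssc.sync 进行处理:处理失败时通过 AddRateLimited 限速重新入队,成功时调用 Forget 停止对该 key 的限速跟踪;无论成功与否,最后都会调用 Done 标记该 key 已处理完毕。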
// sync reconciles the StatefulSet identified by key (a namespace/name cache key).
//
// If the set no longer exists in the lister cache, the key is treated as done.
// A selector that cannot be converted is reported but not retried, since it will
// not fix itself. Otherwise it runs these steps in order:
//  1. adopt matching orphan ControllerRevisions and release non-matching ones
//     (adoptOrphanRevisions);
//  2. collect the Pods that belong to the set, adopting or releasing by label
//     (getPodsForStatefulSet);
//  3. hand the set and its Pods to syncStatefulSet.
//
// Any returned error causes the key to be requeued by the caller.
func (ssc *StatefulSetController) sync(key string) error {
	start := time.Now()
	defer func() {
		klog.V(4).Infof("Finished syncing statefulset %q (%v)", key, time.Since(start))
	}()

	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}

	set, err := ssc.setLister.StatefulSets(namespace).Get(name)
	switch {
	case errors.IsNotFound(err):
		klog.Infof("StatefulSet has been deleted %v", key)
		return nil
	case err != nil:
		utilruntime.HandleError(fmt.Errorf("unable to retrieve StatefulSet %v from store: %v", key, err))
		return err
	}

	selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("error converting StatefulSet %v selector: %v", key, err))
		// This is a non-transient error, so don't retry.
		return nil
	}

	if err := ssc.adoptOrphanRevisions(set); err != nil {
		return err
	}

	pods, err := ssc.getPodsForStatefulSet(set, selector)
	if err != nil {
		return err
	}
	return ssc.syncStatefulSet(set, pods)
}
syncStatefulSet
:
// syncStatefulSet syncs a tuple of (statefulset, []*v1.Pod) by delegating to
// ssc.control.UpdateStatefulSet on a deep copy of set.
//
// When the StatefulSetMinReadySeconds gate is enabled, minReadySeconds > 0, and
// the resulting status does not yet report all desired replicas as available,
// the set is re-enqueued after minReadySeconds. That extra sync handles clock
// skew and lets availability be re-evaluated right after the status update.
func (ssc *StatefulSetController) syncStatefulSet(set *apps.StatefulSet, pods []*v1.Pod) error {
	klog.V(4).Infof("Syncing StatefulSet %v/%v with %d pods", set.Namespace, set.Name, len(pods))

	// TODO: investigate where we mutate the set during the update as it is not obvious.
	status, err := ssc.control.UpdateStatefulSet(set.DeepCopy(), pods)
	if err != nil {
		return err
	}
	klog.V(4).Infof("Successfully synced StatefulSet %s/%s successful", set.Namespace, set.Name)

	awaitingAvailability := utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetMinReadySeconds) &&
		set.Spec.MinReadySeconds > 0 &&
		status != nil &&
		status.AvailableReplicas != *set.Spec.Replicas
	if awaitingAvailability {
		ssc.enqueueSSAfter(set, time.Duration(set.Spec.MinReadySeconds)*time.Second)
	}
	return nil
}
UpdateStatefulSet
:
- 获取历史 revisions(ControllerRevision)并排序;
- 调用 ssc.performUpdate,其内部依次:
  - 计算 currentRevision 和 updateRevision,若 sts 处于更新过程中,则 currentRevision 和 updateRevision 的值不同;
  - 调用 ssc.updateStatefulSet 执行实际的 sync 操作;
  - 调用 ssc.updateStatefulSetStatus 更新 status 子资源;
- 调用 ssc.truncateHistory,根据 sts 的 spec.revisionHistoryLimit 字段清理过期的 controllerrevision。
// UpdateStatefulSet lists and sorts the set's ControllerRevisions, then runs
// performUpdate. performUpdate resolves the current/update revisions, syncs the
// Pods and writes the status. Old revisions are always truncated afterwards,
// even when performUpdate fails.
//
// On failure the performUpdate error is aggregated with the truncateHistory
// error and no status is returned. On success the computed status is returned
// along with any truncateHistory error.
func (ssc *defaultStatefulSetControl) UpdateStatefulSet(set *apps.StatefulSet, pods []*v1.Pod) (*apps.StatefulSetStatus, error) {
	revisions, err := ssc.ListRevisions(set)
	if err != nil {
		return nil, err
	}
	history.SortControllerRevisions(revisions)

	currentRevision, updateRevision, status, updateErr := ssc.performUpdate(set, pods, revisions)
	truncateErr := ssc.truncateHistory(set, pods, revisions, currentRevision, updateRevision)
	if updateErr != nil {
		return nil, utilerrors.NewAggregate([]error{updateErr, truncateErr})
	}
	return status, truncateErr
}
// performUpdate carries out one sync round for set:
//  1. resolve the current and update ControllerRevisions (getStatefulSetRevisions);
//  2. drive the Pods toward the desired state (updateStatefulSet);
//  3. persist the resulting status (updateStatefulSetStatus).
//
// The resolved revisions are returned even on error so that the caller can still
// truncate history. The status is nil if step 1 fails.
func (ssc *defaultStatefulSetControl) performUpdate(
	set *apps.StatefulSet, pods []*v1.Pod, revisions []*apps.ControllerRevision) (*apps.ControllerRevision, *apps.ControllerRevision, *apps.StatefulSetStatus, error) {
	currentRevision, updateRevision, collisionCount, err := ssc.getStatefulSetRevisions(set, revisions)
	if err != nil {
		return currentRevision, updateRevision, nil, err
	}

	status, err := ssc.updateStatefulSet(set, currentRevision, updateRevision, collisionCount, pods)
	if err != nil {
		return currentRevision, updateRevision, status, err
	}
	if err := ssc.updateStatefulSetStatus(set, status); err != nil {
		return currentRevision, updateRevision, status, err
	}

	klog.V(4).Infof("StatefulSet %s/%s pod status replicas=%d ready=%d current=%d updated=%d",
		set.Namespace,
		set.Name,
		status.Replicas,
		status.ReadyReplicas,
		status.CurrentReplicas,
		status.UpdatedReplicas)
	klog.V(4).Infof("StatefulSet %s/%s revisions current=%s update=%s",
		set.Namespace,
		set.Name,
		status.CurrentRevision,
		status.UpdateRevision)
	return currentRevision, updateRevision, status, nil
}
updateStatefulSet
:
// updateStatefulSet is the core of a StatefulSet sync. It drives the Pods of set
// toward the desired state and returns the StatefulSetStatus it observed along
// the way; performUpdate persists that status via updateStatefulSetStatus.
//
// The part shown here does the following:
//   - counts every Pod in pods into the status (replicas, ready, available,
//     current, updated);
//   - sorts each Pod into replicas (ordinal < spec.replicas) or condemned
//     (ordinal >= spec.replicas);
//   - fills missing ordinals in [0, spec.replicas) with new Pod objects built by
//     newVersionedStatefulSetPod from currentSet/updateSet;
//   - returns the status unchanged if the set is being deleted;
//   - walks replicas in ordinal order, deleting and replacing failed Pods and
//     creating missing ones.
//
// Unless burst is allowed, the walk stops at the first Pod that was just
// created, is terminating, or is not yet Running and Ready. When
// StatefulSetMinReadySeconds is enabled it also stops at the first Pod that is
// not yet Available.
//
// NOTE(review): apps/v1 defines only the RollingUpdate and OnDelete update
// strategy types. A partitioned rollout is RollingUpdate with
// spec.updateStrategy.rollingUpdate.partition set, not a separate "Partition"
// type. The strategy-specific handling is not part of this excerpt.
func (ssc *defaultStatefulSetControl) updateStatefulSet(
	set *apps.StatefulSet,
	currentRevision *apps.ControllerRevision,
	updateRevision *apps.ControllerRevision,
	collisionCount int32,
	pods []*v1.Pod) (*apps.StatefulSetStatus, error) {
	// Materialize the StatefulSet as it looks at the current revision and at the
	// update revision. Each result is declared exactly once; redeclaring
	// currentSet with := in the same scope would not compile.
	currentSet, err := ApplyRevision(set, currentRevision)
	if err != nil {
		return nil, err
	}
	updateSet, err := ApplyRevision(set, updateRevision)
	if err != nil {
		return nil, err
	}

	// Seed the status with the observed generation, revision names and collision count.
	status := apps.StatefulSetStatus{}
	status.ObservedGeneration = set.Generation
	status.CurrentRevision = currentRevision.Name
	status.UpdateRevision = updateRevision.Name
	status.CollisionCount = new(int32)
	*status.CollisionCount = collisionCount

	replicaCount := int(*set.Spec.Replicas)
	// replicas holds the Pods with 0 <= getOrdinal(pod) < set.Spec.Replicas, indexed by ordinal.
	replicas := make([]*v1.Pod, replicaCount)
	// condemned holds the Pods with set.Spec.Replicas <= getOrdinal(pod).
	condemned := make([]*v1.Pod, 0, len(pods))
	unhealthy := 0
	var firstUnhealthyPod *v1.Pod

	// Count every existing Pod into the status and sort it into replicas or condemned.
	for i := range pods {
		status.Replicas++
		// Count running-and-ready Pods.
		if isRunningAndReady(pods[i]) {
			status.ReadyReplicas++
			// Availability honours minReadySeconds only when the feature gate is enabled.
			if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetMinReadySeconds) {
				if isRunningAndAvailable(pods[i], set.Spec.MinReadySeconds) {
					status.AvailableReplicas++
				}
			} else {
				// With the gate disabled, every ready replica counts as available.
				status.AvailableReplicas = status.ReadyReplicas
			}
		}
		// Count current- and update-revision Pods among those created and not terminating.
		if isCreated(pods[i]) && !isTerminating(pods[i]) {
			if getPodRevision(pods[i]) == currentRevision.Name {
				status.CurrentReplicas++
			}
			if getPodRevision(pods[i]) == updateRevision.Name {
				status.UpdatedReplicas++
			}
		}
		if ord := getOrdinal(pods[i]); 0 <= ord && ord < replicaCount {
			// Ordinal is within the desired range.
			replicas[ord] = pods[i]
		} else if ord >= replicaCount {
			// Ordinal is beyond the desired range.
			condemned = append(condemned, pods[i])
		}
	}

	// Fill any missing ordinal in [0, set.Spec.Replicas) with a new Pod object.
	// newVersionedStatefulSetPod decides whether currentSet or updateSet is used.
	for ord := 0; ord < replicaCount; ord++ {
		if replicas[ord] == nil {
			replicas[ord] = newVersionedStatefulSetPod(
				currentSet,
				updateSet,
				currentRevision.Name,
				updateRevision.Name, ord)
		}
	}

	sort.Sort(ascendingOrdinal(condemned))

	// Find the first unhealthy Pod by ordinal: replicas first, then condemned.
	for i := range replicas {
		if !isHealthy(replicas[i]) {
			unhealthy++
			if firstUnhealthyPod == nil {
				firstUnhealthyPod = replicas[i]
			}
		}
	}
	for i := range condemned {
		if !isHealthy(condemned[i]) {
			unhealthy++
			if firstUnhealthyPod == nil {
				firstUnhealthyPod = condemned[i]
			}
		}
	}
	if unhealthy > 0 {
		klog.V(4).Infof("StatefulSet %s/%s has %d unhealthy Pods starting with %s",
			set.Namespace,
			set.Name,
			unhealthy,
			firstUnhealthyPod.Name)
	}

	// A set that is being deleted is not modified; just report what was observed.
	if set.DeletionTimestamp != nil {
		return &status, nil
	}

	// monotonic is true unless allowsBurst(set), presumably for
	// podManagementPolicy: Parallel. In monotonic mode Pods are processed strictly
	// one at a time.
	monotonic := !allowsBurst(set)

	// Bring every Pod in replicas to Running/Ready.
	for i := range replicas {
		// Delete and recreate failed Pods.
		if isFailed(replicas[i]) {
			ssc.recorder.Eventf(set, v1.EventTypeWarning, "RecreatingFailedPod",
				"StatefulSet %s/%s is recreating failed Pod %s",
				set.Namespace,
				set.Name,
				replicas[i].Name)
			if err := ssc.podControl.DeleteStatefulPod(set, replicas[i]); err != nil {
				return &status, err
			}
			if getPodRevision(replicas[i]) == currentRevision.Name {
				status.CurrentReplicas--
			}
			if getPodRevision(replicas[i]) == updateRevision.Name {
				status.UpdatedReplicas--
			}
			status.Replicas--
			replicas[i] = newVersionedStatefulSetPod(
				currentSet,
				updateSet,
				currentRevision.Name,
				updateRevision.Name,
				i)
		}
		// Create the Pod if it does not exist yet.
		if !isCreated(replicas[i]) {
			if err := ssc.podControl.CreateStatefulPod(set, replicas[i]); err != nil {
				return &status, err
			}
			status.Replicas++
			if getPodRevision(replicas[i]) == currentRevision.Name {
				status.CurrentReplicas++
			}
			if getPodRevision(replicas[i]) == updateRevision.Name {
				status.UpdatedReplicas++
			}
			// if the set does not allow bursting, return immediately
			if monotonic {
				return &status, nil
			}
			// pod created, no more work possible for this round
			continue
		}
		// In monotonic mode, wait for a terminating Pod to finish terminating.
		if isTerminating(replicas[i]) && monotonic {
			klog.V(4).Infof(
				"StatefulSet %s/%s is waiting for Pod %s to Terminate",
				set.Namespace,
				set.Name,
				replicas[i].Name)
			return &status, nil
		}
		// In monotonic mode, wait for an existing Pod to become Running and Ready.
		if !isRunningAndReady(replicas[i]) && monotonic {
			klog.V(4).Infof(
				"StatefulSet %s/%s is waiting for Pod %s to be Running and Ready",
				set.Namespace,
				set.Name,
				replicas[i].Name)
			return &status, nil
		}
		// In monotonic mode with the minReadySeconds gate on, wait for the Pod to be Available.
		if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetMinReadySeconds) && !isRunningAndAvailable(replicas[i], set.Spec.MinReadySeconds) && monotonic {
			klog.V(4).Infof(
				"StatefulSet %s/%s is waiting for Pod %s to be Available",
				set.Namespace,
				set.Name,
				replicas[i].Name)
			return &status, nil
		}
		// Nothing more to do for a Pod whose identity and storage already match the set.
		if identityMatches(set, replicas[i]) && storageMatches(set, replicas[i]) {
			continue
		}
		// The rest of the loop body and of the function is elided in this excerpt.
		...
	}
}
至此结束。