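k8s replicaset controller source code analysis (2) - core processing logic analysis
replicaset controller analysis
Introduction to the replicaset controller
The replicaset controller is one of the many controllers in the kube-controller-manager component, and it is the controller for replicaset resource objects. It watches two kinds of resources, replicaset and pod. When either of them changes, the replicaset controller reconciles the affected replicaset object so that its desired replica count is met: it creates pods when fewer pods exist than desired, and deletes pods when more exist than desired.
In short, the replicaset controller compares the number of pods a replicaset object asks for with the number of pods that currently exist, then creates or deletes pods until the two numbers are equal.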
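replicaset controller architecture diagram
The rough composition and processing flow of the replicaset controller are shown in the figure below. The replicaset controller registers event handlers for pod and replicaset objects. When an event is observed through the watch, the corresponding replicaset object is put into the queue. The syncReplicaSet method holds the core reconcile logic: it takes a replicaset object from the queue and reconciles it.
The analysis of the replicaset controller is split into three parts:
(1) initialization and startup of the replicaset controller;
(2) core processing logic of the replicaset controller;
(3) the expectations mechanism of the replicaset controller.
This post covers the core processing logic of the replicaset controller.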
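replicaset controller core processing logic analysis
Based on v1.17.4
The earlier analysis of initialization and startup showed that the replicaset controller watches the add, update and delete events of replicaset and pod objects and then reconciles the affected replicaset objects. This post continues with the reconcile (core processing) logic, using rsc.syncHandler as the entry point.
rsc.syncHandler
rsc.syncHandler is the rsc.syncReplicaSet method. Its main logic:
(1) get the replicaset object and the list of pods associated with it;
(2) call rsc.expectations.SatisfiedExpectations to check whether the pod creations/deletions issued in the previous round have completed, in other words whether the rsc.manageReplicas call made during the previous reconcile of this replicaset has fully taken effect;
(3) if the previous round has completed and the replicaset object's DeletionTimestamp field is nil, call rsc.manageReplicas, which does the actual reconciliation of the replica count, i.e. creates or deletes pods;
(4) call calculateStatus to compute the replicaset's status, then update it.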
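The gating in steps (2) and (3) can be condensed into a small control-flow sketch. This is not the real syncReplicaSet: the replicaSet type and the syncOnce helper are hypothetical stand-ins invented for illustration, and the status calculation of step (4) is reduced to returning the error it would consume.

```go
package main

import (
	"errors"
	"fmt"
)

// replicaSet is a hypothetical, heavily reduced stand-in for apps.ReplicaSet.
type replicaSet struct {
	name     string
	deleting bool // true when DeletionTimestamp is set
}

// syncOnce mirrors the gating described in steps (2) and (3): manage runs
// only when the previous round's expectations are satisfied and the object
// is not being deleted. The returned error is what step (4) would feed into
// the status calculation.
func syncOnce(rs replicaSet, satisfied bool, manage func() error) (managed bool, manageErr error) {
	if satisfied && !rs.deleting {
		managed = true
		manageErr = manage()
	}
	return managed, manageErr
}

func main() {
	ok := func() error { return nil }
	failing := func() error { return errors.New("quota exceeded") }

	fmt.Println(syncOnce(replicaSet{name: "web"}, true, ok))                 // manage runs
	fmt.Println(syncOnce(replicaSet{name: "web"}, true, failing))            // manage runs and fails
	fmt.Println(syncOnce(replicaSet{name: "web"}, false, ok))                // still waiting, manage skipped
	fmt.Println(syncOnce(replicaSet{name: "web", deleting: true}, true, ok)) // being deleted, manage skipped
}
```

In the sketch, as in the description above, the status step still runs when manage is skipped; only the pod creation/deletion is gated.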
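1 rsc.expectations.SatisfiedExpectations
This method checks whether the pod creations/deletions issued in the previous round have completed, in other words whether the rsc.manageReplicas call made during the previous reconcile of the replicaset has fully taken effect. rsc.manageReplicas may only be called again once the previous round of pod creation/deletion has completed.
The method returns true in any of three cases: rsc.manageReplicas has never been called while reconciling this replicaset; the number of pods the previous round expected to create/delete has been reached; or the timeout (5 minutes) since the last rsc.manageReplicas call has elapsed. True means the previous round is considered complete and rsc.manageReplicas may be called again. Otherwise the method returns false.
expectations record how many pods a replicaset expects to create/delete in a given reconcile. Each time a pod creation/deletion completes, the corresponding expected count is decremented. When the expected create/delete counts are less than or equal to 0, the previous round has reached its target and the method returns true.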
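The decision described above can be modelled with a small counter. The following is a simplified, single-threaded sketch, not the controller's own data structure: the expectation type and the satisfied function are hypothetical names, and only the 5-minute timeout is taken from the text.

```go
package main

import (
	"fmt"
	"time"
)

// expectationsTimeout matches the 5-minute limit mentioned in the text.
const expectationsTimeout = 5 * time.Minute

// expectation is a hypothetical stand-in for the per-replicaset record of
// pod creations (add) and deletions (del) that are still expected.
type expectation struct {
	add, del  int
	timestamp time.Time // when the expectation was recorded
}

func (e *expectation) fulfilled() bool { return e.add <= 0 && e.del <= 0 }

func (e *expectation) expired(now time.Time) bool {
	return now.Sub(e.timestamp) > expectationsTimeout
}

// satisfied reports whether a new round of create/delete work may start.
// A nil expectation models a replicaset that never recorded one.
func satisfied(e *expectation, now time.Time) bool {
	if e == nil {
		return true
	}
	return e.fulfilled() || e.expired(now)
}

func main() {
	start := time.Now()
	e := &expectation{add: 2, timestamp: start}

	fmt.Println(satisfied(e, start)) // two creations still outstanding
	e.add--                          // first creation observed
	e.add--                          // second creation observed
	fmt.Println(satisfied(e, start)) // everything observed

	stuck := &expectation{add: 1, timestamp: start}
	fmt.Println(satisfied(stuck, start.Add(6*time.Minute))) // never observed, but expired
}
```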
The expectations mechanism is analyzed in detail later.

    // pkg/controller/controller_utils.go
    func (r *ControllerExpectations) SatisfiedExpectations(controllerKey string) bool {
        if exp, exists, err := r.GetExpectations(controllerKey); exists {
            if exp.Fulfilled() {
                klog.V(4).Infof("Controller expectations fulfilled %#v", exp)
                return true
            } else if exp.isExpired() {
                klog.V(4).Infof("Controller expectations expired %#v", exp)
                return true
            } else {
                klog.V(4).Infof("Controller still waiting on expectations %#v", exp)
                return false
            }
        } else if err != nil {
            klog.V(2).Infof("Error encountered while checking expectations %#v, forcing sync", err)
        } else {
            // When a new controller is created, it doesn't have expectations.
            // When it doesn't see expected watch events for > TTL, the expectations expire.
            //  - In this case it wakes up, creates/deletes controllees, and sets expectations again.
            // When it has satisfied expectations and no controllees need to be created/destroyed > TTL, the expectations expire.
            //  - In this case it continues without setting expectations till it needs to create/delete controllees.
            klog.V(4).Infof("Controller %v either never recorded expectations, or the ttl expired.", controllerKey)
        }
        // Trigger a sync if we either encountered and error (which shouldn't happen since we're
        // getting from local store) or this controller hasn't established expectations.
        return true
    }

    func (exp *ControlleeExpectations) isExpired() bool {
        return clock.RealClock{}.Since(exp.timestamp) > ExpectationsTimeout // ExpectationsTimeout = 5 * time.Minute
    }

2 Core pod creation/deletion method - rsc.manageReplicas
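This is the core method that creates and deletes pods. It compares the number of pods the replicaset asks for with the number that currently exist, then creates or deletes pods so that the two numbers end up equal. Note in particular that each call to rsc.manageReplicas creates or deletes at most 500 pods.
rsc.manageReplicas is not necessarily called on every reconcile of a replicaset object. It is called only when rsc.expectations.SatisfiedExpectations returns true and the replicaset object's DeletionTimestamp is empty.
Here is a first look at the code; a detailed walk-through of the logic follows it.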
    // pkg/controller/replicaset/replica_set.go
    func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps.ReplicaSet) error {
        diff := len(filteredPods) - int(*(rs.Spec.Replicas))
        rsKey, err := controller.KeyFunc(rs)
        if err != nil {
            utilruntime.HandleError(fmt.Errorf("Couldn't get key for %v %#v: %v", rsc.Kind, rs, err))
            return nil
        }
        if diff < 0 {
            diff *= -1
            if diff > rsc.burstReplicas {
                diff = rsc.burstReplicas
            }
            // TODO: Track UIDs of creates just like deletes. The problem currently
            // is we'd need to wait on the result of a create to record the pod's
            // UID, which would require locking *across* the create, which will turn
            // into a performance bottleneck. We should generate a UID for the pod
            // beforehand and store it via ExpectCreations.
            rsc.expectations.ExpectCreations(rsKey, diff)
            glog.V(2).Infof("Too few replicas for %v %s/%s, need %d, creating %d", rsc.Kind, rs.Namespace, rs.Name, *(rs.Spec.Replicas), diff)
            // Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize
            // and double with each successful iteration in a kind of "slow start".
            // This handles attempts to start large numbers of pods that would
            // likely all fail with the same error. For example a project with a
            // low quota that attempts to create a large number of pods will be
            // prevented from spamming the API service with the pod create requests
            // after one of its pods fails. Conveniently, this also prevents the
            // event spam that those failures would generate.
            successfulCreations, err := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func() error {
                boolPtr := func(b bool) *bool { return &b }
                controllerRef := &metav1.OwnerReference{
                    APIVersion:         rsc.GroupVersion().String(),
                    Kind:               rsc.Kind,
                    Name:               rs.Name,
                    UID:                rs.UID,
                    BlockOwnerDeletion: boolPtr(true),
                    Controller:         boolPtr(true),
                }
                err := rsc.podControl.CreatePodsWithControllerRef(rs.Namespace, &rs.Spec.Template, rs, controllerRef)
                if err != nil && errors.IsTimeout(err) {
                    // Pod is created but its initialization has timed out.
                    // If the initialization is successful eventually, the
                    // controller will observe the creation via the informer.
                    // If the initialization fails, or if the pod keeps
                    // uninitialized for a long time, the informer will not
                    // receive any update, and the controller will create a new
                    // pod when the expectation expires.
                    return nil
                }
                return err
            })
            // Any skipped pods that we never attempted to start shouldn't be expected.
            // The skipped pods will be retried later. The next controller resync will
            // retry the slow start process.
            if skippedPods := diff - successfulCreations; skippedPods > 0 {
                glog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for %v %v/%v", skippedPods, rsc.Kind, rs.Namespace, rs.Name)
                for i := 0; i < skippedPods; i++ {
                    // Decrement the expected number of creates because the informer won't observe this pod
                    rsc.expectations.CreationObserved(rsKey)
                }
            }
            return err
        } else if diff > 0 {
            if diff > rsc.burstReplicas {
                diff = rsc.burstReplicas
            }
            glog.V(2).Infof("Too many replicas for %v %s/%s, need %d, deleting %d", rsc.Kind, rs.Namespace, rs.Name, *(rs.Spec.Replicas), diff)
            // Choose which Pods to delete, preferring those in earlier phases of startup.
            podsToDelete := getPodsToDelete(filteredPods, diff)
            // Snapshot the UIDs (ns/name) of the pods we're expecting to see
            // deleted, so we know to record their expectations exactly once either
            // when we see it as an update of the deletion timestamp, or as a delete.
            // Note that if the labels on a pod/rs change in a way that the pod gets
            // orphaned, the rs will only wake up after the expectations have
            // expired even if other pods are deleted.
            rsc.expectations.ExpectDeletions(rsKey, getPodKeys(podsToDelete))
            errCh := make(chan error, diff)
            var wg sync.WaitGroup
            wg.Add(diff)
            for _, pod := range podsToDelete {
                go func(targetPod *v1.Pod) {
                    defer wg.Done()
                    if err := rsc.podControl.DeletePod(rs.Namespace, targetPod.Name, rs); err != nil {
                        // Decrement the expected number of deletes because the informer won't observe this deletion
                        podKey := controller.PodKey(targetPod)
                        glog.V(2).Infof("Failed to delete %v, decrementing expectations for %v %s/%s", podKey, rsc.Kind, rs.Namespace, rs.Name)
                        rsc.expectations.DeletionObserved(rsKey, podKey)
                        errCh <- err
                    }
                }(pod)
            }
            wg.Wait()
            select {
            case err := <-errCh:
                // all errors have been reported before and they're likely to be the same, so we'll only return the first one we hit.
                if err != nil {
                    return err
                }
            default:
            }
        }
        return nil
    }

diff = number of existing pods - desired number of pods
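    diff := len(filteredPods) - int(*(rs.Spec.Replicas))

(1) When fewer pods exist than desired, pods have to be created, and the pod creation block is entered.
(2) When more pods exist than desired, pods have to be deleted, and the pod deletion block is entered.
A single sync creates or deletes at most rsc.burstReplicas pods in bulk, i.e. 500.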
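As a worked example of the diff and the cap described above, the sketch below computes how many pods a single sync would create or delete. The plan helper is hypothetical and exists only for this illustration; the value 500 is the cap quoted in the text, and the loop in main assumes every creation succeeds.

```go
package main

import "fmt"

// burstReplicas is the per-sync cap quoted in the text.
const burstReplicas = 500

// plan is a hypothetical helper: given the existing and desired pod counts,
// it returns how many pods one sync would create or delete.
func plan(existing, desired int) (create, remove int) {
	diff := existing - desired
	switch {
	case diff < 0: // too few pods
		create = -diff
		if create > burstReplicas {
			create = burstReplicas
		}
	case diff > 0: // too many pods
		remove = diff
		if remove > burstReplicas {
			remove = burstReplicas
		}
	}
	return create, remove
}

func main() {
	fmt.Println(plan(0, 1200))   // 500 0
	fmt.Println(plan(1300, 100)) // 0 500
	fmt.Println(plan(3, 3))      // 0 0

	// Scaling from 0 to 1200 pods, assuming every creation succeeds.
	syncs := 0
	for existing := 0; ; syncs++ {
		create, _ := plan(existing, 1200)
		if create == 0 {
			break
		}
		existing += create
	}
	fmt.Println(syncs) // 3 (500 + 500 + 200)
}
```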
    // pkg/controller/replicaset/replica_set.go
    const (
        // Realistic value of the burstReplica field for the replica set manager based off
        // performance requirements for kubernetes 1.0.
        BurstReplicas = 500

        // The number of times we retry updating a ReplicaSet's status.
        statusUpdateRetries = 1
    )

    if diff > rsc.burstReplicas {
        diff = rsc.burstReplicas
    }

Next, the pod creation and pod deletion blocks are analyzed in turn.
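2.1 Pod creation logic block
Main logic:
(1) compute the number of pods to create, capped at 500;
(2) call rsc.expectations.ExpectCreations to record in expectations how many pods this reconcile expects to create;
(3) call the slowStartBatch function to carry out the pod creation;
(4) once slowStartBatch returns, compute how many pods were not created successfully, then call rsc.expectations.CreationObserved that many times to lower the number of creations this reconcile still expects.
Why lower it? expectations record how many pods the replicaset expects to create/delete in a given reconcile. When a pod is created, the replicaset controller sees the pod's creation event through the watch and calls rsc.expectations.CreationObserved, which lowers the expected creation count (deletions are handled the same way). When some pods fail to be created, the controller never sees matching events, so the expected count has to be lowered by hand. Otherwise the expected count for this reconcile could never drop to 0 or below, and rsc.expectations.SatisfiedExpectations would return true only once the expectations timeout is reached.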
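The effect of that manual decrement can be shown with a toy counter. This is a minimal sketch under simplifying assumptions: the pending type and the round function are hypothetical, and watch events are simulated by plain method calls.

```go
package main

import "fmt"

// pending is a hypothetical counter standing in for the number of pod
// creations a replicaset still expects to observe.
type pending struct{ creates int }

func (p *pending) expect(n int)    { p.creates = n }
func (p *pending) observed()       { p.creates-- }
func (p *pending) fulfilled() bool { return p.creates <= 0 }

// round simulates one reconcile that wants `want` pods but only manages to
// create `succeeded` of them. It reports whether the expectation ends up
// fulfilled once the watch events for the created pods have arrived.
func round(want, succeeded int, lowerSkipped bool) bool {
	p := &pending{}
	p.expect(want)
	if lowerSkipped {
		// No event will ever arrive for pods that were not created,
		// so they are taken out of the expectation right away.
		for i := 0; i < want-succeeded; i++ {
			p.observed()
		}
	}
	// One watch event per pod that really was created.
	for i := 0; i < succeeded; i++ {
		p.observed()
	}
	return p.fulfilled()
}

func main() {
	fmt.Println(round(7, 3, true))  // true: the next sync can proceed at once
	fmt.Println(round(7, 3, false)) // false: stuck until the timeout expires
}
```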
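2.1.1 slowStartBatch
Looking at slowStartBatch, the pod creation algorithm is:
(1) pods are created in batches of 1, 2, 4, 8, ... pods, growing exponentially; each batch starts one goroutine per pod it has to create.
(2) the batches run one after another in that 1, 2, 4, 8, ... progression. If any creation in a batch fails (for example because the apiserver is rate limiting or dropping requests; timeouts are the exception, because the initialization step may time out), the remaining batches are not run and the function call ends.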
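The batching described above can be sketched in a few lines of Go. This is a simplified illustration modelled on the behaviour the text describes, not a copy of the controller's function; slowStart, minInt and errQuota are names chosen for this example.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

// errQuota is a made-up error used by the demo below.
var errQuota = errors.New("quota exceeded")

func minInt(a, b int) int {
	if a < b {
		return a
	}
	return b
}

// slowStart calls fn count times in batches of initialBatchSize, then twice
// that, and so on. Each batch runs its calls concurrently. As soon as a
// batch contains a failure, no further batch is started. It returns the
// number of successful calls and one of the errors, if any.
func slowStart(count, initialBatchSize int, fn func() error) (int, error) {
	remaining := count
	successes := 0
	for batch := minInt(remaining, initialBatchSize); batch > 0; batch = minInt(2*batch, remaining) {
		errCh := make(chan error, batch)
		var wg sync.WaitGroup
		wg.Add(batch)
		for i := 0; i < batch; i++ {
			go func() {
				defer wg.Done()
				if err := fn(); err != nil {
					errCh <- err
				}
			}()
		}
		wg.Wait()
		successes += batch - len(errCh)
		if len(errCh) > 0 {
			return successes, <-errCh
		}
		remaining -= batch
	}
	return successes, nil
}

func main() {
	var calls int32
	failFourth := func() error {
		if atomic.AddInt32(&calls, 1) == 4 {
			return errQuota
		}
		return nil
	}
	// Batches of 1, 2 and 4 run; the fourth call fails inside the third
	// batch, so the last batch of 3 is never started.
	created, err := slowStart(10, 1, failFourth)
	fmt.Println(created, 10-created, err) // 6 4 quota exceeded
}
```

In the demo, 4 of the 10 requested pods are "skipped": one failed and three were never attempted. That is the number the caller subtracts from its expectations.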
rsc.podControl.CreatePodsWithControllerRef
The function passed to slowStartBatch above creates each pod by calling rsc.podControl.CreatePodsWithControllerRef.

    // pkg/controller/controller_utils.go
    func (r RealPodControl) CreatePodsWithControllerRef(namespace string, template *v1.PodTemplateSpec, controllerObject runtime.Object, controllerRef *metav1.OwnerReference) error {
        if err := validateControllerRef(controllerRef); err != nil {
            return err
        }
        return r.createPods("", namespace, template, controllerObject, controllerRef)
    }

    func (r RealPodControl) createPods(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
        pod, err := GetPodFromTemplate(template, object, controllerRef)
        if err != nil {
            return err
        }
        if len(nodeName) != 0 {
            pod.Spec.NodeName = nodeName
        }
        if len(labels.Set(pod.Labels)) == 0 {
            return fmt.Errorf("unable to create pods, no labels")
        }
        newPod, err := r.KubeClient.CoreV1().Pods(namespace).Create(pod)
        if err != nil {
            // only send an event if the namespace isn't terminating
            if !apierrors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
                r.Recorder.Eventf(object, v1.EventTypeWarning, FailedCreatePodReason, "Error creating: %v", err)
            }
            return err
        }
        accessor, err := meta.Accessor(object)
        if err != nil {
            klog.Errorf("parentObject does not have ObjectMeta, %v", err)
            return nil
        }
        klog.V(4).Infof("Controller %v created pod %v", accessor.GetName(), newPod.Name)
        r.Recorder.Eventf(object, v1.EventTypeNormal, SuccessfulCreatePodReason, "Created pod: %v", newPod.Name)
        return nil
    }

2.2 Pod deletion logic block
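Main logic:
(1) compute the number of pods to delete, capped at 500;
(2) based on the number of pods to remove, call the getPodsToDelete function to pick the list of pods to delete;
(3) call rsc.expectations.ExpectDeletions to record in expectations the pods this reconcile expects to delete;
(4) start one goroutine per pod and call rsc.podControl.DeletePod to delete that pod;
(5) for each pod whose deletion fails, call rsc.expectations.DeletionObserved to lower the number of deletions this reconcile still expects.
The reason for lowering it is the same as in the pod creation block above.
(6) wait for all goroutines to finish, then return.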
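The deletion steps above follow a common fan-out pattern: record what is expected, start one goroutine per item, undo the expectation for each failure, wait, and report the first error. The sketch below illustrates that pattern with hypothetical names (deleteAll, errDelete); pods are plain strings and the set of expected deletions is a map guarded by a mutex.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errDelete is a made-up error used by the demo below.
var errDelete = errors.New("delete rejected")

// deleteAll deletes every named pod concurrently. It returns the set of
// deletions that are still expected to show up as watch events, plus the
// first error that was reported, if any.
func deleteAll(pods []string, del func(name string) error) (map[string]bool, error) {
	// Record all expected deletions up front, keyed by pod name.
	var mu sync.Mutex
	expected := make(map[string]bool, len(pods))
	for _, name := range pods {
		expected[name] = true
	}

	errCh := make(chan error, len(pods))
	var wg sync.WaitGroup
	wg.Add(len(pods))
	for _, name := range pods {
		go func(target string) {
			defer wg.Done()
			if err := del(target); err != nil {
				// No delete event will arrive for this pod,
				// so it must no longer be expected.
				mu.Lock()
				delete(expected, target)
				mu.Unlock()
				errCh <- err
			}
		}(name)
	}
	wg.Wait()

	select {
	case err := <-errCh:
		return expected, err
	default:
		return expected, nil
	}
}

func main() {
	expected, err := deleteAll([]string{"a", "b", "c"}, func(name string) error {
		if name == "b" {
			return errDelete
		}
		return nil
	})
	fmt.Println(len(expected), expected["b"], err) // 2 false delete rejected
}
```

The error channel is buffered to the number of pods, so a failing goroutine never blocks on the send even though only the first error is read.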
2.2.1 getPodsToDelete
getPodsToDelete: given the number of pods to remove, it returns the list of pods to delete. Note that the definition shown here takes an additional relatedPods argument, whereas the manageReplicas listing above calls getPodsToDelete(filteredPods, diff); the two listings appear to come from different revisions of the source.

    // pkg/controller/replicaset/replica_set.go
    func getPodsToDelete(filteredPods, relatedPods []*v1.Pod, diff int) []*v1.Pod {
        // No need to sort pods if we are about to delete all of them.
        // diff will always be <= len(filteredPods), so not need to handle > case.
        if diff < len(filteredPods) {
            podsWithRanks := getPodsRankedByRelatedPodsOnSameNode(filteredPods, relatedPods)
            sort.Sort(podsWithRanks)
        }
        return filteredPods[:diff]
    }

    func getPodsRankedByRelatedPodsOnSameNode(podsToRank, relatedPods []*v1.Pod) controller.ActivePodsWithRanks {
        podsOnNode := make(map[string]int)
        for _, pod := range relatedPods {
            if controller.IsPodActive(pod) {
                podsOnNode[pod.Spec.NodeName]++
            }
        }
        ranks := make([]int, len(podsToRank))
        for i, pod := range podsToRank {
            ranks[i] = podsOnNode[pod.Spec.NodeName]
        }
        return controller.ActivePodsWithRanks{Pods: podsToRank, Rank: ranks}
    }

Logic for choosing which pods to delete
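Pods are sorted by the rules below, applied from top to bottom. The first rule that distinguishes two pods decides their relative order, and the later rules are not consulted for that pair:
(1) pods not bound to a node are deleted first;
(2) pods in the Pending phase are deleted first, then Unknown, and Running last;
(3) pods that are not ready are deleted before pods that are ready;
(4) pods are ordered by how many pods of the same replicaset run on their node; pods on nodes holding more of the replicaset's pods are deleted first;
(5) pods are ordered by how long they have been ready; pods that have been ready for the shortest time are deleted first;
(6) pods whose containers have restarted more often are deleted first;
(7) pods are ordered by creation time; the most recently created pods are deleted first.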
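The seven rules above amount to a multi-key comparison. The sketch below expresses them on a reduced pod type. It is an illustration only: the pod struct, its fields, deleteBefore and pickVictims are hypothetical, timestamps are plain Unix seconds, and the rank field stands for the per-node pod count of rule (4).

```go
package main

import (
	"fmt"
	"sort"
)

// Phases ordered so that a smaller value is deleted earlier.
const (
	phasePending = iota
	phaseUnknown
	phaseRunning
)

// pod is a hypothetical, reduced stand-in for v1.Pod.
type pod struct {
	name      string
	node      string // empty when the pod is not bound to a node
	phase     int
	ready     bool
	rank      int   // pods of the same replicaset on this pod's node
	readyAt   int64 // Unix seconds, meaningful only when ready
	restarts  int
	createdAt int64 // Unix seconds
}

// deleteBefore reports whether a should be deleted before b. Each rule is
// consulted only if all earlier rules consider the two pods equal.
func deleteBefore(a, b pod) bool {
	if (a.node == "") != (b.node == "") { // (1) unbound first
		return a.node == ""
	}
	if a.phase != b.phase { // (2) Pending, then Unknown, then Running
		return a.phase < b.phase
	}
	if a.ready != b.ready { // (3) not ready first
		return !a.ready
	}
	if a.rank != b.rank { // (4) crowded nodes first
		return a.rank > b.rank
	}
	if a.ready && b.ready && a.readyAt != b.readyAt { // (5) ready most recently first
		return a.readyAt > b.readyAt
	}
	if a.restarts != b.restarts { // (6) more restarts first
		return a.restarts > b.restarts
	}
	return a.createdAt > b.createdAt // (7) newest first
}

// pickVictims returns the first n pods in deletion order without modifying
// the input slice. n must not exceed len(pods).
func pickVictims(pods []pod, n int) []pod {
	sorted := append([]pod(nil), pods...)
	sort.SliceStable(sorted, func(i, j int) bool { return deleteBefore(sorted[i], sorted[j]) })
	return sorted[:n]
}

func main() {
	pods := []pod{
		{name: "running-old", node: "n1", phase: phaseRunning, ready: true, readyAt: 110, createdAt: 100},
		{name: "pending", node: "n2", phase: phasePending, createdAt: 300},
		{name: "unscheduled", phase: phasePending, createdAt: 200},
		{name: "running-new", node: "n2", phase: phaseRunning, ready: true, readyAt: 400, createdAt: 390},
	}
	for _, p := range pickVictims(pods, 3) {
		fmt.Println(p.name) // unscheduled, pending, running-new
	}
}
```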
2.2.2 rsc.podControl.DeletePod
The method that deletes a pod.

    // pkg/controller/controller_utils.go
    func (r RealPodControl) DeletePod(namespace string, podID string, object runtime.Object) error {
        accessor, err := meta.Accessor(object)
        if err != nil {
            return fmt.Errorf("object does not have ObjectMeta, %v", err)
        }
        klog.V(2).Infof("Controller %v deleting pod %v/%v", accessor.GetName(), namespace, podID)
        if err := r.KubeClient.CoreV1().Pods(namespace).Delete(podID, nil); err != nil && !apierrors.IsNotFound(err) {
            r.Recorder.Eventf(object, v1.EventTypeWarning, FailedDeletePodReason, "Error deleting: %v", err)
            return fmt.Errorf("unable to delete pods: %v", err)
        }
        r.Recorder.Eventf(object, v1.EventTypeNormal, SuccessfulDeletePodReason, "Deleted pod: %v", podID)
        return nil
    }

3 calculateStatus
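The calculateStatus function computes and returns the status of the replicaset object.
How is the status computed?
(1) From the number of existing pods, the number of ready pods, the number of available pods and so on, it sets the Replicas, ReadyReplicas, AvailableReplicas and related fields of the replicaset's status;
(2) From the conditions already present in the replicaset's status, and from whether the preceding rsc.manageReplicas call returned an error, it decides whether to add or remove a condition of type ReplicaFailure.
If the rsc.manageReplicas call failed and the replicaset's status has no condition of type ReplicaFailure, a ReplicaFailure condition is added, indicating that creating/deleting pods for this replicaset failed;
If the rsc.manageReplicas call returned no error and the replicaset's status has a condition of type ReplicaFailure, that condition is removed, indicating that creating/deleting pods for this replicaset succeeded.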
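The add/remove rule for the ReplicaFailure condition can be written as a small pure function. The sketch below is a simplified illustration: the condition type and the updateFailureCondition function are hypothetical, and a real condition carries more fields than are modelled here.

```go
package main

import (
	"errors"
	"fmt"
)

const replicaFailure = "ReplicaFailure"

// errManage is a made-up error used by the demo below.
var errManage = errors.New("pods rejected: quota exceeded")

// condition is a hypothetical, reduced stand-in for a replicaset condition.
type condition struct {
	ctype   string
	message string
}

// updateFailureCondition returns a new condition list in which a
// ReplicaFailure condition is present exactly when it should be:
// added when manageErr is set and none exists yet, removed when manageErr
// is nil and one exists, and left alone otherwise.
func updateFailureCondition(conds []condition, manageErr error) []condition {
	idx := -1
	for i, c := range conds {
		if c.ctype == replicaFailure {
			idx = i
			break
		}
	}
	out := append([]condition(nil), conds...) // never modify the caller's slice
	switch {
	case manageErr != nil && idx < 0:
		out = append(out, condition{ctype: replicaFailure, message: manageErr.Error()})
	case manageErr == nil && idx >= 0:
		out = append(out[:idx], out[idx+1:]...)
	}
	return out
}

func main() {
	var conds []condition
	conds = updateFailureCondition(conds, errManage)
	fmt.Println(len(conds)) // 1: condition added
	conds = updateFailureCondition(conds, errManage)
	fmt.Println(len(conds)) // 1: already present, nothing added
	conds = updateFailureCondition(conds, nil)
	fmt.Println(len(conds)) // 0: condition removed
}
```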
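4 updateReplicaSetStatus
Main logic:
(1) check whether the fields of the newly computed status, such as Replicas, ReadyReplicas, AvailableReplicas and Conditions, match those in the existing replicaset object's status; if they match, no update is needed and the function returns right away;
(2) call c.UpdateStatus to update the replicaset's status.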
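The "skip the write when nothing changed" check in step (1) is sketched below. This is a minimal illustration that compares only the fields named in the text; the status type, sameStatus and updateStatus are hypothetical, and put stands in for the call that sends the status to the API server.

```go
package main

import (
	"fmt"
	"reflect"
)

// status is a hypothetical, reduced stand-in for the replicaset status.
type status struct {
	replicas, ready, available int
	conditions                 []string
}

// sameStatus compares the fields listed in the text.
func sameStatus(a, b status) bool {
	return a.replicas == b.replicas &&
		a.ready == b.ready &&
		a.available == b.available &&
		reflect.DeepEqual(a.conditions, b.conditions)
}

// updateStatus calls put only when next differs from current. The boolean
// result reports whether an update was attempted.
func updateStatus(current, next status, put func(status) error) (bool, error) {
	if sameStatus(current, next) {
		return false, nil
	}
	return true, put(next)
}

func main() {
	writes := 0
	put := func(status) error { writes++; return nil }

	current := status{replicas: 3, ready: 3, available: 3}
	fmt.Println(updateStatus(current, current, put)) // false <nil>

	next := current
	next.ready = 2
	fmt.Println(updateStatus(current, next, put)) // true <nil>
	fmt.Println(writes)                           // 1
}
```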
c.UpdateStatus

    // staging/src/k8s.io/client-go/kubernetes/typed/apps/v1/replicaset.go
    func (c *replicaSets) UpdateStatus(replicaSet *v1.ReplicaSet) (result *v1.ReplicaSet, err error) {
        result = &v1.ReplicaSet{}
        err = c.client.Put().
            Namespace(c.ns).
            Resource("replicasets").
            Name(replicaSet.Name).
            SubResource("status").
            Body(replicaSet).
            Do().
            Into(result)
        return
    }

Summary
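replicaset controller architecture diagram
The rough composition and processing flow of the replicaset controller are shown in the figure below. The replicaset controller registers event handlers for pod and replicaset objects. When an event is observed through the watch, the corresponding replicaset object is put into the queue. The syncReplicaSet method holds the core reconcile logic: it takes a replicaset object from the queue and reconciles it.
replicaset controller core processing logic
The core processing logic of the replicaset controller compares the number of pods the replicaset object asks for with the number of pods that exist. When more pods are desired than exist, it runs the pod creation algorithm to create new pods until the desired number is reached. When fewer pods are desired than exist, it runs the pod deletion algorithm: it sorts the existing pods according to a fixed policy, picks the surplus pods in that order and deletes them until the desired number is reached.
replicaset controller pod creation algorithm
The replicaset controller creates pods in batches that grow as 1, 2, 4, 8, ... (at most 500 pods are created per reconcile; anything beyond the cap is created in a later reconcile). If any creation in a batch fails (for example because the apiserver is rate limiting or dropping requests; timeouts are the exception, because the initialization step may time out), the remaining batches are not run. The pod creation algorithm is triggered again the next time the replicaset object is reconciled, and this repeats until the desired number is reached.
replicaset controller pod deletion algorithm
The replicaset controller deletes pods by first sorting the existing pods according to a fixed policy, then picking the required number of pods in that order and starting one goroutine per pod to delete it (at most 500 pods are deleted per reconcile), and finally waiting for all goroutines to finish. Pods whose deletion failed (for example because the apiserver is rate limiting or dropping requests) and pods beyond the cap of 500 are handled when the replicaset object is reconciled again and the deletion algorithm runs once more, until the desired number is reached.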
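Logic for choosing which pods to delete
Pods are sorted by the rules below, applied from top to bottom. The first rule that distinguishes two pods decides their relative order, and the later rules are not consulted for that pair:
(1) pods not bound to a node are deleted first;
(2) pods in the Pending phase are deleted first, then Unknown, and Running last;
(3) pods that are not ready are deleted before pods that are ready;
(4) pods are ordered by how many pods of the same replicaset run on their node; pods on nodes holding more of the replicaset's pods are deleted first;
(5) pods are ordered by how long they have been ready; pods that have been ready for the shortest time are deleted first;
(6) pods whose containers have restarted more often are deleted first;
(7) pods are ordered by creation time; the most recently created pods are deleted first.
expectations mechanism
The expectations mechanism is analyzed in the next post.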