Kube Controller Manager Source Code Analysis
The Controller Manager plays a central management role in a Kubernetes cluster: it is responsible for creating and managing resources such as Deployments, StatefulSets, and ReplicaSets, and it is fair to call it one of the core modules of Kubernetes. Below we walk through the kube-controller-manager code at a high level.
```go
func NewControllerManagerCommand() *cobra.Command {
	s, err := options.NewKubeControllerManagerOptions()
	if err != nil {
		klog.Fatalf("unable to initialize command options: %v", err)
	}

	cmd := &cobra.Command{
		Use:  "kube-controller-manager",
		Long: `The Kubernetes controller manager is a daemon that embeds the core control loops shipped with Kubernetes. In applications of robotics and automation, a control loop is a non-terminating loop that regulates the state of the system. In Kubernetes, a controller is a control loop that watches the shared state of the cluster through the apiserver and makes changes attempting to move the current state towards the desired state. Examples of controllers that ship with Kubernetes today are the replication controller, endpoints controller, namespace controller, and serviceaccounts controller.`,
		Run: func(cmd *cobra.Command, args []string) {
			verflag.PrintAndExitIfRequested()
			utilflag.PrintFlags(cmd.Flags())

			c, err := s.Config(KnownControllers(), ControllersDisabledByDefault.List())
			if err != nil {
				fmt.Fprintf(os.Stderr, "%v\n", err)
				os.Exit(1)
			}

			if err := Run(c.Complete(), wait.NeverStop); err != nil {
				fmt.Fprintf(os.Stderr, "%v\n", err)
				os.Exit(1)
			}
		},
	}
```

The Controller Manager is itself a command-line program driven by a set of flags. We won't go through every flag here; if you are interested, browse the documentation or grep through flags_opinion.go. Let's go straight to the Run function.
Run Function Startup Flow
The kube-controller-manager can run either as a single instance or as several. When multiple Controller Manager instances are started for HA, they need leader election to guarantee that only one master instance is active at any given time. Let's walk through the startup flow in the Run function; unimportant helper code is skipped so we can focus on the key steps.
```go
func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
	run := func(ctx context.Context) {
		rootClientBuilder := controller.SimpleControllerClientBuilder{
			ClientConfig: c.Kubeconfig,
		}
		controllerContext, err := CreateControllerContext(c, rootClientBuilder, clientBuilder, ctx.Done())
		if err != nil {
			klog.Fatalf("error building controller context: %v", err)
		}

		if err := StartControllers(controllerContext, saTokenControllerInitFunc, NewControllerInitializers(controllerContext.LoopMode), unsecuredMux); err != nil {
			klog.Fatalf("error starting controllers: %v", err)
		}

		controllerContext.InformerFactory.Start(controllerContext.Stop)
		close(controllerContext.InformersStarted)

		select {}
	}

	id, err := os.Hostname()
	if err != nil {
		return err
	}

	// add a uniquifier so that two processes on the same host don't accidentally both become active
	id = id + "_" + string(uuid.NewUUID())
	rl, err := resourcelock.New(c.ComponentConfig.Generic.LeaderElection.ResourceLock,
		"kube-system",
		"kube-controller-manager",
		c.LeaderElectionClient.CoreV1(),
		resourcelock.ResourceLockConfig{
			Identity:      id,
			EventRecorder: c.EventRecorder,
		})
	if err != nil {
		klog.Fatalf("error creating lock: %v", err)
	}

	leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
		Lock:          rl,
		LeaseDuration: c.ComponentConfig.Generic.LeaderElection.LeaseDuration.Duration,
		RenewDeadline: c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration,
		RetryPeriod:   c.ComponentConfig.Generic.LeaderElection.RetryPeriod.Duration,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: run,
			OnStoppedLeading: func() {
				klog.Fatalf("leaderelection lost")
			},
		},
		WatchDog: electionChecker,
		Name:     "kube-controller-manager",
	})
	panic("unreachable")
}
```

The basic flow here is as follows:
- First, a run closure is defined; it is responsible for building the individual controllers and for actually running them.
- Leader election is performed using the leader-election helpers provided by client-go (see the standalone sketch after this list).
- If this instance wins the election, the registered OnStartedLeading callback — the run closure above — is invoked; if it loses, it simply blocks and keeps waiting.
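To make this concrete, here is a minimal standalone sketch of the same client-go machinery. It assumes the client-go version used in this article, where resourcelock.New takes only a CoreV1 client (newer versions also accept a coordination client and prefer Lease locks); the lock name my-demo-lock and the kubeconfig location are placeholders:

```go
package main

import (
	"context"
	"os"
	"time"

	"k8s.io/apimachinery/pkg/util/uuid"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
	"k8s.io/klog"
)

func main() {
	cfg, err := clientcmd.BuildConfigFromFlags("", os.Getenv("KUBECONFIG"))
	if err != nil {
		klog.Fatal(err)
	}
	client := kubernetes.NewForConfigOrDie(cfg)

	// Unique identity per process, the same hostname+UUID trick used above.
	id, _ := os.Hostname()
	id = id + "_" + string(uuid.NewUUID())

	// The lock object lives in kube-system, just like kube-controller-manager's.
	lock, err := resourcelock.New(resourcelock.EndpointsResourceLock,
		"kube-system", "my-demo-lock",
		client.CoreV1(),
		resourcelock.ResourceLockConfig{Identity: id})
	if err != nil {
		klog.Fatal(err)
	}

	leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
		Lock:          lock,
		LeaseDuration: 15 * time.Second,
		RenewDeadline: 10 * time.Second,
		RetryPeriod:   2 * time.Second,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: func(ctx context.Context) {
				klog.Infof("%s became leader, starting work...", id)
			},
			OnStoppedLeading: func() {
				klog.Fatalf("%s lost leadership", id)
			},
		},
	})
}
```

Running two copies of this program shows the behavior described above: only one process logs "became leader"; the other keeps retrying until the lease is released or expires.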
Leader Election Walkthrough
The client-go leader-election utilities work by having each candidate use the kubeClient to create a single resource — a ConfigMap or an Endpoints object; whichever goroutine succeeds in creating it acquires the lock, and all of the lock information is stored inside that ConfigMap or Endpoints object. These two resource types were chosen mainly because they are watched relatively little. Today kube-controller-manager still uses Endpoints and will gradually migrate to ConfigMap, because Endpoints objects are watched frequently by kube-proxy, Ingress controllers, and the like. Let's look at the Endpoints object inside a cluster:
```
[root@iZ8vb5qgxqbxakfo1cuvpaZ ~]# kubectl get ep -n kube-system kube-controller-manager -o yaml
apiVersion: v1
kind: Endpoints
metadata:
  annotations:
    control-plane.alpha.kubernetes.io/leader: '{"holderIdentity":"iZ8vbccmhgkyfdi8aii1hnZ_d880fea6-1322-11e9-913f-00163e033b49","leaseDurationSeconds":15,"acquireTime":"2019-01-08T08:53:49Z","renewTime":"2019-01-22T11:16:59Z","leaderTransitions":1}'
  creationTimestamp: 2019-01-08T08:52:56Z
  name: kube-controller-manager
  namespace: kube-system
  resourceVersion: "2978183"
  selfLink: /api/v1/namespaces/kube-system/endpoints/kube-controller-manager
  uid: cade1b65-1322-11e9-9931-00163e033b49
```

As you can see, the leader annotation records the identity of the current leader, when leadership was acquired, the lease duration, and the last renew time. Underneath it all, the election is ultimately backed by etcd. The main resource-lock code is as follows:
```go
func New(lockType string, ns string, name string, client corev1.CoreV1Interface, rlc ResourceLockConfig) (Interface, error) {
	switch lockType {
	case EndpointsResourceLock:
		return &EndpointsLock{
			EndpointsMeta: metav1.ObjectMeta{
				Namespace: ns,
				Name:      name,
			},
			Client:     client,
			LockConfig: rlc,
		}, nil
	case ConfigMapsResourceLock:
		return &ConfigMapLock{
			ConfigMapMeta: metav1.ObjectMeta{
				Namespace: ns,
				Name:      name,
			},
			Client:     client,
			LockConfig: rlc,
		}, nil
	default:
		return nil, fmt.Errorf("Invalid lock-type %s", lockType)
	}
}
```

StartControllers
Once a leader has been elected, the real controllers get started. Let's look at the code that starts them:
```go
func StartControllers(ctx ControllerContext, startSATokenController InitFunc, controllers map[string]InitFunc, unsecuredMux *mux.PathRecorderMux) error {
	// Always start the SA token controller first using a full-power client, since it needs to mint tokens for the rest
	// If this fails, just return here and fail since other controllers won't be able to get credentials.
	if _, _, err := startSATokenController(ctx); err != nil {
		return err
	}

	// Initialize the cloud provider with a reference to the clientBuilder only after token controller
	// has started in case the cloud provider uses the client builder.
	if ctx.Cloud != nil {
		ctx.Cloud.Initialize(ctx.ClientBuilder, ctx.Stop)
	}

	for controllerName, initFn := range controllers {
		if !ctx.IsControllerEnabled(controllerName) {
			klog.Warningf("%q is disabled", controllerName)
			continue
		}

		time.Sleep(wait.Jitter(ctx.ComponentConfig.Generic.ControllerStartInterval.Duration, ControllerStartJitter))

		klog.V(1).Infof("Starting %q", controllerName)
		debugHandler, started, err := initFn(ctx)
		if err != nil {
			klog.Errorf("Error starting %q", controllerName)
			return err
		}
		if !started {
			klog.Warningf("Skipping %q", controllerName)
			continue
		}
		if debugHandler != nil && unsecuredMux != nil {
			basePath := "/debug/controllers/" + controllerName
			unsecuredMux.UnlistedHandle(basePath, http.StripPrefix(basePath, debugHandler))
			unsecuredMux.UnlistedHandlePrefix(basePath+"/", http.StripPrefix(basePath, debugHandler))
		}
		klog.Infof("Started %q", controllerName)
	}

	return nil
}
```

- Iterate over the entire controller list
- Call each controller's init function (the InitFunc signature is sketched below)
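For reference, every entry in that map is an InitFunc. The type is declared in controllermanager.go roughly as shown below; startMyController and the mycontroller package are hypothetical, included only to show how an extra controller would be wired in alongside the built-in ones:

```go
// InitFunc starts one controller. It returns an optional debug handler,
// whether the controller actually started, and any startup error.
type InitFunc func(ctx ControllerContext) (debuggingHandler http.Handler, enabled bool, err error)

// Hypothetical example: an additional controller would be registered with
//   controllers["my-controller"] = startMyController
// and follow the same pattern as startDeploymentController shown later.
func startMyController(ctx ControllerContext) (http.Handler, bool, error) {
	go mycontroller.New(
		ctx.InformerFactory.Core().V1().Pods(),
		ctx.ClientBuilder.ClientOrDie("my-controller"),
	).Run(2, ctx.Stop)
	return nil, true, nil
}
```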
So how many controllers are there in total?
```go
func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {
	controllers := map[string]InitFunc{}
	controllers["endpoint"] = startEndpointController
	controllers["replicationcontroller"] = startReplicationController
	controllers["podgc"] = startPodGCController
	controllers["resourcequota"] = startResourceQuotaController
	controllers["namespace"] = startNamespaceController
	controllers["serviceaccount"] = startServiceAccountController
	controllers["garbagecollector"] = startGarbageCollectorController
	controllers["daemonset"] = startDaemonSetController
	controllers["job"] = startJobController
	controllers["deployment"] = startDeploymentController
	controllers["replicaset"] = startReplicaSetController
	controllers["horizontalpodautoscaling"] = startHPAController
	controllers["disruption"] = startDisruptionController
	controllers["statefulset"] = startStatefulSetController
	controllers["cronjob"] = startCronJobController
	controllers["csrsigning"] = startCSRSigningController
	controllers["csrapproving"] = startCSRApprovingController
	controllers["csrcleaner"] = startCSRCleanerController
	controllers["ttl"] = startTTLController
	controllers["bootstrapsigner"] = startBootstrapSignerController
	controllers["tokencleaner"] = startTokenCleanerController
	controllers["nodeipam"] = startNodeIpamController
	controllers["nodelifecycle"] = startNodeLifecycleController
	if loopMode == IncludeCloudLoops {
		controllers["service"] = startServiceController
		controllers["route"] = startRouteController
		controllers["cloud-node-lifecycle"] = startCloudNodeLifecycleController
		// TODO: volume controller into the IncludeCloudLoops only set.
	}
	controllers["persistentvolume-binder"] = startPersistentVolumeBinderController
	controllers["attachdetach"] = startAttachDetachController
	controllers["persistentvolume-expander"] = startVolumeExpandController
	controllers["clusterrole-aggregation"] = startClusterRoleAggregrationController
	controllers["pvc-protection"] = startPVCProtectionController
	controllers["pv-protection"] = startPVProtectionController
	controllers["ttl-after-finished"] = startTTLAfterFinishedController
	controllers["root-ca-cert-publisher"] = startRootCACertPublisher

	return controllers
}
```

There's the answer: the code above lists every controller that the current kube-controller-manager runs — familiar names such as Deployment and StatefulSet alongside some less familiar ones. Let's take Deployment as an example and see what its controller actually does.
Deployment Controller
Let's start with the Deployment Controller's start function.
```go
func startDeploymentController(ctx ControllerContext) (http.Handler, bool, error) {
	if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}] {
		return nil, false, nil
	}
	dc, err := deployment.NewDeploymentController(
		ctx.InformerFactory.Apps().V1().Deployments(),
		ctx.InformerFactory.Apps().V1().ReplicaSets(),
		ctx.InformerFactory.Core().V1().Pods(),
		ctx.ClientBuilder.ClientOrDie("deployment-controller"),
	)
	if err != nil {
		return nil, true, fmt.Errorf("error creating Deployment controller: %v", err)
	}
	go dc.Run(int(ctx.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs), ctx.Stop)
	return nil, true, nil
}
```

If you have read the earlier article on client-go informers, this will look familiar: an InformerFactory is used again — several informers, in fact. kube-controller-manager relies heavily on informers; a controller uses them to watch and be notified about the resources it cares about. Here the Deployment Controller watches three resources: Deployments, ReplicaSets, and Pods.
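As a quick refresher, here is a minimal standalone sketch of how a SharedInformerFactory is created and how a lister serves reads from the local cache instead of the API server; the kubeconfig path and the namespace are placeholders:

```go
package main

import (
	"fmt"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	cfg, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config") // placeholder path
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(cfg)

	// One shared factory feeds all informers; the controller manager shares a
	// single factory across every controller it starts.
	factory := informers.NewSharedInformerFactory(client, 30*time.Second)
	dInformer := factory.Apps().V1().Deployments()

	stop := make(chan struct{})
	factory.Start(stop)
	factory.WaitForCacheSync(stop)

	// Lister reads come from the informer's in-memory cache, not the API server.
	deployments, err := dInformer.Lister().Deployments(metav1.NamespaceDefault).List(labels.Everything())
	if err != nil {
		panic(err)
	}
	for _, d := range deployments {
		fmt.Println(d.Name)
	}
}
```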
Deployment Controller Resource Initialization
Now let's look at what the Deployment Controller wires up during initialization.
```go
// NewDeploymentController creates a new DeploymentController.
func NewDeploymentController(dInformer appsinformers.DeploymentInformer, rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) (*DeploymentController, error) {
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartLogging(klog.Infof)
	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})

	if client != nil && client.CoreV1().RESTClient().GetRateLimiter() != nil {
		if err := metrics.RegisterMetricAndTrackRateLimiterUsage("deployment_controller", client.CoreV1().RESTClient().GetRateLimiter()); err != nil {
			return nil, err
		}
	}
	dc := &DeploymentController{
		client:        client,
		eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "deployment-controller"}),
		queue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment"),
	}
	dc.rsControl = controller.RealRSControl{
		KubeClient: client,
		Recorder:   dc.eventRecorder,
	}

	dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    dc.addDeployment,
		UpdateFunc: dc.updateDeployment,
		// This will enter the sync loop and no-op, because the deployment has been deleted from the store.
		DeleteFunc: dc.deleteDeployment,
	})
	rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    dc.addReplicaSet,
		UpdateFunc: dc.updateReplicaSet,
		DeleteFunc: dc.deleteReplicaSet,
	})
	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		DeleteFunc: dc.deletePod,
	})
	dc.syncHandler = dc.syncDeployment
	dc.enqueueDeployment = dc.enqueue

	dc.dLister = dInformer.Lister()
	dc.rsLister = rsInformer.Lister()
	dc.podLister = podInformer.Lister()
	dc.dListerSynced = dInformer.Informer().HasSynced
	dc.rsListerSynced = rsInformer.Informer().HasSynced
	dc.podListerSynced = podInformer.Informer().HasSynced
	return dc, nil
}
```

Does this look familiar? If you have worked with client-go informers it is essentially the same pattern: for each watched resource the corresponding Add, Update, and Delete handlers are registered, and every resource is read through a lister, so the controller never needs to query the API server directly.
First, the handlers for Deployment objects themselves:
```go
func (dc *DeploymentController) addDeployment(obj interface{}) {
	d := obj.(*apps.Deployment)
	klog.V(4).Infof("Adding deployment %s", d.Name)
	dc.enqueueDeployment(d)
}

func (dc *DeploymentController) updateDeployment(old, cur interface{}) {
	oldD := old.(*apps.Deployment)
	curD := cur.(*apps.Deployment)
	klog.V(4).Infof("Updating deployment %s", oldD.Name)
	dc.enqueueDeployment(curD)
}

func (dc *DeploymentController) deleteDeployment(obj interface{}) {
	d, ok := obj.(*apps.Deployment)
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))
			return
		}
		d, ok = tombstone.Obj.(*apps.Deployment)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a Deployment %#v", obj))
			return
		}
	}
	klog.V(4).Infof("Deleting deployment %s", d.Name)
	dc.enqueueDeployment(d)
}
```

Add, Update, and Delete are handled in exactly the same way: each one simply pushes the Deployment onto the work queue provided by client-go.
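"Pushing onto the queue" means roughly the following (a sketch of dc.enqueue, which dc.enqueueDeployment points to): the object is converted into a namespace/name key, and only the key — not the object — is stored on the rate-limited queue.

```go
// Sketch of dc.enqueue: turn the Deployment into a "namespace/name" key and
// add that key (not the object itself) to the rate-limited work queue.
func (dc *DeploymentController) enqueue(deployment *apps.Deployment) {
	key, err := controller.KeyFunc(deployment) // essentially cache.MetaNamespaceKeyFunc
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", deployment, err))
		return
	}
	dc.queue.Add(key)
}
```

Because only keys go onto the queue, several rapid events for the same Deployment collapse into a single pending work item.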
Next, the ReplicaSet handlers:

```go
func (dc *DeploymentController) addReplicaSet(obj interface{}) {
	rs := obj.(*apps.ReplicaSet)

	if rs.DeletionTimestamp != nil {
		// On a restart of the controller manager, it's possible for an object to
		// show up in a state that is already pending deletion.
		dc.deleteReplicaSet(rs)
		return
	}

	// If it has a ControllerRef, that's all that matters.
	if controllerRef := metav1.GetControllerOf(rs); controllerRef != nil {
		d := dc.resolveControllerRef(rs.Namespace, controllerRef)
		if d == nil {
			return
		}
		klog.V(4).Infof("ReplicaSet %s added.", rs.Name)
		dc.enqueueDeployment(d)
		return
	}

	// Otherwise, it's an orphan. Get a list of all matching Deployments and sync
	// them to see if anyone wants to adopt it.
	ds := dc.getDeploymentsForReplicaSet(rs)
	if len(ds) == 0 {
		return
	}
	klog.V(4).Infof("Orphan ReplicaSet %s added.", rs.Name)
	for _, d := range ds {
		dc.enqueueDeployment(d)
	}
}

func (dc *DeploymentController) updateReplicaSet(old, cur interface{}) {
	curRS := cur.(*apps.ReplicaSet)
	oldRS := old.(*apps.ReplicaSet)
	if curRS.ResourceVersion == oldRS.ResourceVersion {
		// Periodic resync will send update events for all known replica sets.
		// Two different versions of the same replica set will always have different RVs.
		return
	}

	curControllerRef := metav1.GetControllerOf(curRS)
	oldControllerRef := metav1.GetControllerOf(oldRS)
	controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
	if controllerRefChanged && oldControllerRef != nil {
		// The ControllerRef was changed. Sync the old controller, if any.
		if d := dc.resolveControllerRef(oldRS.Namespace, oldControllerRef); d != nil {
			dc.enqueueDeployment(d)
		}
	}

	// If it has a ControllerRef, that's all that matters.
	if curControllerRef != nil {
		d := dc.resolveControllerRef(curRS.Namespace, curControllerRef)
		if d == nil {
			return
		}
		klog.V(4).Infof("ReplicaSet %s updated.", curRS.Name)
		dc.enqueueDeployment(d)
		return
	}

	// Otherwise, it's an orphan. If anything changed, sync matching controllers
	// to see if anyone wants to adopt it now.
	labelChanged := !reflect.DeepEqual(curRS.Labels, oldRS.Labels)
	if labelChanged || controllerRefChanged {
		ds := dc.getDeploymentsForReplicaSet(curRS)
		if len(ds) == 0 {
			return
		}
		klog.V(4).Infof("Orphan ReplicaSet %s updated.", curRS.Name)
		for _, d := range ds {
			dc.enqueueDeployment(d)
		}
	}
}
```

To summarize, both Add and Update:
- Resolve the owning Deployment from the ReplicaSet's ownerReferences (see the resolveControllerRef sketch after this list)
- Check whether the ReplicaSet has actually changed
- If it has, push the owning Deployment onto the work queue
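For reference, resolveControllerRef looks roughly like this — it only returns a Deployment when the ownerReference really points at a live Deployment with a matching UID (a sketch based on the surrounding source):

```go
// resolveControllerRef returns the Deployment the ControllerRef points to, or
// nil if the ref has the wrong Kind, the Deployment no longer exists, or the
// UID differs (i.e. the original owner was deleted and recreated).
func (dc *DeploymentController) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *apps.Deployment {
	if controllerRef.Kind != controllerKind.Kind {
		return nil
	}
	d, err := dc.dLister.Deployments(namespace).Get(controllerRef.Name)
	if err != nil {
		return nil
	}
	if d.UID != controllerRef.UID {
		return nil
	}
	return d
}
```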
Finally, the Pod handler:
```go
func (dc *DeploymentController) deletePod(obj interface{}) {
	pod, ok := obj.(*v1.Pod)

	// When a delete is dropped, the relist will notice a pod in the store not
	// in the list, leading to the insertion of a tombstone object which contains
	// the deleted key/value. Note that this value might be stale. If the Pod
	// changed labels the new deployment will not be woken up till the periodic resync.
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))
			return
		}
		pod, ok = tombstone.Obj.(*v1.Pod)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a pod %#v", obj))
			return
		}
	}
	klog.V(4).Infof("Pod %s deleted.", pod.Name)
	if d := dc.getDeploymentForPod(pod); d != nil && d.Spec.Strategy.Type == apps.RecreateDeploymentStrategyType {
		// Sync if this Deployment now has no more Pods.
		rsList, err := util.ListReplicaSets(d, util.RsListFromClient(dc.client.AppsV1()))
		if err != nil {
			return
		}
		podMap, err := dc.getPodMapForDeployment(d, rsList)
		if err != nil {
			return
		}
		numPods := 0
		for _, podList := range podMap {
			numPods += len(podList.Items)
		}
		if numPods == 0 {
			dc.enqueueDeployment(d)
		}
	}
}
```

The idea is much the same: once the controller sees that all Pods of a Recreate-strategy Deployment have been deleted, it pushes the Deployment's name onto the work queue.
Deployment Controller Run Function
With initialization complete, the controller actually starts running. Let's look at the Run function:
```go
func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer dc.queue.ShutDown()

	klog.Infof("Starting deployment controller")
	defer klog.Infof("Shutting down deployment controller")

	if !controller.WaitForCacheSync("deployment", stopCh, dc.dListerSynced, dc.rsListerSynced, dc.podListerSynced) {
		return
	}

	for i := 0; i < workers; i++ {
		go wait.Until(dc.worker, time.Second, stopCh)
	}

	<-stopCh
}

func (dc *DeploymentController) worker() {
	for dc.processNextWorkItem() {
	}
}

func (dc *DeploymentController) processNextWorkItem() bool {
	key, quit := dc.queue.Get()
	if quit {
		return false
	}
	defer dc.queue.Done(key)

	err := dc.syncHandler(key.(string))
	dc.handleErr(err, key)

	return true
}
```

This is the standard client-go worker/consumer loop: each worker keeps pulling keys off the queue and hands them to syncHandler, passing any error to handleErr. Let's see how the final handler processes each item.
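Before diving in, the handleErr call above deserves a quick note — it is what gives the controller its retry behavior. A rough sketch based on the surrounding source (maxRetries is a package-level constant):

```go
// handleErr re-queues a failing key with rate limiting, and drops it after
// too many retries so one bad object cannot wedge the queue forever.
func (dc *DeploymentController) handleErr(err error, key interface{}) {
	if err == nil {
		dc.queue.Forget(key)
		return
	}
	if dc.queue.NumRequeues(key) < maxRetries {
		klog.V(2).Infof("Error syncing deployment %v: %v", key, err)
		dc.queue.AddRateLimited(key)
		return
	}
	utilruntime.HandleError(err)
	klog.V(2).Infof("Dropping deployment %q out of the queue: %v", key, err)
	dc.queue.Forget(key)
}
```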
dc.syncHandler
```go
func (dc *DeploymentController) syncDeployment(key string) error {
	startTime := time.Now()
	klog.V(4).Infof("Started syncing deployment %q (%v)", key, startTime)
	defer func() {
		klog.V(4).Infof("Finished syncing deployment %q (%v)", key, time.Since(startTime))
	}()

	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}
	deployment, err := dc.dLister.Deployments(namespace).Get(name)
	if errors.IsNotFound(err) {
		klog.V(2).Infof("Deployment %v has been deleted", key)
		return nil
	}
	if err != nil {
		return err
	}

	// Deep-copy otherwise we are mutating our cache.
	// TODO: Deep-copy only when needed.
	d := deployment.DeepCopy()

	everything := metav1.LabelSelector{}
	if reflect.DeepEqual(d.Spec.Selector, &everything) {
		dc.eventRecorder.Eventf(d, v1.EventTypeWarning, "SelectingAll", "This deployment is selecting all pods. A non-empty selector is required.")
		if d.Status.ObservedGeneration < d.Generation {
			d.Status.ObservedGeneration = d.Generation
			dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(d)
		}
		return nil
	}

	// List ReplicaSets owned by this Deployment, while reconciling ControllerRef
	// through adoption/orphaning.
	rsList, err := dc.getReplicaSetsForDeployment(d)
	if err != nil {
		return err
	}
	// List all Pods owned by this Deployment, grouped by their ReplicaSet.
	// Current uses of the podMap are:
	//
	// * check if a Pod is labeled correctly with the pod-template-hash label.
	// * check that no old Pods are running in the middle of Recreate Deployments.
	podMap, err := dc.getPodMapForDeployment(d, rsList)
	if err != nil {
		return err
	}

	if d.DeletionTimestamp != nil {
		return dc.syncStatusOnly(d, rsList)
	}

	// Update deployment conditions with an Unknown condition when pausing/resuming
	// a deployment. In this way, we can be sure that we won't timeout when a user
	// resumes a Deployment with a set progressDeadlineSeconds.
	if err = dc.checkPausedConditions(d); err != nil {
		return err
	}

	if d.Spec.Paused {
		return dc.sync(d, rsList)
	}

	// rollback is not re-entrant in case the underlying replica sets are updated with a new
	// revision so we should ensure that we won't proceed to update replica sets until we
	// make sure that the deployment has cleaned up its rollback spec in subsequent enqueues.
	if getRollbackTo(d) != nil {
		return dc.rollback(d, rsList)
	}

	scalingEvent, err := dc.isScalingEvent(d, rsList)
	if err != nil {
		return err
	}
	if scalingEvent {
		return dc.sync(d, rsList)
	}

	switch d.Spec.Strategy.Type {
	case apps.RecreateDeploymentStrategyType:
		return dc.rolloutRecreate(d, rsList, podMap)
	case apps.RollingUpdateDeploymentStrategyType:
		return dc.rolloutRolling(d, rsList)
	}
	return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type)
}
```

- Use the namespace and name taken from the work queue to fetch the actual Deployment object from the lister
- Look up the ReplicaSets belonging to the Deployment via its label selector
- Look up the Pods belonging to those ReplicaSets and build a map keyed by ReplicaSet UID whose value is the matching PodList
- Check whether the Deployment is currently paused
- Check whether the Deployment is in the middle of a rollback
- Dispatch to the appropriate rollout logic depending on the update strategy, Recreate or RollingUpdate
Here we take Recreate as an example and look at what that strategy does.
```go
func (dc *DeploymentController) rolloutRecreate(d *apps.Deployment, rsList []*apps.ReplicaSet, podMap map[types.UID]*v1.PodList) error {
	// Don't create a new RS if not already existed, so that we avoid scaling up before scaling down.
	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, false)
	if err != nil {
		return err
	}
	allRSs := append(oldRSs, newRS)
	activeOldRSs := controller.FilterActiveReplicaSets(oldRSs)

	// scale down old replica sets.
	scaledDown, err := dc.scaleDownOldReplicaSetsForRecreate(activeOldRSs, d)
	if err != nil {
		return err
	}
	if scaledDown {
		// Update DeploymentStatus.
		return dc.syncRolloutStatus(allRSs, newRS, d)
	}

	// Do not process a deployment when it has old pods running.
	if oldPodsRunning(newRS, oldRSs, podMap) {
		return dc.syncRolloutStatus(allRSs, newRS, d)
	}

	// If we need to create a new RS, create it now.
	if newRS == nil {
		newRS, oldRSs, err = dc.getAllReplicaSetsAndSyncRevision(d, rsList, true)
		if err != nil {
			return err
		}
		allRSs = append(oldRSs, newRS)
	}

	// scale up new replica set.
	if _, err := dc.scaleUpNewReplicaSetForRecreate(newRS, d); err != nil {
		return err
	}

	if util.DeploymentComplete(d, &d.Status) {
		if err := dc.cleanupDeployment(oldRSs, d); err != nil {
			return err
		}
	}

	// Sync deployment status.
	return dc.syncRolloutStatus(allRSs, newRS, d)
}
```

- Work out the current new ReplicaSet and all old ReplicaSets for this Deployment
- If there are old ReplicaSets, first scale their replicas down to 0 (on the very first creation there are no old ReplicaSets, so this is a no-op — see the scale-down sketch after this list)
- If this is the first creation, create the corresponding new ReplicaSet
- Once the new ReplicaSet exists, scale it up to the desired replica count
- Wait for the new ReplicaSet to finish rolling out, then clean up the old ReplicaSets
- Update the Deployment status
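The scale-down step referenced above is roughly the following (a sketch of scaleDownOldReplicaSetsForRecreate): every still-active old ReplicaSet is simply scaled to zero before the new one is touched.

```go
// Sketch: scale every active old ReplicaSet down to 0 replicas and report
// whether anything was actually scaled, so the caller can sync status first.
func (dc *DeploymentController) scaleDownOldReplicaSetsForRecreate(oldRSs []*apps.ReplicaSet, deployment *apps.Deployment) (bool, error) {
	scaled := false
	for i := range oldRSs {
		rs := oldRSs[i]
		if *(rs.Spec.Replicas) == 0 {
			continue // already scaled down
		}
		scaledRS, updatedRS, err := dc.scaleReplicaSetAndRecordEvent(rs, 0, deployment)
		if err != nil {
			return false, err
		}
		if scaledRS {
			oldRSs[i] = updatedRS
			scaled = true
		}
	}
	return scaled, nil
}
```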
Now let's look at the code path for the first time a Deployment is created, i.e. creating its new ReplicaSet:
```go
func (dc *DeploymentController) getNewReplicaSet(d *apps.Deployment, rsList, oldRSs []*apps.ReplicaSet, createIfNotExisted bool) (*apps.ReplicaSet, error) {
	existingNewRS := deploymentutil.FindNewReplicaSet(d, rsList)

	// Calculate the max revision number among all old RSes
	maxOldRevision := deploymentutil.MaxRevision(oldRSs)
	// Calculate revision number for this new replica set
	newRevision := strconv.FormatInt(maxOldRevision+1, 10)

	// Latest replica set exists. We need to sync its annotations (includes copying all but
	// annotationsToSkip from the parent deployment, and update revision, desiredReplicas,
	// and maxReplicas) and also update the revision annotation in the deployment with the
	// latest revision.
	if existingNewRS != nil {
		rsCopy := existingNewRS.DeepCopy()

		// Set existing new replica set's annotation
		annotationsUpdated := deploymentutil.SetNewReplicaSetAnnotations(d, rsCopy, newRevision, true)
		minReadySecondsNeedsUpdate := rsCopy.Spec.MinReadySeconds != d.Spec.MinReadySeconds
		if annotationsUpdated || minReadySecondsNeedsUpdate {
			rsCopy.Spec.MinReadySeconds = d.Spec.MinReadySeconds
			return dc.client.AppsV1().ReplicaSets(rsCopy.ObjectMeta.Namespace).Update(rsCopy)
		}

		// Should use the revision in existingNewRS's annotation, since it set by before
		needsUpdate := deploymentutil.SetDeploymentRevision(d, rsCopy.Annotations[deploymentutil.RevisionAnnotation])
		// If no other Progressing condition has been recorded and we need to estimate the progress
		// of this deployment then it is likely that old users started caring about progress. In that
		// case we need to take into account the first time we noticed their new replica set.
		cond := deploymentutil.GetDeploymentCondition(d.Status, apps.DeploymentProgressing)
		if deploymentutil.HasProgressDeadline(d) && cond == nil {
			msg := fmt.Sprintf("Found new replica set %q", rsCopy.Name)
			condition := deploymentutil.NewDeploymentCondition(apps.DeploymentProgressing, v1.ConditionTrue, deploymentutil.FoundNewRSReason, msg)
			deploymentutil.SetDeploymentCondition(&d.Status, *condition)
			needsUpdate = true
		}

		if needsUpdate {
			var err error
			if d, err = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(d); err != nil {
				return nil, err
			}
		}
		return rsCopy, nil
	}

	if !createIfNotExisted {
		return nil, nil
	}

	// new ReplicaSet does not exist, create one.
	newRSTemplate := *d.Spec.Template.DeepCopy()
	podTemplateSpecHash := controller.ComputeHash(&newRSTemplate, d.Status.CollisionCount)
	newRSTemplate.Labels = labelsutil.CloneAndAddLabel(d.Spec.Template.Labels, apps.DefaultDeploymentUniqueLabelKey, podTemplateSpecHash)
	// Add podTemplateHash label to selector.
	newRSSelector := labelsutil.CloneSelectorAndAddLabel(d.Spec.Selector, apps.DefaultDeploymentUniqueLabelKey, podTemplateSpecHash)

	// Create new ReplicaSet
	newRS := apps.ReplicaSet{
		ObjectMeta: metav1.ObjectMeta{
			// Make the name deterministic, to ensure idempotence
			Name:            d.Name + "-" + podTemplateSpecHash,
			Namespace:       d.Namespace,
			OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(d, controllerKind)},
			Labels:          newRSTemplate.Labels,
		},
		Spec: apps.ReplicaSetSpec{
			Replicas:        new(int32),
			MinReadySeconds: d.Spec.MinReadySeconds,
			Selector:        newRSSelector,
			Template:        newRSTemplate,
		},
	}
	allRSs := append(oldRSs, &newRS)
	newReplicasCount, err := deploymentutil.NewRSNewReplicas(d, allRSs, &newRS)
	if err != nil {
		return nil, err
	}

	*(newRS.Spec.Replicas) = newReplicasCount
	// Set new replica set's annotation
	deploymentutil.SetNewReplicaSetAnnotations(d, &newRS, newRevision, false)
	// Create the new ReplicaSet. If it already exists, then we need to check for possible
	// hash collisions. If there is any other error, we need to report it in the status of
	// the Deployment.
	alreadyExists := false
	createdRS, err := dc.client.AppsV1().ReplicaSets(d.Namespace).Create(&newRS)
```

Only the most important parts are quoted here.
- First, check whether a new ReplicaSet for this template already exists
- If one exists, only the Deployment status (and related annotations) needs to be updated
- If not, build the new ReplicaSet struct; its name is the Deployment name plus the pod-template hash (see the sketch after this list)
- Finally, call client-go to create the ReplicaSet object in the API server
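The deterministic name mentioned in the code comment comes from the pod-template hash. controller.ComputeHash is roughly the following sketch: the PodTemplateSpec (plus the collision count, if set) is hashed with FNV-1a and encoded into a DNS-label-safe suffix, so the same template always produces the same ReplicaSet name, which is what makes getNewReplicaSet idempotent.

```go
import (
	"encoding/binary"
	"fmt"
	"hash/fnv"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/rand"
	hashutil "k8s.io/kubernetes/pkg/util/hash"
)

// Sketch of controller.ComputeHash: hash the pod template (and collision
// count) and encode the FNV-1a sum into a name-safe string.
func computeHash(template *v1.PodTemplateSpec, collisionCount *int32) string {
	hasher := fnv.New32a()
	hashutil.DeepHashObject(hasher, *template)

	if collisionCount != nil {
		collisionCountBytes := make([]byte, 8)
		binary.LittleEndian.PutUint32(collisionCountBytes, uint32(*collisionCount))
		hasher.Write(collisionCountBytes)
	}

	return rand.SafeEncodeString(fmt.Sprint(hasher.Sum32()))
}
```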
后面還有一些代碼 這里就不貼了,核心思想就是,根據ReplicaSet的情況創建對應的新的ReplicaSet,其實看到使用Client-go 創建ReplicaSet Deployment 這里基本完成了使命,剩下的就是根據watch 改變一下Deployment 的狀態了,至于真正的Pod 的創建,那么就得ReplicaSet Controller 來完成了。
ReplicaSet Controller
The ReplicaSet Controller looks a lot like the Deployment Controller, so we won't repeat the common parts. First, let's see which resources the ReplicaSet Controller watches at initialization time.
```go
func NewBaseController(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int,
	gvk schema.GroupVersionKind, metricOwnerName, queueName string, podControl controller.PodControlInterface) *ReplicaSetController {
	if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
		metrics.RegisterMetricAndTrackRateLimiterUsage(metricOwnerName, kubeClient.CoreV1().RESTClient().GetRateLimiter())
	}

	rsc := &ReplicaSetController{
		GroupVersionKind: gvk,
		kubeClient:       kubeClient,
		podControl:       podControl,
		burstReplicas:    burstReplicas,
		expectations:     controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
		queue:            workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), queueName),
	}

	rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    rsc.enqueueReplicaSet,
		UpdateFunc: rsc.updateRS,
		// This will enter the sync loop and no-op, because the replica set has been deleted from the store.
		// Note that deleting a replica set immediately after scaling it to 0 will not work. The recommended
		// way of achieving this is by performing a `stop` operation on the replica set.
		DeleteFunc: rsc.enqueueReplicaSet,
	})
	rsc.rsLister = rsInformer.Lister()
	rsc.rsListerSynced = rsInformer.Informer().HasSynced

	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: rsc.addPod,
		// This invokes the ReplicaSet for every pod change, eg: host assignment. Though this might seem like
		// overkill the most frequent pod update is status, and the associated ReplicaSet will only list from
		// local storage, so it should be ok.
		UpdateFunc: rsc.updatePod,
		DeleteFunc: rsc.deletePod,
	})
	rsc.podLister = podInformer.Lister()
	rsc.podListerSynced = podInformer.Informer().HasSynced

	rsc.syncHandler = rsc.syncReplicaSet

	return rsc
}
```

As you can see, the ReplicaSet Controller watches ReplicaSet and Pod events. The handling logic is the same in every case: the triggered handler resolves the relevant ReplicaSet instance and puts it onto the work queue.
syncReplicaSet
Let's go straight to the ReplicaSet Controller's real sync function.
```go
func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
	startTime := time.Now()
	defer func() {
		klog.V(4).Infof("Finished syncing %v %q (%v)", rsc.Kind, key, time.Since(startTime))
	}()

	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}
	rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)
	if errors.IsNotFound(err) {
		klog.V(4).Infof("%v %v has been deleted", rsc.Kind, key)
		rsc.expectations.DeleteExpectations(key)
		return nil
	}
	if err != nil {
		return err
	}

	rsNeedsSync := rsc.expectations.SatisfiedExpectations(key)
	selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("Error converting pod selector to selector: %v", err))
		return nil
	}

	// list all pods to include the pods that don't match the rs`s selector
	// anymore but has the stale controller ref.
	// TODO: Do the List and Filter in a single pass, or use an index.
	allPods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything())
	if err != nil {
		return err
	}
	// Ignore inactive pods.
	var filteredPods []*v1.Pod
	for _, pod := range allPods {
		if controller.IsPodActive(pod) {
			filteredPods = append(filteredPods, pod)
		}
	}

	// NOTE: filteredPods are pointing to objects from cache - if you need to
	// modify them, you need to copy it first.
	filteredPods, err = rsc.claimPods(rs, selector, filteredPods)
	if err != nil {
		return err
	}

	var manageReplicasErr error
	if rsNeedsSync && rs.DeletionTimestamp == nil {
		manageReplicasErr = rsc.manageReplicas(filteredPods, rs)
	}
	rs = rs.DeepCopy()
	newStatus := calculateStatus(rs, filteredPods, manageReplicasErr)
```

- Use the name taken from the work queue to fetch the actual ReplicaSet object
- Get the list of Pods matching the ReplicaSet's label selector
- Filter down to the active (running, not terminating) Pods — see the IsPodActive sketch after this list
- Decide, based on the current Pods, whether Pods need to be created or deleted
- Write the resulting state back into the ReplicaSet's Status field
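The "active Pod" filter used above is essentially the following (a sketch of controller.IsPodActive):

```go
// IsPodActive treats a Pod as active unless it has finished (Succeeded or
// Failed) or is already being deleted.
func IsPodActive(p *v1.Pod) bool {
	return v1.PodSucceeded != p.Status.Phase &&
		v1.PodFailed != p.Status.Phase &&
		p.DeletionTimestamp == nil
}
```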
manageReplicas
Let's focus on the function that creates and deletes Pods.
```go
func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps.ReplicaSet) error {
	diff := len(filteredPods) - int(*(rs.Spec.Replicas))
	rsKey, err := controller.KeyFunc(rs)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("Couldn't get key for %v %#v: %v", rsc.Kind, rs, err))
		return nil
	}
	if diff < 0 {
		diff *= -1
		if diff > rsc.burstReplicas {
			diff = rsc.burstReplicas
		}
		// TODO: Track UIDs of creates just like deletes. The problem currently
		// is we'd need to wait on the result of a create to record the pod's
		// UID, which would require locking *across* the create, which will turn
		// into a performance bottleneck. We should generate a UID for the pod
		// beforehand and store it via ExpectCreations.
		rsc.expectations.ExpectCreations(rsKey, diff)
		klog.V(2).Infof("Too few replicas for %v %s/%s, need %d, creating %d", rsc.Kind, rs.Namespace, rs.Name, *(rs.Spec.Replicas), diff)
		// Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize
		// and double with each successful iteration in a kind of "slow start".
		// This handles attempts to start large numbers of pods that would
		// likely all fail with the same error. For example a project with a
		// low quota that attempts to create a large number of pods will be
		// prevented from spamming the API service with the pod create requests
		// after one of its pods fails. Conveniently, this also prevents the
		// event spam that those failures would generate.
		successfulCreations, err := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func() error {
			boolPtr := func(b bool) *bool { return &b }
			controllerRef := &metav1.OwnerReference{
				APIVersion:         rsc.GroupVersion().String(),
				Kind:               rsc.Kind,
				Name:               rs.Name,
				UID:                rs.UID,
				BlockOwnerDeletion: boolPtr(true),
				Controller:         boolPtr(true),
			}
			err := rsc.podControl.CreatePodsWithControllerRef(rs.Namespace, &rs.Spec.Template, rs, controllerRef)
			if err != nil && errors.IsTimeout(err) {
				// Pod is created but its initialization has timed out.
				// If the initialization is successful eventually, the
				// controller will observe the creation via the informer.
				// If the initialization fails, or if the pod keeps
				// uninitialized for a long time, the informer will not
				// receive any update, and the controller will create a new
				// pod when the expectation expires.
				return nil
			}
			return err
		})

		// Any skipped pods that we never attempted to start shouldn't be expected.
		// The skipped pods will be retried later. The next controller resync will
		// retry the slow start process.
		if skippedPods := diff - successfulCreations; skippedPods > 0 {
			klog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for %v %v/%v", skippedPods, rsc.Kind, rs.Namespace, rs.Name)
			for i := 0; i < skippedPods; i++ {
				// Decrement the expected number of creates because the informer won't observe this pod
				rsc.expectations.CreationObserved(rsKey)
			}
		}
		return err
	} else if diff > 0 {
		if diff > rsc.burstReplicas {
			diff = rsc.burstReplicas
		}
		klog.V(2).Infof("Too many replicas for %v %s/%s, need %d, deleting %d", rsc.Kind, rs.Namespace, rs.Name, *(rs.Spec.Replicas), diff)

		// Choose which Pods to delete, preferring those in earlier phases of startup.
		podsToDelete := getPodsToDelete(filteredPods, diff)

		// Snapshot the UIDs (ns/name) of the pods we're expecting to see
		// deleted, so we know to record their expectations exactly once either
		// when we see it as an update of the deletion timestamp, or as a delete.
		// Note that if the labels on a pod/rs change in a way that the pod gets
		// orphaned, the rs will only wake up after the expectations have
		// expired even if other pods are deleted.
		rsc.expectations.ExpectDeletions(rsKey, getPodKeys(podsToDelete))

		errCh := make(chan error, diff)
		var wg sync.WaitGroup
		wg.Add(diff)
		for _, pod := range podsToDelete {
			go func(targetPod *v1.Pod) {
				defer wg.Done()
				if err := rsc.podControl.DeletePod(rs.Namespace, targetPod.Name, rs); err != nil {
					// Decrement the expected number of deletes because the informer won't observe this deletion
					podKey := controller.PodKey(targetPod)
					klog.V(2).Infof("Failed to delete %v, decrementing expectations for %v %s/%s", podKey, rsc.Kind, rs.Namespace, rs.Name)
					rsc.expectations.DeletionObserved(rsKey, podKey)
					errCh <- err
				}
			}(pod)
		}
		wg.Wait()
```

The logic here is very simple: compare the number of currently running Pods with the declared replica count; if there are too few, call client-go to create Pods, and if there are too many, call client-go to delete Pods.
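The slowStartBatch helper used for creation is worth a quick look. It works roughly as follows (a sketch; the real implementation uses the integer.IntMin helper where this one defines a local min): Pods are created in batches that start at SlowStartInitialBatchSize and double after each fully successful batch, so a ReplicaSet that is doomed to fail — for example by quota — stops hammering the API server after the first small batch.

```go
// Sketch of slowStartBatch: call fn up to count times in exponentially
// growing batches, stopping at the first batch that reports an error.
// Returns how many calls succeeded.
func slowStartBatch(count int, initialBatchSize int, fn func() error) (int, error) {
	remaining := count
	successes := 0
	for batchSize := min(remaining, initialBatchSize); batchSize > 0; batchSize = min(2*batchSize, remaining) {
		errCh := make(chan error, batchSize)
		var wg sync.WaitGroup
		wg.Add(batchSize)
		for i := 0; i < batchSize; i++ {
			go func() {
				defer wg.Done()
				if err := fn(); err != nil {
					errCh <- err
				}
			}()
		}
		wg.Wait()
		curSuccesses := batchSize - len(errCh)
		successes += curSuccesses
		if len(errCh) > 0 {
			return successes, <-errCh
		}
		remaining -= batchSize
	}
	return successes, nil
}

func min(a, b int) int {
	if a < b {
		return a
	}
	return b
}
```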
Summary
At this point the Deployment -> ReplicaSet -> Pod chain has been fully created. When a Pod is deleted, the ReplicaSet Controller brings it back up; when a Deployment is updated, a new ReplicaSet is created — multiple nested controllers cooperate to produce the final Pods. Of course, all that has really happened so far is that the Pod objects have been written to etcd; no actual Pod instance is running yet. That still requires the scheduler and the kubelet, which we will cover in later articles.
Original article link
This article is original content from Alibaba Cloud's Yunqi Community and may not be reproduced without permission.