kubernetes hpa源码分析
(此處原為配圖,圖片缺失)
初始化
文件位置:cmd/kube-controller-manager/app/controllermanager.go
func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {...controllers["horizontalpodautoscaling"] = startHPAController... }HPA Controller和其他的Controller一樣,都在NewControllerInitializers方法中進行注冊,然后通過startHPAController來啟動。
startHPAController|-> startHPAControllerWithLegacyClient|-> startHPAControllerWithMetricsClient|-> NewHorizontalController文件位置:/pkg/controller/podautoscaler/horizontal.go
// NewHorizontalController creates a new HorizontalController. func NewHorizontalController(evtNamespacer v1core.EventsGetter,scaleNamespacer scaleclient.ScalesGetter,hpaNamespacer autoscalingclient.HorizontalPodAutoscalersGetter,mapper apimeta.RESTMapper,metricsClient metricsclient.MetricsClient,hpaInformer autoscalinginformers.HorizontalPodAutoscalerInformer,podInformer coreinformers.PodInformer,resyncPeriod time.Duration,downscaleStabilisationWindow time.Duration,tolerance float64,cpuInitializationPeriod,delayOfInitialReadinessStatus time.Duration,) *HorizontalController {broadcaster := record.NewBroadcaster()broadcaster.StartStructuredLogging(0)broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: evtNamespacer.Events("")})recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "horizontal-pod-autoscaler"})...hpaInformer.Informer().AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{AddFunc: hpaController.enqueueHPA,UpdateFunc: hpaController.updateHPA,DeleteFunc: hpaController.deleteHPA,},resyncPeriod,)...return hpaController }核心邏輯是監聽hpa對象的事件,分別對應hpaController.enqueueHPA,hpaController.updateHPA和hpaController.deleteHPA。enqueueHPA本質上就是把hpa對象注冊到HorizontalController的隊列里,updateHPA是更新hpa對象,deleteHPA是刪除對象。hpa對象存在hpaController的workqueue中。
代碼見下文
文件位置:/pkg/controller/podautoscaler/horizontal.go
// obj could be an *v1.HorizontalPodAutoscaler, or a DeletionFinalStateUnknown marker item. func (a *HorizontalController) updateHPA(old, cur interface{}) {a.enqueueHPA(cur) }// obj could be an *v1.HorizontalPodAutoscaler, or a DeletionFinalStateUnknown marker item. func (a *HorizontalController) enqueueHPA(obj interface{}) {key, err := controller.KeyFunc(obj)if err != nil {utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err))return}// Requests are always added to queue with resyncPeriod delay. If there's already// request for the HPA in the queue then a new request is always dropped. Requests spend resync// interval in queue so HPAs are processed every resync interval.a.queue.AddRateLimited(key) }func (a *HorizontalController) deleteHPA(obj interface{}) {key, err := controller.KeyFunc(obj)if err != nil {utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err))return}// TODO: could we leak if we fail to get the key?a.queue.Forget(key) }startHPAController
文件位置:cmd/kube-controller-manager/app/autoscaling.go
最后會調用到startHPAControllerWithMetricsClient方法,啟動一個線程來調用NewHorizontalController方法初始化一個HPA Controller,然后執行Run方法。 func startHPAController(ctx ControllerContext) (http.Handler, bool, error) {...return startHPAControllerWithLegacyClient(ctx) }func startHPAControllerWithLegacyClient(ctx ControllerContext) (http.Handler, bool, error) {hpaClient := ctx.ClientBuilder.ClientOrDie("horizontal-pod-autoscaler")metricsClient := metrics.NewHeapsterMetricsClient(hpaClient,metrics.DefaultHeapsterNamespace,metrics.DefaultHeapsterScheme,metrics.DefaultHeapsterService,metrics.DefaultHeapsterPort,)return startHPAControllerWithMetricsClient(ctx, metricsClient) }func startHPAControllerWithMetricsClient(ctx ControllerContext, metricsClient metrics.MetricsClient) (http.Handler, bool, error) {hpaClient := ctx.ClientBuilder.ClientOrDie("horizontal-pod-autoscaler")hpaClientConfig := ctx.ClientBuilder.ConfigOrDie("horizontal-pod-autoscaler")scaleKindResolver := scale.NewDiscoveryScaleKindResolver(hpaClient.Discovery())scaleClient, err := scale.NewForConfig(hpaClientConfig, ctx.RESTMapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver)if err != nil {return nil, false, err}// 初始化go podautoscaler.NewHorizontalController(hpaClient.CoreV1(),scaleClient,hpaClient.AutoscalingV1(),ctx.RESTMapper,metricsClient,ctx.InformerFactory.Autoscaling().V1().HorizontalPodAutoscalers(),ctx.InformerFactory.Core().V1().Pods(),ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerSyncPeriod.Duration,ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerDownscaleStabilizationWindow.Duration,ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerTolerance,ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerCPUInitializationPeriod.Duration,ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerInitialReadinessDelay.Duration,).Run(ctx.Stop)return nil, true, nil }Run
文件位置:pkg/controller/podautoscaler/horizontal.go
func (a *HorizontalController) Run(stopCh <-chan struct{}) {defer utilruntime.HandleCrash()defer a.queue.ShutDown()klog.Infof("Starting HPA controller")defer klog.Infof("Shutting down HPA controller")if !cache.WaitForNamedCacheSync("HPA", stopCh, a.hpaListerSynced, a.podListerSynced) {return}// 啟動異步線程,每秒執行一次go wait.Until(a.worker, time.Second, stopCh)<-stopCh }這里會調用worker執行具體的擴縮容的邏輯。
hpa邏輯入口:定時執行worker
go wait.Until(a.worker, time.Second, stopCh)
核心代碼分析
processNextWorkItem: 遍歷所有hpa對象
func (a *HorizontalController) worker() {for a.processNextWorkItem() {}klog.Infof("horizontal pod autoscaler controller worker shutting down") } func (a *HorizontalController) processNextWorkItem() bool {key, quit := a.queue.Get()if quit {return false}defer a.queue.Done(key)deleted, err := a.reconcileKey(key.(string))if err != nil {utilruntime.HandleError(err)}if !deleted {a.queue.AddRateLimited(key)}return true } hpa對象存儲在HorizontalController的隊列中,遍歷每個hpa對象進行處理。 processNextWorkItem->|-> reconcileKey|-> reconcileAutoscalerworker里面一路執行下來會走到reconcileAutoscaler方法里面,這里是HPA的核心。下面我們專注看看這部分。
reconcileAutoscaler:計算副本數
func (a *HorizontalController) reconcileAutoscaler(hpav1Shared *autoscalingv1.HorizontalPodAutoscaler, key string) error {...//副本數為0,不啟動自動擴縮容if scale.Spec.Replicas == 0 && minReplicas != 0 {// Autoscaling is disabled for this resourcedesiredReplicas = 0rescale = falsesetCondition(hpa, autoscalingv2.ScalingActive, v1.ConditionFalse, "ScalingDisabled", "scaling is disabled since the replica count of the target is zero")// 如果當前副本數大于最大期望副本數,那么設置期望副本數為最大副本數} else if currentReplicas > hpa.Spec.MaxReplicas {rescaleReason = "Current number of replicas above Spec.MaxReplicas"desiredReplicas = hpa.Spec.MaxReplicas// 同上} else if currentReplicas < minReplicas {rescaleReason = "Current number of replicas below Spec.MinReplicas"desiredReplicas = minReplicas} else {var metricTimestamp time.Time//計算需要擴縮容的數量metricDesiredReplicas, metricName, metricStatuses, metricTimestamp, err = a.computeReplicasForMetrics(hpa, scale, hpa.Spec.Metrics)if err != nil {...}klog.V(4).Infof("proposing %v desired replicas (based on %s from %s) for %s", metricDesiredReplicas, metricName, metricTimestamp, reference)rescaleMetric := ""if metricDesiredReplicas > desiredReplicas {desiredReplicas = metricDesiredReplicasrescaleMetric = metricName}if desiredReplicas > currentReplicas {rescaleReason = fmt.Sprintf("%s above target", rescaleMetric)}if desiredReplicas < currentReplicas {rescaleReason = "All metrics below target"}//可以在擴縮容的時候指定一個穩定窗口,以防止縮放目標中的副本數量出現波動//doc:https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/#support-for-configurable-scaling-behaviorif hpa.Spec.Behavior == nil {desiredReplicas = a.normalizeDesiredReplicas(hpa, key, currentReplicas, desiredReplicas, minReplicas)} else {desiredReplicas = a.normalizeDesiredReplicasWithBehaviors(hpa, key, currentReplicas, desiredReplicas, minReplicas)}rescale = desiredReplicas != currentReplicas}... 
} 這一段代碼是reconcileAutoscaler里面的核心代碼,在這里會確定一個區間,首先根據當前的scale對象和當前hpa里面配置的對應的參數的值,決策當前的副本數量,其中針對于超過設定的maxReplicas和小于minReplicas兩種情況,只需要簡單的修正為對應的值,直接更新對應的scale對象即可,而scale副本為0的對象,則hpa不會再進行任何操作。對于當前副本數在maxReplicas和minReplicas之間的時候,則需要計算是否需要擴縮容,計算則是調用computeReplicasForMetrics方法來實現。
computeReplicasForMetrics 遍歷度量目標
func (a *HorizontalController) computeReplicasForMetrics(hpa *autoscalingv2.HorizontalPodAutoscaler, scale *autoscalingv1.Scale,metricSpecs []autoscalingv2.MetricSpec) (replicas int32, metric string, statuses []autoscalingv2.MetricStatus, timestamp time.Time, err error) {...//這里的度量目標可以是一個列表,所以遍歷之后取最大的需要擴縮容的數量for i, metricSpec := range metricSpecs {//根據type類型計算需要擴縮容的數量replicaCountProposal, metricNameProposal, timestampProposal, condition, err := a.computeReplicasForMetric(hpa, metricSpec, specReplicas, statusReplicas, selector, &statuses[i])if err != nil {if invalidMetricsCount <= 0 {invalidMetricCondition = conditioninvalidMetricError = err}invalidMetricsCount++}//記錄最大的需要擴縮容的數量if err == nil && (replicas == 0 || replicaCountProposal > replicas) {timestamp = timestampProposalreplicas = replicaCountProposalmetric = metricNameProposal}}...return replicas, metric, statuses, timestamp, nil }在上面的代碼中遍歷所有的metrics,然后選取返回副本數最大的那個。主要計算邏輯都在computeReplicasForMetric中,下面我們看看這個方法。
computeReplicasForMetric:根據type計算副本數
func (a *HorizontalController) computeReplicasForMetric(hpa *autoscalingv2.HorizontalPodAutoscaler, spec autoscalingv2.MetricSpec,specReplicas, statusReplicas int32, selector labels.Selector, status *autoscalingv2.MetricStatus) (replicaCountProposal int32, metricNameProposal string,timestampProposal time.Time, condition autoscalingv2.HorizontalPodAutoscalerCondition, err error) {//根據不同的類型來進行計量switch spec.Type {//表示如果是一個k8s對象,如Ingress對象case autoscalingv2.ObjectMetricSourceType:...// 表示pod度量類型case autoscalingv2.PodsMetricSourceType:metricSelector, err := metav1.LabelSelectorAsSelector(spec.Pods.Metric.Selector)if err != nil {condition := a.getUnableComputeReplicaCountCondition(hpa, "FailedGetPodsMetric", err)return 0, "", time.Time{}, condition, fmt.Errorf("failed to get pods metric value: %v", err)}//僅支持AverageValue度量目標,計算需要擴縮容的數量replicaCountProposal, timestampProposal, metricNameProposal, condition, err = a.computeStatusForPodsMetric(specReplicas, spec, hpa, selector, status, metricSelector)if err != nil {return 0, "", time.Time{}, condition, fmt.Errorf("failed to get pods metric value: %v", err)}// 表示Resource度量類型case autoscalingv2.ResourceMetricSourceType:...case autoscalingv2.ExternalMetricSourceType:...default:errMsg := fmt.Sprintf("unknown metric source type %q", string(spec.Type))err = fmt.Errorf(errMsg)condition := a.getUnableComputeReplicaCountCondition(hpa, "InvalidMetricSourceType", err)return 0, "", time.Time{}, condition, err}return replicaCountProposal, metricNameProposal, timestampProposal, autoscalingv2.HorizontalPodAutoscalerCondition{}, nil }這里會根據不同的度量類型來進行統計,目前度量類型有四種,分別是Pods、Object、Resource、External。
computeStatusForPodsMetric&GetMetricReplicas:計算需要擴縮容的數量
文件位置:pkg/controller/podautoscaler/replica_calculator.go
func (a *HorizontalController) computeStatusForPodsMetric(currentReplicas int32, metricSpec autoscalingv2.MetricSpec, hpa *autoscalingv2.HorizontalPodAutoscaler, selector labels.Selector, status *autoscalingv2.MetricStatus, metricSelector labels.Selector) (replicaCountProposal int32, timestampProposal time.Time, metricNameProposal string, condition autoscalingv2.HorizontalPodAutoscalerCondition, err error) {//計算需要擴縮容的數量replicaCountProposal, utilizationProposal, timestampProposal, err := a.replicaCalc.GetMetricReplicas(currentReplicas, metricSpec.Pods.Target.AverageValue.MilliValue(), metricSpec.Pods.Metric.Name, hpa.Namespace, selector, metricSelector)if err != nil {condition = a.getUnableComputeReplicaCountCondition(hpa, "FailedGetPodsMetric", err)return 0, timestampProposal, "", condition, err}...return replicaCountProposal, timestampProposal, fmt.Sprintf("pods metric %s", metricSpec.Pods.Metric.Name), autoscalingv2.HorizontalPodAutoscalerCondition{}, nil }func (c *ReplicaCalculator) GetMetricReplicas(currentReplicas int32, targetUtilization int64, metricName string, namespace string, selector labels.Selector, metricSelector labels.Selector) (replicaCount int32, utilization int64, timestamp time.Time, err error) {//獲取pod中度量數據metrics, timestamp, err := c.metricsClient.GetRawMetric(metricName, namespace, selector, metricSelector)if err != nil {return 0, 0, time.Time{}, fmt.Errorf("unable to get metric %s: %v", metricName, err)}//通過結合度量數據來計算希望擴縮容的數量是多少replicaCount, utilization, err = c.calcPlainMetricReplicas(metrics, currentReplicas, targetUtilization, namespace, selector, v1.ResourceName(""))return replicaCount, utilization, timestamp, err }這里會調用GetRawMetric方法來獲取pod對應的度量數據,然后再調用calcPlainMetricReplicas方法結合度量數據與目標期望來計算希望擴縮容的數量是多少。
calcPlainMetricReplicas:計算副本數具體實現
func (c *ReplicaCalculator) calcPlainMetricReplicas(metrics metricsclient.PodMetricsInfo, currentReplicas int32, targetUtilization int64, namespace string, selector labels.Selector, resource v1.ResourceName) (replicaCount int32, utilization int64, err error) {podList, err := c.podLister.Pods(namespace).List(selector)...//將pod分成三類進行統計,得到ready的pod數量、ignored Pod集合、missing Pod集合readyPodCount, ignoredPods, missingPods := groupPods(podList, metrics, resource, c.cpuInitializationPeriod, c.delayOfInitialReadinessStatus)//在度量的數據里移除ignored Pods集合的數據removeMetricsForPods(metrics, ignoredPods)//計算pod中container request 設置的資源之和requests, err := calculatePodRequests(podList, resource)...//獲取資源使用率usageRatio, utilization := metricsclient.GetMetricUtilizationRatio(metrics, targetUtilization)... }這里會調用groupPods將pod列表的進行一個分類統計。ignoredPods集合里面包含了pod狀態為PodPending的數據;missingPods列表里面包含了在度量數據里面根據pod名找不到的數據。
因為missingPods的度量數據在metrics里本來就找不到,所以只需要剔除掉ignored Pods集合中度量的資源就好了。接下來調用calculatePodRequests方法統計pod中container request 設置的資源之和。
總結
hpa整個邏輯流程圖:
(此處原為hpa邏輯流程圖,圖片缺失)
總結
以上是生活随笔為你收集整理的kubernetes hpa源码分析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 海贼王热血航线服务器维护,航海王热血航线
- 下一篇: PLC的软件防干扰措施