

Kubernetes HPA Source Code Analysis

Published: 2024/3/12

Initialization

File: cmd/kube-controller-manager/app/controllermanager.go

func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {
    ...
    controllers["horizontalpodautoscaling"] = startHPAController
    ...
}

Like the other controllers, the HPA controller is registered in the NewControllerInitializers method and then started via startHPAController.

startHPAController
  |-> startHPAControllerWithRESTClient
       |-> startHPAControllerWithMetricsClient
            |-> NewHorizontalController

File: /pkg/controller/podautoscaler/horizontal.go

// NewHorizontalController creates a new HorizontalController.
func NewHorizontalController(
    evtNamespacer v1core.EventsGetter,
    scaleNamespacer scaleclient.ScalesGetter,
    hpaNamespacer autoscalingclient.HorizontalPodAutoscalersGetter,
    mapper apimeta.RESTMapper,
    metricsClient metricsclient.MetricsClient,
    hpaInformer autoscalinginformers.HorizontalPodAutoscalerInformer,
    podInformer coreinformers.PodInformer,
    resyncPeriod time.Duration,
    downscaleStabilisationWindow time.Duration,
    tolerance float64,
    cpuInitializationPeriod,
    delayOfInitialReadinessStatus time.Duration,
) *HorizontalController {
    broadcaster := record.NewBroadcaster()
    broadcaster.StartStructuredLogging(0)
    broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: evtNamespacer.Events("")})
    recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "horizontal-pod-autoscaler"})
    ...
    hpaInformer.Informer().AddEventHandlerWithResyncPeriod(
        cache.ResourceEventHandlerFuncs{
            AddFunc:    hpaController.enqueueHPA,
            UpdateFunc: hpaController.updateHPA,
            DeleteFunc: hpaController.deleteHPA,
        },
        resyncPeriod,
    )
    ...
    return hpaController
}

The core logic is to watch events on HPA objects, handled by hpaController.enqueueHPA, hpaController.updateHPA, and hpaController.deleteHPA respectively. enqueueHPA essentially adds the HPA object's key to the HorizontalController's work queue; updateHPA handles updates and deleteHPA handles deletions. Pending HPA objects live in the hpaController's workqueue.

The handler code is shown below.

File: /pkg/controller/podautoscaler/horizontal.go

// obj could be an *v1.HorizontalPodAutoscaler, or a DeletionFinalStateUnknown marker item.
func (a *HorizontalController) updateHPA(old, cur interface{}) {
    a.enqueueHPA(cur)
}

// obj could be an *v1.HorizontalPodAutoscaler, or a DeletionFinalStateUnknown marker item.
func (a *HorizontalController) enqueueHPA(obj interface{}) {
    key, err := controller.KeyFunc(obj)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err))
        return
    }
    // Requests are always added to queue with resyncPeriod delay. If there's already
    // request for the HPA in the queue then a new request is always dropped. Requests spend resync
    // interval in queue so HPAs are processed every resync interval.
    a.queue.AddRateLimited(key)
}

func (a *HorizontalController) deleteHPA(obj interface{}) {
    key, err := controller.KeyFunc(obj)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err))
        return
    }
    // TODO: could we leak if we fail to get the key?
    a.queue.Forget(key)
}
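The comment on AddRateLimited highlights an important property of client-go's workqueue: adding a key that is already pending is a no-op, so each HPA is processed at most once per resync interval. A minimal, stdlib-only sketch of that deduplicating behavior (the real implementation lives in k8s.io/client-go/util/workqueue and additionally supports rate limiting and delayed adds; dedupQueue here is an invented name for illustration):

```go
package main

import (
	"fmt"
	"sync"
)

// dedupQueue is a toy version of client-go's workqueue: a FIFO of keys
// in which re-adding a key that is already queued is silently dropped.
type dedupQueue struct {
	mu    sync.Mutex
	order []string
	set   map[string]struct{}
}

func newDedupQueue() *dedupQueue {
	return &dedupQueue{set: map[string]struct{}{}}
}

// Add enqueues key unless it is already pending.
func (q *dedupQueue) Add(key string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if _, dup := q.set[key]; dup {
		return // duplicate request is dropped
	}
	q.set[key] = struct{}{}
	q.order = append(q.order, key)
}

// Get pops the oldest key, or returns false when the queue is empty.
func (q *dedupQueue) Get() (string, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.order) == 0 {
		return "", false
	}
	key := q.order[0]
	q.order = q.order[1:]
	delete(q.set, key)
	return key, true
}

func main() {
	q := newDedupQueue()
	q.Add("default/my-hpa")
	q.Add("default/my-hpa") // dropped: already queued
	q.Add("prod/web-hpa")
	for {
		key, ok := q.Get()
		if !ok {
			break
		}
		fmt.Println(key) // prints default/my-hpa, then prod/web-hpa
	}
}
```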

startHPAController

File: cmd/kube-controller-manager/app/autoscaling.go

最后會調用到startHPAControllerWithMetricsClient方法,啟動一個線程來調用NewHorizontalController方法初始化一個HPA Controller,然后執行Run方法。 func startHPAController(ctx ControllerContext) (http.Handler, bool, error) {...return startHPAControllerWithLegacyClient(ctx) }func startHPAControllerWithLegacyClient(ctx ControllerContext) (http.Handler, bool, error) {hpaClient := ctx.ClientBuilder.ClientOrDie("horizontal-pod-autoscaler")metricsClient := metrics.NewHeapsterMetricsClient(hpaClient,metrics.DefaultHeapsterNamespace,metrics.DefaultHeapsterScheme,metrics.DefaultHeapsterService,metrics.DefaultHeapsterPort,)return startHPAControllerWithMetricsClient(ctx, metricsClient) }func startHPAControllerWithMetricsClient(ctx ControllerContext, metricsClient metrics.MetricsClient) (http.Handler, bool, error) {hpaClient := ctx.ClientBuilder.ClientOrDie("horizontal-pod-autoscaler")hpaClientConfig := ctx.ClientBuilder.ConfigOrDie("horizontal-pod-autoscaler")scaleKindResolver := scale.NewDiscoveryScaleKindResolver(hpaClient.Discovery())scaleClient, err := scale.NewForConfig(hpaClientConfig, ctx.RESTMapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver)if err != nil {return nil, false, err}// 初始化go podautoscaler.NewHorizontalController(hpaClient.CoreV1(),scaleClient,hpaClient.AutoscalingV1(),ctx.RESTMapper,metricsClient,ctx.InformerFactory.Autoscaling().V1().HorizontalPodAutoscalers(),ctx.InformerFactory.Core().V1().Pods(),ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerSyncPeriod.Duration,ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerDownscaleStabilizationWindow.Duration,ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerTolerance,ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerCPUInitializationPeriod.Duration,ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerInitialReadinessDelay.Duration,).Run(ctx.Stop)return nil, true, nil }

Run

File: pkg/controller/podautoscaler/horizontal.go

func (a *HorizontalController) Run(stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    defer a.queue.ShutDown()

    klog.Infof("Starting HPA controller")
    defer klog.Infof("Shutting down HPA controller")

    if !cache.WaitForNamedCacheSync("HPA", stopCh, a.hpaListerSynced, a.podListerSynced) {
        return
    }

    // Start an asynchronous worker that runs once per second
    go wait.Until(a.worker, time.Second, stopCh)

    <-stopCh
}

Run hands off to worker, which carries out the actual scaling logic.

The HPA logic entry point: running the worker on a timer

go wait.Until(a.worker, time.Second, stopCh)

Core code analysis

processNextWorkItem: draining the queue of HPA objects

func (a *HorizontalController) worker() {
    for a.processNextWorkItem() {
    }
    klog.Infof("horizontal pod autoscaler controller worker shutting down")
}

func (a *HorizontalController) processNextWorkItem() bool {
    key, quit := a.queue.Get()
    if quit {
        return false
    }
    defer a.queue.Done(key)

    deleted, err := a.reconcileKey(key.(string))
    if err != nil {
        utilruntime.HandleError(err)
    }
    if !deleted {
        a.queue.AddRateLimited(key)
    }
    return true
}

HPA objects are stored in the HorizontalController's queue; each one is dequeued and processed in turn:

processNextWorkItem
  |-> reconcileKey
       |-> reconcileAutoscaler

Following the worker down this call chain leads to the reconcileAutoscaler method, the heart of the HPA. Let's look at that part in detail.

reconcileAutoscaler: computing the replica count

func (a *HorizontalController) reconcileAutoscaler(hpav1Shared *autoscalingv1.HorizontalPodAutoscaler, key string) error {
    ...
    // Replica count of zero: autoscaling is disabled
    if scale.Spec.Replicas == 0 && minReplicas != 0 {
        // Autoscaling is disabled for this resource
        desiredReplicas = 0
        rescale = false
        setCondition(hpa, autoscalingv2.ScalingActive, v1.ConditionFalse, "ScalingDisabled", "scaling is disabled since the replica count of the target is zero")
    // If the current replica count exceeds the maximum, clamp to MaxReplicas
    } else if currentReplicas > hpa.Spec.MaxReplicas {
        rescaleReason = "Current number of replicas above Spec.MaxReplicas"
        desiredReplicas = hpa.Spec.MaxReplicas
    // Likewise, clamp to MinReplicas from below
    } else if currentReplicas < minReplicas {
        rescaleReason = "Current number of replicas below Spec.MinReplicas"
        desiredReplicas = minReplicas
    } else {
        var metricTimestamp time.Time
        // Compute the desired replica count from the metrics
        metricDesiredReplicas, metricName, metricStatuses, metricTimestamp, err = a.computeReplicasForMetrics(hpa, scale, hpa.Spec.Metrics)
        if err != nil {
            ...
        }
        klog.V(4).Infof("proposing %v desired replicas (based on %s from %s) for %s", metricDesiredReplicas, metricName, metricTimestamp, reference)

        rescaleMetric := ""
        if metricDesiredReplicas > desiredReplicas {
            desiredReplicas = metricDesiredReplicas
            rescaleMetric = metricName
        }
        if desiredReplicas > currentReplicas {
            rescaleReason = fmt.Sprintf("%s above target", rescaleMetric)
        }
        if desiredReplicas < currentReplicas {
            rescaleReason = "All metrics below target"
        }
        // A stabilization window can be configured to keep the replica count
        // of the scale target from flapping.
        // doc: https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/#support-for-configurable-scaling-behavior
        if hpa.Spec.Behavior == nil {
            desiredReplicas = a.normalizeDesiredReplicas(hpa, key, currentReplicas, desiredReplicas, minReplicas)
        } else {
            desiredReplicas = a.normalizeDesiredReplicasWithBehaviors(hpa, key, currentReplicas, desiredReplicas, minReplicas)
        }
        rescale = desiredReplicas != currentReplicas
    }
    ...
}

This is the core of reconcileAutoscaler. It establishes a valid range for the replica count: based on the current scale object and the parameters configured on the HPA, it decides the desired count. A current count above MaxReplicas or below MinReplicas is simply clamped to the corresponding bound and the scale object is updated directly, while a scale object whose replica count is zero causes the HPA to take no further action at all.

When the current replica count lies between MinReplicas and MaxReplicas, the controller must compute whether to scale, which is done by calling computeReplicasForMetrics.
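The branch structure above can be condensed into a small decision function. This is a hand-written paraphrase of the bounds logic, not code from the Kubernetes tree (the name desiredWithinBounds is invented for illustration):

```go
package main

import "fmt"

// desiredWithinBounds paraphrases reconcileAutoscaler's branches:
// replicas==0 disables autoscaling, counts outside [min, max] are
// clamped, and only in-range counts fall through to the metric result.
func desiredWithinBounds(current, min, max, metricProposal int32) (desired int32, reason string) {
	switch {
	case current == 0 && min != 0:
		return 0, "scaling disabled: target replica count is zero"
	case current > max:
		return max, "current number of replicas above Spec.MaxReplicas"
	case current < min:
		return min, "current number of replicas below Spec.MinReplicas"
	default:
		// In range: trust the metric-driven proposal.
		return metricProposal, "computed from metrics"
	}
}

func main() {
	fmt.Println(desiredWithinBounds(12, 2, 10, 0)) // clamped down to max (10)
	fmt.Println(desiredWithinBounds(1, 2, 10, 0))  // clamped up to min (2)
	fmt.Println(desiredWithinBounds(5, 2, 10, 7))  // metric proposal wins (7)
}
```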

computeReplicasForMetrics: iterating over the metric targets

func (a *HorizontalController) computeReplicasForMetrics(hpa *autoscalingv2.HorizontalPodAutoscaler, scale *autoscalingv1.Scale,
    metricSpecs []autoscalingv2.MetricSpec) (replicas int32, metric string, statuses []autoscalingv2.MetricStatus, timestamp time.Time, err error) {
    ...
    // The metric target can be a list, so iterate and keep the largest proposal
    for i, metricSpec := range metricSpecs {
        // Compute the proposed replica count according to the metric type
        replicaCountProposal, metricNameProposal, timestampProposal, condition, err := a.computeReplicasForMetric(hpa, metricSpec, specReplicas, statusReplicas, selector, &statuses[i])
        if err != nil {
            if invalidMetricsCount <= 0 {
                invalidMetricCondition = condition
                invalidMetricError = err
            }
            invalidMetricsCount++
        }
        // Record the largest proposal seen so far
        if err == nil && (replicas == 0 || replicaCountProposal > replicas) {
            timestamp = timestampProposal
            replicas = replicaCountProposal
            metric = metricNameProposal
        }
    }
    ...
    return replicas, metric, statuses, timestamp, nil
}

The code above iterates over all metrics and returns the one proposing the largest replica count. The main computation happens in computeReplicasForMetric, examined next.

computeReplicasForMetric: computing replicas by metric type

func (a *HorizontalController) computeReplicasForMetric(hpa *autoscalingv2.HorizontalPodAutoscaler, spec autoscalingv2.MetricSpec,
    specReplicas, statusReplicas int32, selector labels.Selector, status *autoscalingv2.MetricStatus) (replicaCountProposal int32, metricNameProposal string,
    timestampProposal time.Time, condition autoscalingv2.HorizontalPodAutoscalerCondition, err error) {
    // Dispatch on the metric source type
    switch spec.Type {
    // A Kubernetes object, e.g. an Ingress
    case autoscalingv2.ObjectMetricSourceType:
        ...
    // A pod metric
    case autoscalingv2.PodsMetricSourceType:
        metricSelector, err := metav1.LabelSelectorAsSelector(spec.Pods.Metric.Selector)
        if err != nil {
            condition := a.getUnableComputeReplicaCountCondition(hpa, "FailedGetPodsMetric", err)
            return 0, "", time.Time{}, condition, fmt.Errorf("failed to get pods metric value: %v", err)
        }
        // Only the AverageValue target type is supported; compute the proposal
        replicaCountProposal, timestampProposal, metricNameProposal, condition, err = a.computeStatusForPodsMetric(specReplicas, spec, hpa, selector, status, metricSelector)
        if err != nil {
            return 0, "", time.Time{}, condition, fmt.Errorf("failed to get pods metric value: %v", err)
        }
    // A resource metric
    case autoscalingv2.ResourceMetricSourceType:
        ...
    case autoscalingv2.ExternalMetricSourceType:
        ...
    default:
        errMsg := fmt.Sprintf("unknown metric source type %q", string(spec.Type))
        err = fmt.Errorf(errMsg)
        condition := a.getUnableComputeReplicaCountCondition(hpa, "InvalidMetricSourceType", err)
        return 0, "", time.Time{}, condition, err
    }
    return replicaCountProposal, metricNameProposal, timestampProposal, autoscalingv2.HorizontalPodAutoscalerCondition{}, nil
}

Here the accounting differs by metric source type; there are currently four types: Pods, Object, Resource, and External.

computeStatusForPodsMetric & GetMetricReplicas: computing how much to scale

File: pkg/controller/podautoscaler/replica_calculator.go

func (a *HorizontalController) computeStatusForPodsMetric(currentReplicas int32, metricSpec autoscalingv2.MetricSpec, hpa *autoscalingv2.HorizontalPodAutoscaler, selector labels.Selector, status *autoscalingv2.MetricStatus, metricSelector labels.Selector) (replicaCountProposal int32, timestampProposal time.Time, metricNameProposal string, condition autoscalingv2.HorizontalPodAutoscalerCondition, err error) {
    // Compute the proposed replica count
    replicaCountProposal, utilizationProposal, timestampProposal, err := a.replicaCalc.GetMetricReplicas(currentReplicas, metricSpec.Pods.Target.AverageValue.MilliValue(), metricSpec.Pods.Metric.Name, hpa.Namespace, selector, metricSelector)
    if err != nil {
        condition = a.getUnableComputeReplicaCountCondition(hpa, "FailedGetPodsMetric", err)
        return 0, timestampProposal, "", condition, err
    }
    ...
    return replicaCountProposal, timestampProposal, fmt.Sprintf("pods metric %s", metricSpec.Pods.Metric.Name), autoscalingv2.HorizontalPodAutoscalerCondition{}, nil
}

func (c *ReplicaCalculator) GetMetricReplicas(currentReplicas int32, targetUtilization int64, metricName string, namespace string, selector labels.Selector, metricSelector labels.Selector) (replicaCount int32, utilization int64, timestamp time.Time, err error) {
    // Fetch the raw metric samples for the pods
    metrics, timestamp, err := c.metricsClient.GetRawMetric(metricName, namespace, selector, metricSelector)
    if err != nil {
        return 0, 0, time.Time{}, fmt.Errorf("unable to get metric %s: %v", metricName, err)
    }
    // Combine the samples with the target to compute the desired replica count
    replicaCount, utilization, err = c.calcPlainMetricReplicas(metrics, currentReplicas, targetUtilization, namespace, selector, v1.ResourceName(""))
    return replicaCount, utilization, timestamp, err
}

GetRawMetric fetches the raw metric samples for the pods, and calcPlainMetricReplicas then combines those samples with the target value to compute the desired replica count.

calcPlainMetricReplicas: the replica computation itself

func (c *ReplicaCalculator) calcPlainMetricReplicas(metrics metricsclient.PodMetricsInfo, currentReplicas int32, targetUtilization int64, namespace string, selector labels.Selector, resource v1.ResourceName) (replicaCount int32, utilization int64, err error) {
    podList, err := c.podLister.Pods(namespace).List(selector)
    ...
    // Group the pods into three sets: the ready pod count, the ignored pods, and the missing pods
    readyPodCount, ignoredPods, missingPods := groupPods(podList, metrics, resource, c.cpuInitializationPeriod, c.delayOfInitialReadinessStatus)
    // Remove the ignored pods' samples from the metric data
    removeMetricsForPods(metrics, ignoredPods)
    // Sum the container resource requests across the pods
    requests, err := calculatePodRequests(podList, resource)
    ...
    // Compute the usage ratio against the target
    usageRatio, utilization := metricsclient.GetMetricUtilizationRatio(metrics, targetUtilization)
    ...
}

groupPods classifies the pod list: the ignoredPods set contains pods in the PodPending state, and the missingPods set contains pods for which no sample could be found by name in the metric data.

Since the missingPods have no samples in metrics to begin with, only the samples belonging to the ignoredPods set need to be removed. calculatePodRequests is then called to sum the container resource requests across the pods.
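The arithmetic at the heart of calcPlainMetricReplicas is: average the samples, divide by the target, and scale the ready-pod count by that ratio, rounding up. A simplified stand-in that ignores the missing/ignored-pod corrections (plainMetricReplicas and the tolerance parameter are illustrative; the real controller reads the tolerance from component config):

```go
package main

import (
	"fmt"
	"math"
)

// plainMetricReplicas is a simplified version of the HPA math:
//   usageRatio = averageUsage / targetAverageValue
//   desired    = ceil(usageRatio * readyPods)
// with a tolerance band inside which no scaling happens.
func plainMetricReplicas(samples []int64, target int64, readyPods int32, tolerance float64) int32 {
	var sum int64
	for _, s := range samples {
		sum += s
	}
	utilization := sum / int64(len(samples)) // average over the sampled pods
	usageRatio := float64(utilization) / float64(target)
	if math.Abs(usageRatio-1.0) <= tolerance {
		return readyPods // close enough to target: do nothing
	}
	return int32(math.Ceil(usageRatio * float64(readyPods)))
}

func main() {
	// 3 ready pods averaging 400m CPU against a 200m target -> double the pods.
	fmt.Println(plainMetricReplicas([]int64{300, 400, 500}, 200, 3, 0.1)) // prints: 6
	// Usage within 10% of target -> replica count unchanged.
	fmt.Println(plainMetricReplicas([]int64{200, 210, 190}, 200, 3, 0.1)) // prints: 3
}
```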

Summary

The end-to-end HPA logic flow chart: [figure not preserved in this copy]
