日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Kubernetes监控之Heapster源码分析

發布時間:2024/4/14 编程问答 42 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Kubernetes监控之Heapster源码分析 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

源碼版本

heapster version: release-1.2

簡介

Heapster是Kubernetes下的一個監控項目,用于進行容器集群的監控和性能分析。
基本的功能及概念介紹可以回顧我之前的一篇文章:《Kubernetes監控之Heapster介紹》。
隨著的Heapster的版本迭代,支持的功能越越來越多,比如新版本支持更多的后端數據存儲方式:OpenTSDB、Monasca、Kafka、Elasticsearch等等。看過低版本(如v0.18)的源碼,會發現v1.2版本的源碼架構完全變了樣,架構擴展性越來越強,源碼學無止境!
上面很多介紹這篇文章并不會涉及,我們還是會用到最流行的模式:Heapster + InfluxDB。

監控系統架構圖:

該圖很好的描述了監控系統的關鍵組件,及數據流向。
在源碼分析之前我們先介紹Heapster的實現流程,由上圖可以看出Heapster會從各個Node上kubelet獲取相關的監控信息,然后進行匯總發送給后臺數據庫InfluxDB。
這里會涉及到幾個關鍵點:

  • k8s集群會增刪Nodes,Heapster需要獲取這些sources并做相應的操作

  • Heapster后端數據庫怎么存儲?是否支持多后端?

  • Heapster獲取到數據后推送給后端數據庫,那么其提供了API的數據該從何處獲取?本地cache?

  • Heapster從kubelet獲取到的數據是否需要處理?還是能直接存儲到后端

  • 等等..

一起分析完heapster源碼實現,就能進行解惑了。

啟動命令

先列出我解析源碼時所用的命令,及參數使用,便于后面的理解。

# heapster --source=kubernetes:http://<master-ip>:8080?inClusterConfig=false\&useServiceAccount=false --sink=influxdb:http://<influxdb-ip>:8086

啟動流程

從Heapster的啟動流程開始分析其實現,前面做了簡單的分析,可以帶著問題去看源碼會有更好的收獲。

main()

路徑: heapster/metrics/heapster.go

func main() {...// 根據--source參數的輸入來創建數據源// 我們這里會使用kubernetes,下面會根據k8s來解析sourceFactory := sources.NewSourceFactory()// 創建該sourceProvider時,會創建Node的ListWatch,用于監控k8s節點的增刪情況,因為這些才是數據的真實來源.// 該sourceProvider會包含nodeLister,還有kubeletClient,用于跟各個節點的kubelet通信,獲取cadvisor數據sourceProvider, err := sourceFactory.BuildAll(argSources)if err != nil {glog.Fatalf("Failed to create source provide: %v", err)}// 創建sourceManager,其實就是sourceProvider + ScrapeTimeout,用于超時獲取數據sourceManager, err := sources.NewSourceManager(sourceProvider, sources.DefaultMetricsScrapeTimeout)if err != nil {glog.Fatalf("Failed to create source manager: %v", err)}// 根據--sink創建數據存儲后端// 我們這里會使用influxDB,來作為數據的存儲后端sinksFactory := sinks.NewSinkFactory()// 創建sinks時會返回各類對象:// metricSink: 可以理解為本地的metrics數據池,Heapster API獲取到的數據都是從該對象中獲取的,默認一定會創建// sinkList: Heapster在新版本中支持多后端數據存儲,比如你可以指定多個不同的influxDB,也可以同時指定influxDB和Elasticsearch。// historicalSource: 需要配置,我們暫時沒有用到metricSink, sinkList, historicalSource := sinksFactory.BuildAll(argSinks, *argHistoricalSource)if metricSink == nil {glog.Fatal("Failed to create metric sink")}if historicalSource == nil && len(*argHistoricalSource) > 0 {glog.Fatal("Failed to use a sink as a historical metrics source")}for _, sink := range sinkList {glog.Infof("Starting with %s", sink.Name())}// 創建sinkManager,會根據之前的sinkList,創建對應數量的協程,用于從sink的數據管道中獲取數據,然后推送到對應的后端sinkManager, err := sinks.NewDataSinkManager(sinkList, sinks.DefaultSinkExportDataTimeout, sinks.DefaultSinkStopTimeout)if err != nil {glog.Fatalf("Failed to created sink manager: %v", err)}// 創建對象,用于處理各個kubelet獲取到的metrics數據// 最終都會加入到dataProcessors,在最終的處理函數中會進行遍歷并調用其process()metricsToAggregate := []string{core.MetricCpuUsageRate.Name,core.MetricMemoryUsage.Name,core.MetricCpuRequest.Name,core.MetricCpuLimit.Name,core.MetricMemoryRequest.Name,core.MetricMemoryLimit.Name,}metricsToAggregateForNode := []string{core.MetricCpuRequest.Name,core.MetricCpuLimit.Name,core.MetricMemoryRequest.Name,core.MetricMemoryLimit.Name,}// 速率計算對象dataProcessors := []core.DataProcessor{// Convert cumulaties to rateprocessors.NewRateCalculator(core.RateMetricsMapping),}kubernetesUrl, err := getKubernetesAddress(argSources)if err != nil {glog.Fatalf("Failed to get kubernetes address: %v", err)}kubeConfig, err := kube_config.GetKubeClientConfig(kubernetesUrl)if err != nil {glog.Fatalf("Failed to get client config: %v", err)}kubeClient := kube_client.NewOrDie(kubeConfig)// 會創建podLister、nodeLister、namespaceLister,用于從k8s watch各個資源的增刪情況// 防止獲取數據失敗podLister, err := getPodLister(kubeClient)if err != nil {glog.Fatalf("Failed to create podLister: %v", err)}nodeLister, err := getNodeLister(kubeClient)if err != nil {glog.Fatalf("Failed to create nodeLister: %v", err)}podBasedEnricher, err := processors.NewPodBasedEnricher(podLister)if err != nil {glog.Fatalf("Failed to create PodBasedEnricher: %v", err)}dataProcessors = append(dataProcessors, podBasedEnricher)namespaceBasedEnricher, err := processors.NewNamespaceBasedEnricher(kubernetesUrl)if err != nil {glog.Fatalf("Failed to create NamespaceBasedEnricher: %v", err)}dataProcessors = append(dataProcessors, namespaceBasedEnricher)// 這里的對象append順序會有一定的要求// 比如Pod的有些數據需要進行containers數據的累加得到dataProcessors = append(dataProcessors,processors.NewPodAggregator(),&processors.NamespaceAggregator{MetricsToAggregate: metricsToAggregate,},&processors.NodeAggregator{MetricsToAggregate: metricsToAggregateForNode,},&processors.ClusterAggregator{MetricsToAggregate: metricsToAggregate,})nodeAutoscalingEnricher, err := processors.NewNodeAutoscalingEnricher(kubernetesUrl)if err != nil {glog.Fatalf("Failed to create NodeAutoscalingEnricher: %v", err)}dataProcessors = append(dataProcessors, nodeAutoscalingEnricher)// 這是整個Heapster功能的關鍵處// 根據sourceManger、sinkManager、dataProcessors來創建manager對象manager, err := manager.NewManager(sourceManager, dataProcessors, sinkManager, *argMetricResolution,manager.DefaultScrapeOffset, manager.DefaultMaxParallelism)if err != nil {glog.Fatalf("Failed to create main manager: %v", err)}// 開始創建協程,從各個sources獲取metrics數據,并經過dataProcessors的處理,然后export到各個用于后端數據存儲的sinksmanager.Start()// 以下的就是創建Heapster server,用于提供各類API// 通過http.mux及go-restful進行實現// 新版的heapster還支持TLShandler := setupHandlers(metricSink, podLister, nodeLister, historicalSource)addr := fmt.Sprintf("%s:%d", *argIp, *argPort)glog.Infof("Starting heapster on port %d", *argPort)mux := http.NewServeMux()promHandler := prometheus.Handler()if len(*argTLSCertFile) > 0 && len(*argTLSKeyFile) > 0 {if len(*argTLSClientCAFile) > 0 {authPprofHandler, err := newAuthHandler(handler)if err != nil {glog.Fatalf("Failed to create authorized pprof handler: %v", err)}handler = authPprofHandlerauthPromHandler, err := newAuthHandler(promHandler)if err != nil {glog.Fatalf("Failed to create authorized prometheus handler: %v", err)}promHandler = authPromHandler}mux.Handle("/", handler)mux.Handle("/metrics", promHandler)healthz.InstallHandler(mux, healthzChecker(metricSink))// If allowed users is set, then we need to enable Client Authenticationif len(*argAllowedUsers) > 0 {server := &http.Server{Addr: addr,Handler: mux,TLSConfig: &tls.Config{ClientAuth: tls.RequestClientCert},}glog.Fatal(server.ListenAndServeTLS(*argTLSCertFile, *argTLSKeyFile))} else {glog.Fatal(http.ListenAndServeTLS(addr, *argTLSCertFile, *argTLSKeyFile, mux))}} else {mux.Handle("/", handler)mux.Handle("/metrics", promHandler)healthz.InstallHandler(mux, healthzChecker(metricSink))glog.Fatal(http.ListenAndServe(addr, mux))} }

介紹了Heapster的啟動流程后,大致能明白了該啟動過程分為幾個關鍵點:

  • 創建數據源對象

  • 創建后端存儲對象list

  • 創建處理metrics數據的processors

  • 創建manager,并開啟數據的獲取及export的協程

  • 開啟Heapster server,并支持各類API

下面進行一一介紹。

創建數據源

先介紹下相關的結構體,因為這才是作者的核心思想。
創建的sourceProvider是實現了MetricsSourceProvider接口的對象。
先看下MetricsSourceProvider:

type MetricsSourceProvider interface {GetMetricsSources() []MetricsSource }

每個最終返回的對象,都需要提供GetMetricsSources(),看字面意識就可以知道就是提供所有的獲取Metrics源頭的接口。
我們的參數--source=kubernetes,所以其實我們真實返回的結構是kubeletProvider.
路徑: heapster/metrics/sources/kubelet/kubelet.go

type kubeletProvider struct {// 用于從k8s獲取最新的nodes信息,然后根據kubeletClient,合成各個metricSourcesnodeLister *cache.StoreToNodeLister// 反射reflector *cache.Reflector// kubeletClient相關的配置,比如端口:10255kubeletClient *KubeletClient }

結構介紹完了,看下具體的創建過程,跟kubernetes相關的關鍵接口是NewKubeletProvider():

func NewKubeletProvider(uri *url.URL) (MetricsSourceProvider, error) {// 創建kubernetes master及kubelet client相關的配置kubeConfig, kubeletConfig, err := GetKubeConfigs(uri)if err != nil {return nil, err}// 創建kubeClient及kubeletClientkubeClient := kube_client.NewOrDie(kubeConfig)kubeletClient, err := NewKubeletClient(kubeletConfig)if err != nil {return nil, err}// 獲取下所有的Nodes,測試下創建的client是否能正常通訊if _, err := kubeClient.Nodes().List(kube_api.ListOptions{LabelSelector: labels.Everything(),FieldSelector: fields.Everything()}); err != nil {glog.Errorf("Failed to load nodes: %v", err)}// 監控k8s的nodes變更// 這里會創建協程進行watch,便于后面調用nodeLister.List()列出所有的nodes。// 該Watch的實現,需要看下apiServer中的實現,后面會進行講解lw := cache.NewListWatchFromClient(kubeClient, "nodes", kube_api.NamespaceAll, fields.Everything())nodeLister := &cache.StoreToNodeLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)}reflector := cache.NewReflector(lw, &kube_api.Node{}, nodeLister.Store, time.Hour)reflector.Run()// 結構在前面介紹過return &kubeletProvider{nodeLister: nodeLister,reflector: reflector,kubeletClient: kubeletClient,}, nil }

該過程會涉及到較多的技術點,比如apiServer中的watch實現,reflector的使用。這里不會進行細講,該文章主要是針對heapster的源碼實現,apiServer相關的實現后面會進行單獨輸出。

這里需要注意的是創建了ListWath,需要關注后面哪里用到了nodeLister.List()進行nodes的獲取。

創建后端服務

前面已經提到后端數據存儲會有兩處,一個是metricSink,另一個是influxdbSink。所以這里會涉及到兩個結構:

type MetricSink struct {// 鎖lock sync.Mutex// 長時間存儲metrics數據,默認時間是15minlongStoreMetrics []stringlongStoreDuration time.Duration// 短時間存儲metrics數據,默認時間是140sshortStoreDuration time.Duration// 短時存儲空間shortStore []*core.DataBatch// 長時存儲空間longStore []*multimetricStore }

該結構就是用于heapster API調用時獲取的數據源,這里會分為兩種數據存儲方式:長時存儲和短時存儲。所以集群越大時,heapster占用內存越多,需要考慮該問題如何處理或者優化。

type influxdbSink struct {// 連接后端influxDB數據庫的clientclient influxdb_common.InfluxdbClient// 鎖sync.RWMutexc influxdb_common.InfluxdbConfigdbExists bool }

這個就是我們配置的InfluxDB的結構,是我們真正的數據存儲后端。

開始介紹創建后端服務流程,從sinksFactory.BuildAll()接口直接入手。
路徑: heapster/metrics/sinks/factory.go

func (this *SinkFactory) BuildAll(uris flags.Uris, historicalUri string) (*metricsink.MetricSink, []core.DataSink, core.HistoricalSource) {result := make([]core.DataSink, 0, len(uris))var metric *metricsink.MetricSinkvar historical core.HistoricalSource// 根據傳入的"--sink"參數信息,進行build// 支持多后端數據存儲,會進行遍歷并創建for _, uri := range uris {// 關鍵接口sink, err := this.Build(uri)if err != nil {glog.Errorf("Failed to create sink: %v", err)continue}if uri.Key == "metric" {metric = sink.(*metricsink.MetricSink)}if uri.String() == historicalUri {if asHistSource, ok := sink.(core.AsHistoricalSource); ok {historical = asHistSource.Historical()} else {glog.Errorf("Sink type %q does not support being used for historical access", uri.Key)}}result = append(result, sink)}// 默認metricSink一定會創建if metric == nil {uri := flags.Uri{}uri.Set("metric")sink, err := this.Build(uri)if err == nil {result = append(result, sink)metric = sink.(*metricsink.MetricSink)} else {glog.Errorf("Error while creating metric sink: %v", err)}}if len(historicalUri) > 0 && historical == nil {glog.Errorf("Error while initializing historical access: unable to use sink %q as a historical source", historicalUri)}return metric, result, historical }

該接口流程比較簡單,就是對傳入參數進行判斷,然后調用this.Build()進行創建,這里只需要注意即使沒有配置metric,也會進行metricSink的創建。

func (this *SinkFactory) Build(uri flags.Uri) (core.DataSink, error) {switch uri.Key {。。。case "influxdb":return influxdb.CreateInfluxdbSink(&uri.Val)。。。case "metric":return metricsink.NewMetricSink(140*time.Second, 15*time.Minute, []string{core.MetricCpuUsageRate.MetricDescriptor.Name,core.MetricMemoryUsage.MetricDescriptor.Name}), nil。。。default:return nil, fmt.Errorf("Sink not recognized: %s", uri.Key)} }

influxdb的創建其實就是根據傳入的參數然后創建一個config結構,用于后面創建連接influxDB的client;
metric的創建其實就是初始化了一個MetricSink結構,需要注意的是傳入的第三個參數,因為這是用于指定哪些metrics需要進行長時間存儲,默認就是cpu/usage和memory/usage,因為這兩個參數用戶最為關心。
具體的創建接口就不在深入了,較為簡單。
到這里BuildAll()就結束了,至于返回值前面已經做過介紹,就不在累贅了。
其實沒那么簡單,還有一步:sinkManager的創建。
進入sinks.NewDataSinkManager()接口看下:

func NewDataSinkManager(sinks []core.DataSink, exportDataTimeout, stopTimeout time.Duration) (core.DataSink, error) {sinkHolders := []sinkHolder{}// 遍歷前面創建的sinkListfor _, sink := range sinks {// 為每個sink添加一個dataChannel和stopChannel// 用于獲取數據和stop信號sh := sinkHolder{sink: sink,dataBatchChannel: make(chan *core.DataBatch),stopChannel: make(chan bool),}sinkHolders = append(sinkHolders, sh)// 每個sink都會創建一個協程// 從dataChannel獲取數據,并調用sink.export()導出到后端數據庫go func(sh sinkHolder) {for {select {case data := <-sh.dataBatchChannel:export(sh.sink, data)case isStop := <-sh.stopChannel:glog.V(2).Infof("Stop received: %s", sh.sink.Name())if isStop {sh.sink.Stop()return}}}}(sh)}return &sinkManager{sinkHolders: sinkHolders,exportDataTimeout: exportDataTimeout,stopTimeout: stopTimeout,}, nil }

這里會為每個sink創建協程,等待數據的到來并最終將數據導入到對應的后端數據庫。
這里需要帶個問號,既然channel有一端在收,總得有地方會發送,這會在后面才會揭曉。

go協程 + channel的方式,是golang最常見的方式,確實便用。

創建數據Processors

因為cAdvisor返回的原始數據就包含了nodes和containers的相關數據,所以heapster需要創建各種processor,用于處理成不同類型的數據,比如pod, namespace, cluster,node。
還有些數據需要計算出速率,有些數據需要進行累加,不同類型擁有的metrics還不一樣等等情況。
看下源碼:

func main() {...// 計算namespace和cluster的metrics值時,下列數據需要進行累加求值metricsToAggregate := []string{core.MetricCpuUsageRate.Name,core.MetricMemoryUsage.Name,core.MetricCpuRequest.Name,core.MetricCpuLimit.Name,core.MetricMemoryRequest.Name,core.MetricMemoryLimit.Name,}// 計算node的metrics值時,下列數據需要進行累加求值metricsToAggregateForNode := []string{core.MetricCpuRequest.Name,core.MetricCpuLimit.Name,core.MetricMemoryRequest.Name,core.MetricMemoryLimit.Name,}// RateMetricsMapping中的數據需要計算速率,比如cpu/usage_rate,network/rx_ratedataProcessors := []core.DataProcessor{// Convert cumulaties to rateprocessors.NewRateCalculator(core.RateMetricsMapping),}kubernetesUrl, err := getKubernetesAddress(argSources)if err != nil {glog.Fatalf("Failed to get kubernetes address: %v", err)}kubeConfig, err := kube_config.GetKubeClientConfig(kubernetesUrl)if err != nil {glog.Fatalf("Failed to get client config: %v", err)}kubeClient := kube_client.NewOrDie(kubeConfig)// 創建pod的ListWatch,用于從k8s server監聽pod變更podLister, err := getPodLister(kubeClient)if err != nil {glog.Fatalf("Failed to create podLister: %v", err)}// 創建node的ListWatch,用于從k8s server監聽node變更nodeLister, err := getNodeLister(kubeClient)if err != nil {glog.Fatalf("Failed to create nodeLister: %v", err)}// 該podBasedEnricher用于解析從sources獲取到的pod和container的metrics數據,// 然后對pod和container進行數據完善,比如添加labels.但這里還不會處理metricsValuepodBasedEnricher, err := processors.NewPodBasedEnricher(podLister)if err != nil {glog.Fatalf("Failed to create PodBasedEnricher: %v", err)}dataProcessors = append(dataProcessors, podBasedEnricher)// 跟上面的podBasedEnricher同理,需要注意的是在append時有先后順序namespaceBasedEnricher, err := processors.NewNamespaceBasedEnricher(kubernetesUrl)if err != nil {glog.Fatalf("Failed to create NamespaceBasedEnricher: %v", err)}dataProcessors = append(dataProcessors, namespaceBasedEnricher)// 這里的對象會對metricsValue進行處理,對應的數據進行累加求值dataProcessors = append(dataProcessors,processors.NewPodAggregator(),&processors.NamespaceAggregator{MetricsToAggregate: metricsToAggregate,},&processors.NodeAggregator{MetricsToAggregate: metricsToAggregateForNode,},&processors.ClusterAggregator{MetricsToAggregate: metricsToAggregate,})dataProcessors = append(dataProcessors, processors.NewRcAggregator())nodeAutoscalingEnricher, err := processors.NewNodeAutoscalingEnricher(kubernetesUrl)if err != nil {glog.Fatalf("Failed to create NodeAutoscalingEnricher: %v", err)}dataProcessors = append(dataProcessors, nodeAutoscalingEnricher)

Processors的功能基本就是這樣了,相對有點復雜,數據處理的樣式和類別較多。
各個對象的Process()方法就不進行一一介紹了,就是按照順序一個一個的填充core.DataBatch數據。有興趣的可以逐個看下,可以借鑒下實現的方式。

獲取源數據并存儲

前面的都是鋪墊,開始介紹heapster的關鍵實現,進行源數據的獲取,并導出到后端存儲。
先介紹相關結構:

type Manager interface {Start()Stop() }

Manager是需要實現Start和stop方法的接口。而真實創建的對象其實是realManager:

type realManager struct {// 數據源source core.MetricsSource// 數據處理對象processors []core.DataProcessor// 后端存儲對象sink core.DataSink// 每次scrape數據的時間間隔resolution time.Duration// 創建多個scrape協程時,需要sleep這點時間,防止異常scrapeOffset time.Duration// scrape 停止的管道stopChan chan struct{}// housekeepSemaphoreChan chan struct{}// 超時housekeepTimeout time.Duration }

關鍵的代碼如下:

manager, err := manager.NewManager(sourceManager, dataProcessors, sinkManager, *argMetricResolution,manager.DefaultScrapeOffset, manager.DefaultMaxParallelism)if err != nil {glog.Fatalf("Failed to create main manager: %v", err)}manager.Start()

首先會根據前面創建的sourceManager, dataProcessors, sinkManager對象,再創建manager。
路徑: heapster/metrics/manager/manager.go

func NewManager(source core.MetricsSource, processors []core.DataProcessor, sink core.DataSink, resolution time.Duration,scrapeOffset time.Duration, maxParallelism int) (Manager, error) {manager := realManager{source: source,processors: processors,sink: sink,resolution: resolution,scrapeOffset: scrapeOffset,stopChan: make(chan struct{}),housekeepSemaphoreChan: make(chan struct{}, maxParallelism),housekeepTimeout: resolution / 2,}for i := 0; i < maxParallelism; i++ {manager.housekeepSemaphoreChan <- struct{}{}}return &manager, nil }

前面介紹了該關鍵結構readlManager,繼續進入manager.Start():

func (rm *realManager) Start() {go rm.Housekeep() }func (rm *realManager) Housekeep() {for {// Always try to get the newest metricsnow := time.Now()// 獲取數據的時間段,默認是1minstart := now.Truncate(rm.resolution)end := start.Add(rm.resolution)// 真正同步一次的時間間隔,默認是1min + 5stimeToNextSync := end.Add(rm.scrapeOffset).Sub(now)select {case <-time.After(timeToNextSync):rm.housekeep(start, end)case <-rm.stopChan:rm.sink.Stop()return}} }

繼續看rm.housekeep(start, end), 該接口就傳入了時間區間,其實cAdvisor就是支持時間區間來獲取metrics值。

func (rm *realManager) housekeep(start, end time.Time) {if !start.Before(end) {glog.Warningf("Wrong time provided to housekeep start:%s end: %s", start, end)return}select {case <-rm.housekeepSemaphoreChan:// ok, good to gocase <-time.After(rm.housekeepTimeout):glog.Warningf("Spent too long waiting for housekeeping to start")return}go func(rm *realManager) {defer func() { rm.housekeepSemaphoreChan <- struct{}{} }()// 從sources獲取數據data := rm.source.ScrapeMetrics(start, end)// 遍歷processors,然后進行數據處理for _, p := range rm.processors {newData, err := process(p, data)if err == nil {data = newData} else {glog.Errorf("Error in processor: %v", err)return}}// 最終將數據導出到后端存儲rm.sink.ExportData(data)}(rm) }

邏輯比較簡單,會有三個關鍵:

  • 源數據獲取

  • 數據處理

  • 導出到后端

  • 先看下rm.source.ScrapeMetrics()接口實現.
    路徑: heapster/metrics/sources/manager.go

  • func (this *sourceManager) ScrapeMetrics(start, end time.Time) *DataBatch {// 調用了nodeLister.List()獲取最新的k8s nodes列表,再根據之前配置的kubelet端口等信息,返回sources// 在創建sourceProvider時,會創建node的ListWatch,所以這里nodeLister可使用list()sources := this.metricsSourceProvider.GetMetricsSources()responseChannel := make(chan *DataBatch)。。。// 遍歷各個source,然后創建協程獲取數據for _, source := range sources {go func(source MetricsSource, channel chan *DataBatch, start, end, timeoutTime time.Time, delayInMs int) {// scrape()接口其實就是調用了kubeletMetricsSource.ScrapeMetrics()// 每個node都會組成對應的kubeletMetricsSource// ScrapeMetrics()就是從cAdvisor中獲取監控信息,并進行了decodemetrics := scrape(source, start, end)...select {// 將獲取到的數據丟入responseChannel// 下面會用到case channel <- metrics:// passed the response correctly.returncase <-time.After(timeForResponse):glog.Warningf("Failed to send the response back %s", source)return}}(source, responseChannel, start, end, timeoutTime, delayMs)}response := DataBatch{Timestamp: end,MetricSets: map[string]*MetricSet{},}latencies := make([]int, 11)responseloop:for i := range sources {...select {// 獲取前面創建的協程得到的數據case dataBatch := <-responseChannel:if dataBatch != nil {for key, value := range dataBatch.MetricSets {response.MetricSets[key] = value}}。。。case <-time.After(timeoutTime.Sub(now)):glog.Warningf("Failed to get all responses in time (got %d/%d)", i, len(sources))break responseloop}}...return &response }

    該接口的邏輯就是先通過nodeLister獲取k8s所有的nodes,這樣便能知道所有的kubelet信息,然后創建對應數量的協程從各個kubelet中獲取對應的cAdvisor監控信息,進行處理后再返回。

  • 獲取到數據后,就需要調用各個processors的Process()接口進行數據處理,接口太多就不一一介紹了,挑個node_aggregator.go進行介紹:

  • func (this *NodeAggregator) Process(batch *core.DataBatch) (*core.DataBatch, error) {for key, metricSet := range batch.MetricSets {// 判斷下該metric是否是pod的// metricSet.Labels都是前面就進行了填充,所以前面說需要注意每個processor的append順序if metricSetType, found := metricSet.Labels[core.LabelMetricSetType.Key]; found && metricSetType == core.MetricSetTypePod {// Aggregating podsnodeName, found := metricSet.Labels[core.LabelNodename.Key]if nodeName == "" {glog.V(8).Infof("Skipping pod %s: no node info", key)continue}if found {// 獲取nodeKey,比如: node:172.25.5.111nodeKey := core.NodeKey(nodeName)// 前面都是判斷該pod在哪個node上,然后該node的數據是需要通過這些pod進行累加得到node, found := batch.MetricSets[nodeKey]if !found {glog.V(1).Info("No metric for node %s, cannot perform node level aggregation.")} else if err := aggregate(metricSet, node, this.MetricsToAggregate); err != nil {return nil, err}} else {glog.Errorf("No node info in pod %s: %v", key, metricSet.Labels)}}}return batch, nil }

    基本流程就是這樣了,有需要的可以各個深入查看。

  • 最后就是數據的后端存儲。
    這里會涉及到兩部分:metricSink和influxdbSink。

  • 從rm.sink.ExportData(data)接口入手:
    路徑: heapster/metrics/sinks/manager.go

    func (this *sinkManager) ExportData(data *core.DataBatch) {var wg sync.WaitGroup// 遍歷所有的sink,這里其實就兩個for _, sh := range this.sinkHolders {wg.Add(1)// 創建協程,然后將之前獲取的data丟入dataBatchChannelgo func(sh sinkHolder, wg *sync.WaitGroup) {defer wg.Done()glog.V(2).Infof("Pushing data to: %s", sh.sink.Name())select {case sh.dataBatchChannel <- data:glog.V(2).Infof("Data push completed: %s", sh.sink.Name())// everything okcase <-time.After(this.exportDataTimeout):glog.Warningf("Failed to push data to sink: %s", sh.sink.Name())}}(sh, &wg)}// Wait for all pushes to complete or timeout.wg.Wait() }

    千辛萬苦,你把數據丟入sh.dataBatchChannel完事了?
    dataBatchChannel有點眼熟,因為之前創建sinkManager的時候,也創建了協程并監聽了該管道,所以真正export數據是在之前就完成了,這里只需要把數據丟入管道即可。
    所以golang中協程與協程之間的通信,channel才是王道啊!
    ExportData有兩個,一個一個講吧。
    先來關鍵的influxDB.
    路徑: heapster/metrics/sinks/influxdb/influxdb.go

    func (sink *influxdbSink) ExportData(dataBatch *core.DataBatch) {...dataPoints := make([]influxdb.Point, 0, 0)for _, metricSet := range dataBatch.MetricSets {// 遍歷MetricValuesfor metricName, metricValue := range metricSet.MetricValues {var value interface{}if core.ValueInt64 == metricValue.ValueType {value = metricValue.IntValue} else if core.ValueFloat == metricValue.ValueType {value = float64(metricValue.FloatValue)} else {continue}// Prepare measurement without fieldsfieldName := "value"measurementName := metricNameif sink.c.WithFields {// Prepare measurement and field namesserieName := strings.SplitN(metricName, "/", 2)measurementName = serieName[0]if len(serieName) > 1 {fieldName = serieName[1]}}// influxdb單條數據結構point := influxdb.Point{// 度量值名稱,比如cpu/usageMeasurement: measurementName,// 該tags就是在processors中進行添加,主要是pod_name,node_name,namespace_name等Tags: metricSet.Labels,// 該字段就是具體的值了Fields: map[string]interface{}{fieldName: value,},// 時間戳Time: dataBatch.Timestamp.UTC(),}// append到dataPoints,超過maxSendBatchSize數量后直接sendData到influxdbdataPoints = append(dataPoints, point)if len(dataPoints) >= maxSendBatchSize {sink.sendData(dataPoints)dataPoints = make([]influxdb.Point, 0, 0)}}// 遍歷LabeledMetrics,主要就是filesystem的數據// 不太明白為何要將filesystem的數據進行區分,要放到Labeled中?什么意圖?望高手指點,謝謝// 接下來的操作就跟上面MetricValues的操作差不多了for _, labeledMetric := range metricSet.LabeledMetrics {。。。point := influxdb.Point{Measurement: measurementName,Tags: make(map[string]string),Fields: map[string]interface{}{fieldName: value,},Time: dataBatch.Timestamp.UTC(),}for key, value := range metricSet.Labels {point.Tags[key] = value}for key, value := range labeledMetric.Labels {point.Tags[key] = value}dataPoints = append(dataPoints, point)if len(dataPoints) >= maxSendBatchSize {sink.sendData(dataPoints)dataPoints = make([]influxdb.Point, 0, 0)}}}if len(dataPoints) >= 0 {sink.sendData(dataPoints)} }

    該接口中有一處不太明白,metricSet中的LabeledMetrics和MetricsValue有何差別,為何要將filesystem的數據進行區分對待,放入LabeldMetrics?
    看代碼的過程中沒有得到答案,望大神指點迷津,多謝多謝!

    有問題,但也不影響繼續往下學習,接著看下MetricSink:

    func (this *MetricSink) ExportData(batch *core.DataBatch) {this.lock.Lock()defer this.lock.Unlock()now := time.Now()// 將數據丟入longStore和shortStore// 需要根據保存的時間將老數據丟棄this.longStore = append(popOldStore(this.longStore, now.Add(-this.longStoreDuration)),buildMultimetricStore(this.longStoreMetrics, batch))this.shortStore = append(popOld(this.shortStore, now.Add(-this.shortStoreDuration)), batch) }

    該邏輯比較簡單,就是將數據丟入兩個Store中,然后把過期數據丟棄。
    這里提醒一點,heapster API調用時先會從longStore中匹配數據,沒匹配上的話再從shortStore獲取,而longStore中存儲的數據類型前面已經做過介紹。

    終于結束了。。

    Heapster API創建

    前面的主流業務都介紹完了,Heapster本身也提供了API用于開發者進行使用與測試。
    繼續分析代碼吧:

    // 關鍵接口,后面分析handler := setupHandlers(metricSink, podLister, nodeLister, historicalSource)。。。// 創建http的mux多分器,用于http.Server的路由mux := http.NewServeMux()// prometheus:最新出現的人氣很高的監控系統,值得了解學習下,后續安排!promHandler := prometheus.Handler()// 支持TLS,我們用了httpif len(*argTLSCertFile) > 0 && len(*argTLSKeyFile) > 0 {。。。} else {// 多分器分了"/"和"/metrics"// 進入"/",還會進行細分,里面使用到了go-restfulmux.Handle("/", handler)mux.Handle("/metrics", promHandler)// 注冊健康檢測接口healthz.InstallHandler(mux, healthzChecker(metricSink))// 啟動Serverglog.Fatal(http.ListenAndServe(addr, mux))}

    這里的關鍵是setupHandlers()接口,需要學習下里面如何使用go-restful進行請求路由的。

    k8s apiServer中也大量使用了go-restful,在學習該源碼時有進行過分析

    路徑: heapster/metrics/handlers.go

    func setupHandlers(metricSink *metricsink.MetricSink, podLister *cache.StoreToPodLister, nodeLister *cache.StoreToNodeLister, historicalSource core.HistoricalSource) http.Handler {runningInKubernetes := true// 創建container,指定route類型為CurlyRouter// 這些都跟go-restful基礎有關,有興趣的可以看下原理wsContainer := restful.NewContainer()wsContainer.EnableContentEncoding(true)wsContainer.Router(restful.CurlyRouter{})// 注冊v1版本相關的api,包括官方介紹的"/api/v1/model"a := v1.NewApi(runningInKubernetes, metricSink, historicalSource)a.Register(wsContainer)// 這個metricsApi注冊了"/apis/metrics/v1alpha1"的各類命令// 暫不關心m := metricsApi.NewApi(metricSink, podLister, nodeLister)m.Register(wsContainer)handlePprofEndpoint := func(req *restful.Request, resp *restful.Response) {name := strings.TrimPrefix(req.Request.URL.Path, pprofBasePath)switch name {case "profile":pprof.Profile(resp, req.Request)case "symbol":pprof.Symbol(resp, req.Request)case "cmdline":pprof.Cmdline(resp, req.Request)default:pprof.Index(resp, req.Request)}}// Setup pporf handlers.ws = new(restful.WebService).Path(pprofBasePath)ws.Route(ws.GET("/{subpath:*}").To(metrics.InstrumentRouteFunc("pprof", handlePprofEndpoint))).Doc("pprof endpoint")wsContainer.Add(ws)return wsContainer }

    關鍵在于v1版本的API注冊,繼續深入a.Register(wsContainer):

    func (a *Api) Register(container *restful.Container) {// 注冊"/api/v1/metric-export" API// 用于從shortStore中獲取所有的metrics信息ws := new(restful.WebService)ws.Path("/api/v1/metric-export").Doc("Exports the latest point for all Heapster metrics").Produces(restful.MIME_JSON)ws.Route(ws.GET("").To(a.exportMetrics).Doc("export the latest data point for all metrics").Operation("exportMetrics").Writes([]*types.Timeseries{}))// ws必須要add到container中才能生效container.Add(ws)// 注冊"/api/v1/metric-export-schema" API// 用于導出所有的metrics name,比如network-rx// 還會導出還有的labels,比如pod-namews = new(restful.WebService)ws.Path("/api/v1/metric-export-schema").Doc("Schema for metrics exported by heapster").Produces(restful.MIME_JSON)ws.Route(ws.GET("").To(a.exportMetricsSchema).Doc("export the schema for all metrics").Operation("exportmetricsSchema").Writes(types.TimeseriesSchema{}))container.Add(ws)// 注冊metircSink相關的API,即"/api/v1/model/"if a.metricSink != nil {glog.Infof("Starting to Register Model.")a.RegisterModel(container)}if a.historicalSource != nil {a.RegisterHistorical(container)} }

    官方資料中介紹heapster metric model,我們使用到這些API也會比較多。
    進入a.RegisterModel(container)看下:

    func (a *Api) RegisterModel(container *restful.Container) {ws := new(restful.WebService)// 指定所有命令的prefix: "/api/v1/model"ws.Path("/api/v1/model").Doc("Root endpoint of the stats model").Consumes("*/*").Produces(restful.MIME_JSON)// 在這里增加各類命令,比如"/metrics/,/nodes/"等等addClusterMetricsRoutes(a, ws)// 列出所有的keysws.Route(ws.GET("/debug/allkeys").To(metrics.InstrumentRouteFunc("debugAllKeys", a.allKeys)).Doc("Get keys of all metric sets available").Operation("debugAllKeys"))container.Add(ws) }

    繼續看addClusterMetricsRoutes():

    func addClusterMetricsRoutes(a clusterMetricsFetcher, ws *restful.WebService) {。。。if a.isRunningInKubernetes() {// 列出所有namespaces的APIws.Route(ws.GET("/namespaces/").To(metrics.InstrumentRouteFunc("namespaceList", a.namespaceList)).Doc("Get a list of all namespaces that have some current metrics").Operation("namespaceList"))// 獲取指定namespaces的metricsws.Route(ws.GET("/namespaces/{namespace-name}/metrics").To(metrics.InstrumentRouteFunc("availableNamespaceMetrics", a.availableNamespaceMetrics)).Doc("Get a list of all available metrics for a Namespace entity").Operation("availableNamespaceMetrics").Param(ws.PathParameter("namespace-name", "The name of the namespace to lookup").DataType("string")))// 獲取namespace指定的metrics值ws.Route(ws.GET("/namespaces/{namespace-name}/metrics/{metric-name:*}").To(metrics.InstrumentRouteFunc("namespaceMetrics", a.namespaceMetrics)).Doc("Export an aggregated namespace-level metric").Operation("namespaceMetrics").Param(ws.PathParameter("namespace-name", "The name of the namespace to lookup").DataType("string")).Param(ws.PathParameter("metric-name", "The name of the requested metric").DataType("string")).Param(ws.QueryParameter("start", "Start time for requested metrics").DataType("string")).Param(ws.QueryParameter("end", "End time for requested metric").DataType("string")).Param(ws.QueryParameter("labels", "A comma-separated list of key:values pairs to use to search for a labeled metric").DataType("string")).Writes(types.MetricResult{}))。。。}。。。 }

    Heapster API的注冊基本就這樣了,在花點時間看下API的實現吧。
    我們挑一個例子做下分析,獲取某個pod的指定的metrics值.
    對應的接口:heapster/metrics/api/v1/model_handler.go

    func (a *Api) podMetrics(request *restful.Request, response *restful.Response) {a.processMetricRequest(// 根據URI傳入的ns和pod名字,拼裝成key,如:"namespace:default/pod:123"core.PodKey(request.PathParameter("namespace-name"),request.PathParameter("pod-name")),request, response) }

    根據URI的輸入參數并調用processMetricRequest()接口,獲取對應的metric value:

    func (a *Api) processMetricRequest(key string, request *restful.Request, response *restful.Response) {// 時間區間start, end, err := getStartEndTime(request)if err != nil {response.WriteError(http.StatusBadRequest, err)return}// 獲取metric Name,比如"/cpu/usage"metricName := request.PathParameter("metric-name")// 根據metricName進行轉換,比如將cpu-usage轉換成cpu/usage_rate// 所以這里需要注意cpu-usage不等于/cpu/usage,一個表示cpu使用率,一個表示cpu使用量convertedMetricName := convertMetricName(metricName)// 獲取請求中的labels,根據是否有指定labels來調用不同的接口labels, err := getLabels(request)if err != nil {response.WriteError(http.StatusBadRequest, err)return}var metrics map[string][]core.TimestampedMetricValueif labels != nil {// 該接口從metricSet.LabeledMetrics中獲取對應的valuemetrics = a.metricSink.GetLabeledMetric(convertedMetricName, labels, []string{key}, start, end)} else {// 該接口先從longStoreMetrics中進行匹配,匹配不到的話再從shortStore中獲取對應的metricValuemetrics = a.metricSink.GetMetric(convertedMetricName, []string{key}, start, end)}// 將獲取到的metricValue轉換成MetricPoint格式的值,會有多組"時間戳+value"converted := exportTimestampedMetricValue(metrics[key])// 將結果進行responseresponse.WriteEntity(converted) }

    OK,大功告成!API的實現也講完了,很多API都是相通的,最終都會調用相同的接口,所以不一一介紹了。
    這里需要注意heapster的API的URI還有多種寫法,比如/api/v1/model/cpu-usage,等價于/api/v1/model/cpu/usage_rate/,別誤理解成/cpu/usage了,這兩個概念不一樣,一個是cpu使用率,一個是cpu使用量。

    上面的提醒告訴我們,沒事多看源碼,很多誤解自然而然就解除了!

    筆者能力有限,看源碼也在于學習提升能力,當然也會有較多不理解或者理解不當的地方,希望各位能予以矯正,多謝多謝!

    擴展

    上面的介紹完了Heapster的實現,我們可以思考下是否可以動手修改源碼,比如增加一些對象的metrics信息。
    筆者考慮是否可以直接支持RC/RS/Deployment的metrics信息,讓業務層可以直接拿到服務的整體信息。

    參考資料

  • Heapster官方資料:https://github.com/kubernetes...

  • InfluxDB github: https://github.com/influxdata...

  • 超強干貨來襲 云風專訪:近40年碼齡,通宵達旦的技術人生

    總結

    以上是生活随笔為你收集整理的Kubernetes监控之Heapster源码分析的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。