日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Kubernetes监控之Heapster源码分析

發布時間:2024/4/14 编程问答 75 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Kubernetes监控之Heapster源码分析 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

源碼版本

heapster version: release-1.2

簡介

Heapster是Kubernetes下的一個監控項目,用于進行容器集群的監控和性能分析。
基本的功能及概念介紹可以回顧我之前的一篇文章:《Kubernetes監控之Heapster介紹》。
隨著的Heapster的版本迭代,支持的功能越越來越多,比如新版本支持更多的后端數據存儲方式:OpenTSDB、Monasca、Kafka、Elasticsearch等等。看過低版本(如v0.18)的源碼,會發現v1.2版本的源碼架構完全變了樣,架構擴展性越來越強,源碼學無止境!
上面很多介紹這篇文章并不會涉及,我們還是會用到最流行的模式:Heapster + InfluxDB。

監控系統架構圖:

該圖很好的描述了監控系統的關鍵組件,及數據流向。
在源碼分析之前我們先介紹Heapster的實現流程,由上圖可以看出Heapster會從各個Node上kubelet獲取相關的監控信息,然后進行匯總發送給后臺數據庫InfluxDB。
這里會涉及到幾個關鍵點:

  • k8s集群會增刪Nodes,Heapster需要獲取這些sources并做相應的操作

  • Heapster后端數據庫怎么存儲?是否支持多后端?

  • Heapster獲取到數據后推送給后端數據庫,那么其提供了API的數據該從何處獲取?本地cache?

  • Heapster從kubelet獲取到的數據是否需要處理?還是能直接存儲到后端

  • 等等..

一起分析完heapster源碼實現,就能進行解惑了。

啟動命令

先列出我解析源碼時所用的命令,及參數使用,便于后面的理解。

# heapster --source=kubernetes:http://<master-ip>:8080?inClusterConfig=false\&useServiceAccount=false --sink=influxdb:http://<influxdb-ip>:8086

啟動流程

從Heapster的啟動流程開始分析其實現,前面做了簡單的分析,可以帶著問題去看源碼會有更好的收獲。

main()

路徑: heapster/metrics/heapster.go

func main() {...// 根據--source參數的輸入來創建數據源// 我們這里會使用kubernetes,下面會根據k8s來解析sourceFactory := sources.NewSourceFactory()// 創建該sourceProvider時,會創建Node的ListWatch,用于監控k8s節點的增刪情況,因為這些才是數據的真實來源.// 該sourceProvider會包含nodeLister,還有kubeletClient,用于跟各個節點的kubelet通信,獲取cadvisor數據sourceProvider, err := sourceFactory.BuildAll(argSources)if err != nil {glog.Fatalf("Failed to create source provide: %v", err)}// 創建sourceManager,其實就是sourceProvider + ScrapeTimeout,用于超時獲取數據sourceManager, err := sources.NewSourceManager(sourceProvider, sources.DefaultMetricsScrapeTimeout)if err != nil {glog.Fatalf("Failed to create source manager: %v", err)}// 根據--sink創建數據存儲后端// 我們這里會使用influxDB,來作為數據的存儲后端sinksFactory := sinks.NewSinkFactory()// 創建sinks時會返回各類對象:// metricSink: 可以理解為本地的metrics數據池,Heapster API獲取到的數據都是從該對象中獲取的,默認一定會創建// sinkList: Heapster在新版本中支持多后端數據存儲,比如你可以指定多個不同的influxDB,也可以同時指定influxDB和Elasticsearch。// historicalSource: 需要配置,我們暫時沒有用到metricSink, sinkList, historicalSource := sinksFactory.BuildAll(argSinks, *argHistoricalSource)if metricSink == nil {glog.Fatal("Failed to create metric sink")}if historicalSource == nil && len(*argHistoricalSource) > 0 {glog.Fatal("Failed to use a sink as a historical metrics source")}for _, sink := range sinkList {glog.Infof("Starting with %s", sink.Name())}// 創建sinkManager,會根據之前的sinkList,創建對應數量的協程,用于從sink的數據管道中獲取數據,然后推送到對應的后端sinkManager, err := sinks.NewDataSinkManager(sinkList, sinks.DefaultSinkExportDataTimeout, sinks.DefaultSinkStopTimeout)if err != nil {glog.Fatalf("Failed to created sink manager: %v", err)}// 創建對象,用于處理各個kubelet獲取到的metrics數據// 最終都會加入到dataProcessors,在最終的處理函數中會進行遍歷并調用其process()metricsToAggregate := []string{core.MetricCpuUsageRate.Name,core.MetricMemoryUsage.Name,core.MetricCpuRequest.Name,core.MetricCpuLimit.Name,core.MetricMemoryRequest.Name,core.MetricMemoryLimit.Name,}metricsToAggregateForNode := []string{core.MetricCpuRequest.Name,core.MetricCpuLimit.Name,core.MetricMemoryRequest.Name,core.MetricMemoryLimit.Name,}// 速率計算對象dataProcessors := []core.DataProcessor{// Convert cumulaties to rateprocessors.NewRateCalculator(core.RateMetricsMapping),}kubernetesUrl, err := getKubernetesAddress(argSources)if err != nil {glog.Fatalf("Failed to get kubernetes address: %v", err)}kubeConfig, err := kube_config.GetKubeClientConfig(kubernetesUrl)if err != nil {glog.Fatalf("Failed to get client config: %v", err)}kubeClient := kube_client.NewOrDie(kubeConfig)// 會創建podLister、nodeLister、namespaceLister,用于從k8s watch各個資源的增刪情況// 防止獲取數據失敗podLister, err := getPodLister(kubeClient)if err != nil {glog.Fatalf("Failed to create podLister: %v", err)}nodeLister, err := getNodeLister(kubeClient)if err != nil {glog.Fatalf("Failed to create nodeLister: %v", err)}podBasedEnricher, err := processors.NewPodBasedEnricher(podLister)if err != nil {glog.Fatalf("Failed to create PodBasedEnricher: %v", err)}dataProcessors = append(dataProcessors, podBasedEnricher)namespaceBasedEnricher, err := processors.NewNamespaceBasedEnricher(kubernetesUrl)if err != nil {glog.Fatalf("Failed to create NamespaceBasedEnricher: %v", err)}dataProcessors = append(dataProcessors, namespaceBasedEnricher)// 這里的對象append順序會有一定的要求// 比如Pod的有些數據需要進行containers數據的累加得到dataProcessors = append(dataProcessors,processors.NewPodAggregator(),&processors.NamespaceAggregator{MetricsToAggregate: metricsToAggregate,},&processors.NodeAggregator{MetricsToAggregate: metricsToAggregateForNode,},&processors.ClusterAggregator{MetricsToAggregate: metricsToAggregate,})nodeAutoscalingEnricher, err := processors.NewNodeAutoscalingEnricher(kubernetesUrl)if err != nil {glog.Fatalf("Failed to create NodeAutoscalingEnricher: %v", err)}dataProcessors = append(dataProcessors, nodeAutoscalingEnricher)// 這是整個Heapster功能的關鍵處// 根據sourceManger、sinkManager、dataProcessors來創建manager對象manager, err := manager.NewManager(sourceManager, dataProcessors, sinkManager, *argMetricResolution,manager.DefaultScrapeOffset, manager.DefaultMaxParallelism)if err != nil {glog.Fatalf("Failed to create main manager: %v", err)}// 開始創建協程,從各個sources獲取metrics數據,并經過dataProcessors的處理,然后export到各個用于后端數據存儲的sinksmanager.Start()// 以下的就是創建Heapster server,用于提供各類API// 通過http.mux及go-restful進行實現// 新版的heapster還支持TLShandler := setupHandlers(metricSink, podLister, nodeLister, historicalSource)addr := fmt.Sprintf("%s:%d", *argIp, *argPort)glog.Infof("Starting heapster on port %d", *argPort)mux := http.NewServeMux()promHandler := prometheus.Handler()if len(*argTLSCertFile) > 0 && len(*argTLSKeyFile) > 0 {if len(*argTLSClientCAFile) > 0 {authPprofHandler, err := newAuthHandler(handler)if err != nil {glog.Fatalf("Failed to create authorized pprof handler: %v", err)}handler = authPprofHandlerauthPromHandler, err := newAuthHandler(promHandler)if err != nil {glog.Fatalf("Failed to create authorized prometheus handler: %v", err)}promHandler = authPromHandler}mux.Handle("/", handler)mux.Handle("/metrics", promHandler)healthz.InstallHandler(mux, healthzChecker(metricSink))// If allowed users is set, then we need to enable Client Authenticationif len(*argAllowedUsers) > 0 {server := &http.Server{Addr: addr,Handler: mux,TLSConfig: &tls.Config{ClientAuth: tls.RequestClientCert},}glog.Fatal(server.ListenAndServeTLS(*argTLSCertFile, *argTLSKeyFile))} else {glog.Fatal(http.ListenAndServeTLS(addr, *argTLSCertFile, *argTLSKeyFile, mux))}} else {mux.Handle("/", handler)mux.Handle("/metrics", promHandler)healthz.InstallHandler(mux, healthzChecker(metricSink))glog.Fatal(http.ListenAndServe(addr, mux))} }

介紹了Heapster的啟動流程后,大致能明白了該啟動過程分為幾個關鍵點:

  • 創建數據源對象

  • 創建后端存儲對象list

  • 創建處理metrics數據的processors

  • 創建manager,并開啟數據的獲取及export的協程

  • 開啟Heapster server,并支持各類API

下面進行一一介紹。

創建數據源

先介紹下相關的結構體,因為這才是作者的核心思想。
創建的sourceProvider是實現了MetricsSourceProvider接口的對象。
先看下MetricsSourceProvider:

type MetricsSourceProvider interface {GetMetricsSources() []MetricsSource }

每個最終返回的對象,都需要提供GetMetricsSources(),看字面意識就可以知道就是提供所有的獲取Metrics源頭的接口。
我們的參數--source=kubernetes,所以其實我們真實返回的結構是kubeletProvider.
路徑: heapster/metrics/sources/kubelet/kubelet.go

type kubeletProvider struct {// 用于從k8s獲取最新的nodes信息,然后根據kubeletClient,合成各個metricSourcesnodeLister *cache.StoreToNodeLister// 反射reflector *cache.Reflector// kubeletClient相關的配置,比如端口:10255kubeletClient *KubeletClient }

結構介紹完了,看下具體的創建過程,跟kubernetes相關的關鍵接口是NewKubeletProvider():

func NewKubeletProvider(uri *url.URL) (MetricsSourceProvider, error) {// 創建kubernetes master及kubelet client相關的配置kubeConfig, kubeletConfig, err := GetKubeConfigs(uri)if err != nil {return nil, err}// 創建kubeClient及kubeletClientkubeClient := kube_client.NewOrDie(kubeConfig)kubeletClient, err := NewKubeletClient(kubeletConfig)if err != nil {return nil, err}// 獲取下所有的Nodes,測試下創建的client是否能正常通訊if _, err := kubeClient.Nodes().List(kube_api.ListOptions{LabelSelector: labels.Everything(),FieldSelector: fields.Everything()}); err != nil {glog.Errorf("Failed to load nodes: %v", err)}// 監控k8s的nodes變更// 這里會創建協程進行watch,便于后面調用nodeLister.List()列出所有的nodes。// 該Watch的實現,需要看下apiServer中的實現,后面會進行講解lw := cache.NewListWatchFromClient(kubeClient, "nodes", kube_api.NamespaceAll, fields.Everything())nodeLister := &cache.StoreToNodeLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)}reflector := cache.NewReflector(lw, &kube_api.Node{}, nodeLister.Store, time.Hour)reflector.Run()// 結構在前面介紹過return &kubeletProvider{nodeLister: nodeLister,reflector: reflector,kubeletClient: kubeletClient,}, nil }

該過程會涉及到較多的技術點,比如apiServer中的watch實現,reflector的使用。這里不會進行細講,該文章主要是針對heapster的源碼實現,apiServer相關的實現后面會進行單獨輸出。

這里需要注意的是創建了ListWath,需要關注后面哪里用到了nodeLister.List()進行nodes的獲取。

創建后端服務

前面已經提到后端數據存儲會有兩處,一個是metricSink,另一個是influxdbSink。所以這里會涉及到兩個結構:

type MetricSink struct {// 鎖lock sync.Mutex// 長時間存儲metrics數據,默認時間是15minlongStoreMetrics []stringlongStoreDuration time.Duration// 短時間存儲metrics數據,默認時間是140sshortStoreDuration time.Duration// 短時存儲空間shortStore []*core.DataBatch// 長時存儲空間longStore []*multimetricStore }

該結構就是用于heapster API調用時獲取的數據源,這里會分為兩種數據存儲方式:長時存儲和短時存儲。所以集群越大時,heapster占用內存越多,需要考慮該問題如何處理或者優化。

type influxdbSink struct {// 連接后端influxDB數據庫的clientclient influxdb_common.InfluxdbClient// 鎖sync.RWMutexc influxdb_common.InfluxdbConfigdbExists bool }

這個就是我們配置的InfluxDB的結構,是我們真正的數據存儲后端。

開始介紹創建后端服務流程,從sinksFactory.BuildAll()接口直接入手。
路徑: heapster/metrics/sinks/factory.go

func (this *SinkFactory) BuildAll(uris flags.Uris, historicalUri string) (*metricsink.MetricSink, []core.DataSink, core.HistoricalSource) {result := make([]core.DataSink, 0, len(uris))var metric *metricsink.MetricSinkvar historical core.HistoricalSource// 根據傳入的"--sink"參數信息,進行build// 支持多后端數據存儲,會進行遍歷并創建for _, uri := range uris {// 關鍵接口sink, err := this.Build(uri)if err != nil {glog.Errorf("Failed to create sink: %v", err)continue}if uri.Key == "metric" {metric = sink.(*metricsink.MetricSink)}if uri.String() == historicalUri {if asHistSource, ok := sink.(core.AsHistoricalSource); ok {historical = asHistSource.Historical()} else {glog.Errorf("Sink type %q does not support being used for historical access", uri.Key)}}result = append(result, sink)}// 默認metricSink一定會創建if metric == nil {uri := flags.Uri{}uri.Set("metric")sink, err := this.Build(uri)if err == nil {result = append(result, sink)metric = sink.(*metricsink.MetricSink)} else {glog.Errorf("Error while creating metric sink: %v", err)}}if len(historicalUri) > 0 && historical == nil {glog.Errorf("Error while initializing historical access: unable to use sink %q as a historical source", historicalUri)}return metric, result, historical }

該接口流程比較簡單,就是對傳入參數進行判斷,然后調用this.Build()進行創建,這里只需要注意即使沒有配置metric,也會進行metricSink的創建。

func (this *SinkFactory) Build(uri flags.Uri) (core.DataSink, error) {switch uri.Key {。。。case "influxdb":return influxdb.CreateInfluxdbSink(&uri.Val)。。。case "metric":return metricsink.NewMetricSink(140*time.Second, 15*time.Minute, []string{core.MetricCpuUsageRate.MetricDescriptor.Name,core.MetricMemoryUsage.MetricDescriptor.Name}), nil。。。default:return nil, fmt.Errorf("Sink not recognized: %s", uri.Key)} }

influxdb的創建其實就是根據傳入的參數然后創建一個config結構,用于后面創建連接influxDB的client;
metric的創建其實就是初始化了一個MetricSink結構,需要注意的是傳入的第三個參數,因為這是用于指定哪些metrics需要進行長時間存儲,默認就是cpu/usage和memory/usage,因為這兩個參數用戶最為關心。
具體的創建接口就不在深入了,較為簡單。
到這里BuildAll()就結束了,至于返回值前面已經做過介紹,就不在累贅了。
其實沒那么簡單,還有一步:sinkManager的創建。
進入sinks.NewDataSinkManager()接口看下:

func NewDataSinkManager(sinks []core.DataSink, exportDataTimeout, stopTimeout time.Duration) (core.DataSink, error) {sinkHolders := []sinkHolder{}// 遍歷前面創建的sinkListfor _, sink := range sinks {// 為每個sink添加一個dataChannel和stopChannel// 用于獲取數據和stop信號sh := sinkHolder{sink: sink,dataBatchChannel: make(chan *core.DataBatch),stopChannel: make(chan bool),}sinkHolders = append(sinkHolders, sh)// 每個sink都會創建一個協程// 從dataChannel獲取數據,并調用sink.export()導出到后端數據庫go func(sh sinkHolder) {for {select {case data := <-sh.dataBatchChannel:export(sh.sink, data)case isStop := <-sh.stopChannel:glog.V(2).Infof("Stop received: %s", sh.sink.Name())if isStop {sh.sink.Stop()return}}}}(sh)}return &sinkManager{sinkHolders: sinkHolders,exportDataTimeout: exportDataTimeout,stopTimeout: stopTimeout,}, nil }

這里會為每個sink創建協程,等待數據的到來并最終將數據導入到對應的后端數據庫。
這里需要帶個問號,既然channel有一端在收,總得有地方會發送,這會在后面才會揭曉。

go協程 + channel的方式,是golang最常見的方式,確實便用。

創建數據Processors

因為cAdvisor返回的原始數據就包含了nodes和containers的相關數據,所以heapster需要創建各種processor,用于處理成不同類型的數據,比如pod, namespace, cluster,node。
還有些數據需要計算出速率,有些數據需要進行累加,不同類型擁有的metrics還不一樣等等情況。
看下源碼:

func main() {...// 計算namespace和cluster的metrics值時,下列數據需要進行累加求值metricsToAggregate := []string{core.MetricCpuUsageRate.Name,core.MetricMemoryUsage.Name,core.MetricCpuRequest.Name,core.MetricCpuLimit.Name,core.MetricMemoryRequest.Name,core.MetricMemoryLimit.Name,}// 計算node的metrics值時,下列數據需要進行累加求值metricsToAggregateForNode := []string{core.MetricCpuRequest.Name,core.MetricCpuLimit.Name,core.MetricMemoryRequest.Name,core.MetricMemoryLimit.Name,}// RateMetricsMapping中的數據需要計算速率,比如cpu/usage_rate,network/rx_ratedataProcessors := []core.DataProcessor{// Convert cumulaties to rateprocessors.NewRateCalculator(core.RateMetricsMapping),}kubernetesUrl, err := getKubernetesAddress(argSources)if err != nil {glog.Fatalf("Failed to get kubernetes address: %v", err)}kubeConfig, err := kube_config.GetKubeClientConfig(kubernetesUrl)if err != nil {glog.Fatalf("Failed to get client config: %v", err)}kubeClient := kube_client.NewOrDie(kubeConfig)// 創建pod的ListWatch,用于從k8s server監聽pod變更podLister, err := getPodLister(kubeClient)if err != nil {glog.Fatalf("Failed to create podLister: %v", err)}// 創建node的ListWatch,用于從k8s server監聽node變更nodeLister, err := getNodeLister(kubeClient)if err != nil {glog.Fatalf("Failed to create nodeLister: %v", err)}// 該podBasedEnricher用于解析從sources獲取到的pod和container的metrics數據,// 然后對pod和container進行數據完善,比如添加labels.但這里還不會處理metricsValuepodBasedEnricher, err := processors.NewPodBasedEnricher(podLister)if err != nil {glog.Fatalf("Failed to create PodBasedEnricher: %v", err)}dataProcessors = append(dataProcessors, podBasedEnricher)// 跟上面的podBasedEnricher同理,需要注意的是在append時有先后順序namespaceBasedEnricher, err := processors.NewNamespaceBasedEnricher(kubernetesUrl)if err != nil {glog.Fatalf("Failed to create NamespaceBasedEnricher: %v", err)}dataProcessors = append(dataProcessors, namespaceBasedEnricher)// 這里的對象會對metricsValue進行處理,對應的數據進行累加求值dataProcessors = append(dataProcessors,processors.NewPodAggregator(),&processors.NamespaceAggregator{MetricsToAggregate: metricsToAggregate,},&processors.NodeAggregator{MetricsToAggregate: metricsToAggregateForNode,},&processors.ClusterAggregator{MetricsToAggregate: metricsToAggregate,})dataProcessors = append(dataProcessors, processors.NewRcAggregator())nodeAutoscalingEnricher, err := processors.NewNodeAutoscalingEnricher(kubernetesUrl)if err != nil {glog.Fatalf("Failed to create NodeAutoscalingEnricher: %v", err)}dataProcessors = append(dataProcessors, nodeAutoscalingEnricher)

Processors的功能基本就是這樣了,相對有點復雜,數據處理的樣式和類別較多。
各個對象的Process()方法就不進行一一介紹了,就是按照順序一個一個的填充core.DataBatch數據。有興趣的可以逐個看下,可以借鑒下實現的方式。

獲取源數據并存儲

前面的都是鋪墊,開始介紹heapster的關鍵實現,進行源數據的獲取,并導出到后端存儲。
先介紹相關結構:

type Manager interface {Start()Stop() }

Manager是需要實現Start和stop方法的接口。而真實創建的對象其實是realManager:

type realManager struct {// 數據源source core.MetricsSource// 數據處理對象processors []core.DataProcessor// 后端存儲對象sink core.DataSink// 每次scrape數據的時間間隔resolution time.Duration// 創建多個scrape協程時,需要sleep這點時間,防止異常scrapeOffset time.Duration// scrape 停止的管道stopChan chan struct{}// housekeepSemaphoreChan chan struct{}// 超時housekeepTimeout time.Duration }

關鍵的代碼如下:

manager, err := manager.NewManager(sourceManager, dataProcessors, sinkManager, *argMetricResolution,manager.DefaultScrapeOffset, manager.DefaultMaxParallelism)if err != nil {glog.Fatalf("Failed to create main manager: %v", err)}manager.Start()

首先會根據前面創建的sourceManager, dataProcessors, sinkManager對象,再創建manager。
路徑: heapster/metrics/manager/manager.go

func NewManager(source core.MetricsSource, processors []core.DataProcessor, sink core.DataSink, resolution time.Duration,scrapeOffset time.Duration, maxParallelism int) (Manager, error) {manager := realManager{source: source,processors: processors,sink: sink,resolution: resolution,scrapeOffset: scrapeOffset,stopChan: make(chan struct{}),housekeepSemaphoreChan: make(chan struct{}, maxParallelism),housekeepTimeout: resolution / 2,}for i := 0; i < maxParallelism; i++ {manager.housekeepSemaphoreChan <- struct{}{}}return &manager, nil }

前面介紹了該關鍵結構readlManager,繼續進入manager.Start():

func (rm *realManager) Start() {go rm.Housekeep() }func (rm *realManager) Housekeep() {for {// Always try to get the newest metricsnow := time.Now()// 獲取數據的時間段,默認是1minstart := now.Truncate(rm.resolution)end := start.Add(rm.resolution)// 真正同步一次的時間間隔,默認是1min + 5stimeToNextSync := end.Add(rm.scrapeOffset).Sub(now)select {case <-time.After(timeToNextSync):rm.housekeep(start, end)case <-rm.stopChan:rm.sink.Stop()return}} }

繼續看rm.housekeep(start, end), 該接口就傳入了時間區間,其實cAdvisor就是支持時間區間來獲取metrics值。

func (rm *realManager) housekeep(start, end time.Time) {if !start.Before(end) {glog.Warningf("Wrong time provided to housekeep start:%s end: %s", start, end)return}select {case <-rm.housekeepSemaphoreChan:// ok, good to gocase <-time.After(rm.housekeepTimeout):glog.Warningf("Spent too long waiting for housekeeping to start")return}go func(rm *realManager) {defer func() { rm.housekeepSemaphoreChan <- struct{}{} }()// 從sources獲取數據data := rm.source.ScrapeMetrics(start, end)// 遍歷processors,然后進行數據處理for _, p := range rm.processors {newData, err := process(p, data)if err == nil {data = newData} else {glog.Errorf("Error in processor: %v", err)return}}// 最終將數據導出到后端存儲rm.sink.ExportData(data)}(rm) }

邏輯比較簡單,會有三個關鍵:

  • 源數據獲取

  • 數據處理

  • 導出到后端

  • 先看下rm.source.ScrapeMetrics()接口實現.
    路徑: heapster/metrics/sources/manager.go

  • func (this *sourceManager) ScrapeMetrics(start, end time.Time) *DataBatch {// 調用了nodeLister.List()獲取最新的k8s nodes列表,再根據之前配置的kubelet端口等信息,返回sources// 在創建sourceProvider時,會創建node的ListWatch,所以這里nodeLister可使用list()sources := this.metricsSourceProvider.GetMetricsSources()responseChannel := make(chan *DataBatch)。。。// 遍歷各個source,然后創建協程獲取數據for _, source := range sources {go func(source MetricsSource, channel chan *DataBatch, start, end, timeoutTime time.Time, delayInMs int) {// scrape()接口其實就是調用了kubeletMetricsSource.ScrapeMetrics()// 每個node都會組成對應的kubeletMetricsSource// ScrapeMetrics()就是從cAdvisor中獲取監控信息,并進行了decodemetrics := scrape(source, start, end)...select {// 將獲取到的數據丟入responseChannel// 下面會用到case channel <- metrics:// passed the response correctly.returncase <-time.After(timeForResponse):glog.Warningf("Failed to send the response back %s", source)return}}(source, responseChannel, start, end, timeoutTime, delayMs)}response := DataBatch{Timestamp: end,MetricSets: map[string]*MetricSet{},}latencies := make([]int, 11)responseloop:for i := range sources {...select {// 獲取前面創建的協程得到的數據case dataBatch := <-responseChannel:if dataBatch != nil {for key, value := range dataBatch.MetricSets {response.MetricSets[key] = value}}。。。case <-time.After(timeoutTime.Sub(now)):glog.Warningf("Failed to get all responses in time (got %d/%d)", i, len(sources))break responseloop}}...return &response }

    該接口的邏輯就是先通過nodeLister獲取k8s所有的nodes,這樣便能知道所有的kubelet信息,然后創建對應數量的協程從各個kubelet中獲取對應的cAdvisor監控信息,進行處理后再返回。

  • 獲取到數據后,就需要調用各個processors的Process()接口進行數據處理,接口太多就不一一介紹了,挑個node_aggregator.go進行介紹:

  • func (this *NodeAggregator) Process(batch *core.DataBatch) (*core.DataBatch, error) {for key, metricSet := range batch.MetricSets {// 判斷下該metric是否是pod的// metricSet.Labels都是前面就進行了填充,所以前面說需要注意每個processor的append順序if metricSetType, found := metricSet.Labels[core.LabelMetricSetType.Key]; found && metricSetType == core.MetricSetTypePod {// Aggregating podsnodeName, found := metricSet.Labels[core.LabelNodename.Key]if nodeName == "" {glog.V(8).Infof("Skipping pod %s: no node info", key)continue}if found {// 獲取nodeKey,比如: node:172.25.5.111nodeKey := core.NodeKey(nodeName)// 前面都是判斷該pod在哪個node上,然后該node的數據是需要通過這些pod進行累加得到node, found := batch.MetricSets[nodeKey]if !found {glog.V(1).Info("No metric for node %s, cannot perform node level aggregation.")} else if err := aggregate(metricSet, node, this.MetricsToAggregate); err != nil {return nil, err}} else {glog.Errorf("No node info in pod %s: %v", key, metricSet.Labels)}}}return batch, nil }

    基本流程就是這樣了,有需要的可以各個深入查看。

  • 最后就是數據的后端存儲。
    這里會涉及到兩部分:metricSink和influxdbSink。

  • 從rm.sink.ExportData(data)接口入手:
    路徑: heapster/metrics/sinks/manager.go

    func (this *sinkManager) ExportData(data *core.DataBatch) {var wg sync.WaitGroup// 遍歷所有的sink,這里其實就兩個for _, sh := range this.sinkHolders {wg.Add(1)// 創建協程,然后將之前獲取的data丟入dataBatchChannelgo func(sh sinkHolder, wg *sync.WaitGroup) {defer wg.Done()glog.V(2).Infof("Pushing data to: %s", sh.sink.Name())select {case sh.dataBatchChannel <- data:glog.V(2).Infof("Data push completed: %s", sh.sink.Name())// everything okcase <-time.After(this.exportDataTimeout):glog.Warningf("Failed to push data to sink: %s", sh.sink.Name())}}(sh, &wg)}// Wait for all pushes to complete or timeout.wg.Wait() }

    千辛萬苦,你把數據丟入sh.dataBatchChannel完事了?
    dataBatchChannel有點眼熟,因為之前創建sinkManager的時候,也創建了協程并監聽了該管道,所以真正export數據是在之前就完成了,這里只需要把數據丟入管道即可。
    所以golang中協程與協程之間的通信,channel才是王道啊!
    ExportData有兩個,一個一個講吧。
    先來關鍵的influxDB.
    路徑: heapster/metrics/sinks/influxdb/influxdb.go

    func (sink *influxdbSink) ExportData(dataBatch *core.DataBatch) {...dataPoints := make([]influxdb.Point, 0, 0)for _, metricSet := range dataBatch.MetricSets {// 遍歷MetricValuesfor metricName, metricValue := range metricSet.MetricValues {var value interface{}if core.ValueInt64 == metricValue.ValueType {value = metricValue.IntValue} else if core.ValueFloat == metricValue.ValueType {value = float64(metricValue.FloatValue)} else {continue}// Prepare measurement without fieldsfieldName := "value"measurementName := metricNameif sink.c.WithFields {// Prepare measurement and field namesserieName := strings.SplitN(metricName, "/", 2)measurementName = serieName[0]if len(serieName) > 1 {fieldName = serieName[1]}}// influxdb單條數據結構point := influxdb.Point{// 度量值名稱,比如cpu/usageMeasurement: measurementName,// 該tags就是在processors中進行添加,主要是pod_name,node_name,namespace_name等Tags: metricSet.Labels,// 該字段就是具體的值了Fields: map[string]interface{}{fieldName: value,},// 時間戳Time: dataBatch.Timestamp.UTC(),}// append到dataPoints,超過maxSendBatchSize數量后直接sendData到influxdbdataPoints = append(dataPoints, point)if len(dataPoints) >= maxSendBatchSize {sink.sendData(dataPoints)dataPoints = make([]influxdb.Point, 0, 0)}}// 遍歷LabeledMetrics,主要就是filesystem的數據// 不太明白為何要將filesystem的數據進行區分,要放到Labeled中?什么意圖?望高手指點,謝謝// 接下來的操作就跟上面MetricValues的操作差不多了for _, labeledMetric := range metricSet.LabeledMetrics {。。。point := influxdb.Point{Measurement: measurementName,Tags: make(map[string]string),Fields: map[string]interface{}{fieldName: value,},Time: dataBatch.Timestamp.UTC(),}for key, value := range metricSet.Labels {point.Tags[key] = value}for key, value := range labeledMetric.Labels {point.Tags[key] = value}dataPoints = append(dataPoints, point)if len(dataPoints) >= maxSendBatchSize {sink.sendData(dataPoints)dataPoints = make([]influxdb.Point, 0, 0)}}}if len(dataPoints) >= 0 {sink.sendData(dataPoints)} }

    該接口中有一處不太明白,metricSet中的LabeledMetrics和MetricsValue有何差別,為何要將filesystem的數據進行區分對待,放入LabeldMetrics?
    看代碼的過程中沒有得到答案,望大神指點迷津,多謝多謝!

    有問題,但也不影響繼續往下學習,接著看下MetricSink:

    func (this *MetricSink) ExportData(batch *core.DataBatch) {this.lock.Lock()defer this.lock.Unlock()now := time.Now()// 將數據丟入longStore和shortStore// 需要根據保存的時間將老數據丟棄this.longStore = append(popOldStore(this.longStore, now.Add(-this.longStoreDuration)),buildMultimetricStore(this.longStoreMetrics, batch))this.shortStore = append(popOld(this.shortStore, now.Add(-this.shortStoreDuration)), batch) }

    該邏輯比較簡單,就是將數據丟入兩個Store中,然后把過期數據丟棄。
    這里提醒一點,heapster API調用時先會從longStore中匹配數據,沒匹配上的話再從shortStore獲取,而longStore中存儲的數據類型前面已經做過介紹。

    終于結束了。。

    Heapster API創建

    前面的主流業務都介紹完了,Heapster本身也提供了API用于開發者進行使用與測試。
    繼續分析代碼吧:

    // 關鍵接口,后面分析handler := setupHandlers(metricSink, podLister, nodeLister, historicalSource)。。。// 創建http的mux多分器,用于http.Server的路由mux := http.NewServeMux()// prometheus:最新出現的人氣很高的監控系統,值得了解學習下,后續安排!promHandler := prometheus.Handler()// 支持TLS,我們用了httpif len(*argTLSCertFile) > 0 && len(*argTLSKeyFile) > 0 {。。。} else {// 多分器分了"/"和"/metrics"// 進入"/",還會進行細分,里面使用到了go-restfulmux.Handle("/", handler)mux.Handle("/metrics", promHandler)// 注冊健康檢測接口healthz.InstallHandler(mux, healthzChecker(metricSink))// 啟動Serverglog.Fatal(http.ListenAndServe(addr, mux))}

    這里的關鍵是setupHandlers()接口,需要學習下里面如何使用go-restful進行請求路由的。

    k8s apiServer中也大量使用了go-restful,在學習該源碼時有進行過分析

    路徑: heapster/metrics/handlers.go

    func setupHandlers(metricSink *metricsink.MetricSink, podLister *cache.StoreToPodLister, nodeLister *cache.StoreToNodeLister, historicalSource core.HistoricalSource) http.Handler {runningInKubernetes := true// 創建container,指定route類型為CurlyRouter// 這些都跟go-restful基礎有關,有興趣的可以看下原理wsContainer := restful.NewContainer()wsContainer.EnableContentEncoding(true)wsContainer.Router(restful.CurlyRouter{})// 注冊v1版本相關的api,包括官方介紹的"/api/v1/model"a := v1.NewApi(runningInKubernetes, metricSink, historicalSource)a.Register(wsContainer)// 這個metricsApi注冊了"/apis/metrics/v1alpha1"的各類命令// 暫不關心m := metricsApi.NewApi(metricSink, podLister, nodeLister)m.Register(wsContainer)handlePprofEndpoint := func(req *restful.Request, resp *restful.Response) {name := strings.TrimPrefix(req.Request.URL.Path, pprofBasePath)switch name {case "profile":pprof.Profile(resp, req.Request)case "symbol":pprof.Symbol(resp, req.Request)case "cmdline":pprof.Cmdline(resp, req.Request)default:pprof.Index(resp, req.Request)}}// Setup pporf handlers.ws = new(restful.WebService).Path(pprofBasePath)ws.Route(ws.GET("/{subpath:*}").To(metrics.InstrumentRouteFunc("pprof", handlePprofEndpoint))).Doc("pprof endpoint")wsContainer.Add(ws)return wsContainer }

    關鍵在于v1版本的API注冊,繼續深入a.Register(wsContainer):

    func (a *Api) Register(container *restful.Container) {// 注冊"/api/v1/metric-export" API// 用于從shortStore中獲取所有的metrics信息ws := new(restful.WebService)ws.Path("/api/v1/metric-export").Doc("Exports the latest point for all Heapster metrics").Produces(restful.MIME_JSON)ws.Route(ws.GET("").To(a.exportMetrics).Doc("export the latest data point for all metrics").Operation("exportMetrics").Writes([]*types.Timeseries{}))// ws必須要add到container中才能生效container.Add(ws)// 注冊"/api/v1/metric-export-schema" API// 用于導出所有的metrics name,比如network-rx// 還會導出還有的labels,比如pod-namews = new(restful.WebService)ws.Path("/api/v1/metric-export-schema").Doc("Schema for metrics exported by heapster").Produces(restful.MIME_JSON)ws.Route(ws.GET("").To(a.exportMetricsSchema).Doc("export the schema for all metrics").Operation("exportmetricsSchema").Writes(types.TimeseriesSchema{}))container.Add(ws)// 注冊metircSink相關的API,即"/api/v1/model/"if a.metricSink != nil {glog.Infof("Starting to Register Model.")a.RegisterModel(container)}if a.historicalSource != nil {a.RegisterHistorical(container)} }

    官方資料中介紹heapster metric model,我們使用到這些API也會比較多。
    進入a.RegisterModel(container)看下:

    func (a *Api) RegisterModel(container *restful.Container) {ws := new(restful.WebService)// 指定所有命令的prefix: "/api/v1/model"ws.Path("/api/v1/model").Doc("Root endpoint of the stats model").Consumes("*/*").Produces(restful.MIME_JSON)// 在這里增加各類命令,比如"/metrics/,/nodes/"等等addClusterMetricsRoutes(a, ws)// 列出所有的keysws.Route(ws.GET("/debug/allkeys").To(metrics.InstrumentRouteFunc("debugAllKeys", a.allKeys)).Doc("Get keys of all metric sets available").Operation("debugAllKeys"))container.Add(ws) }

    繼續看addClusterMetricsRoutes():

    func addClusterMetricsRoutes(a clusterMetricsFetcher, ws *restful.WebService) {。。。if a.isRunningInKubernetes() {// 列出所有namespaces的APIws.Route(ws.GET("/namespaces/").To(metrics.InstrumentRouteFunc("namespaceList", a.namespaceList)).Doc("Get a list of all namespaces that have some current metrics").Operation("namespaceList"))// 獲取指定namespaces的metricsws.Route(ws.GET("/namespaces/{namespace-name}/metrics").To(metrics.InstrumentRouteFunc("availableNamespaceMetrics", a.availableNamespaceMetrics)).Doc("Get a list of all available metrics for a Namespace entity").Operation("availableNamespaceMetrics").Param(ws.PathParameter("namespace-name", "The name of the namespace to lookup").DataType("string")))// 獲取namespace指定的metrics值ws.Route(ws.GET("/namespaces/{namespace-name}/metrics/{metric-name:*}").To(metrics.InstrumentRouteFunc("namespaceMetrics", a.namespaceMetrics)).Doc("Export an aggregated namespace-level metric").Operation("namespaceMetrics").Param(ws.PathParameter("namespace-name", "The name of the namespace to lookup").DataType("string")).Param(ws.PathParameter("metric-name", "The name of the requested metric").DataType("string")).Param(ws.QueryParameter("start", "Start time for requested metrics").DataType("string")).Param(ws.QueryParameter("end", "End time for requested metric").DataType("string")).Param(ws.QueryParameter("labels", "A comma-separated list of key:values pairs to use to search for a labeled metric").DataType("string")).Writes(types.MetricResult{}))。。。}。。。 }

    Heapster API的注冊基本就這樣了,在花點時間看下API的實現吧。
    我們挑一個例子做下分析,獲取某個pod的指定的metrics值.
    對應的接口:heapster/metrics/api/v1/model_handler.go

    func (a *Api) podMetrics(request *restful.Request, response *restful.Response) {a.processMetricRequest(// 根據URI傳入的ns和pod名字,拼裝成key,如:"namespace:default/pod:123"core.PodKey(request.PathParameter("namespace-name"),request.PathParameter("pod-name")),request, response) }

    根據URI的輸入參數并調用processMetricRequest()接口,獲取對應的metric value:

    func (a *Api) processMetricRequest(key string, request *restful.Request, response *restful.Response) {// 時間區間start, end, err := getStartEndTime(request)if err != nil {response.WriteError(http.StatusBadRequest, err)return}// 獲取metric Name,比如"/cpu/usage"metricName := request.PathParameter("metric-name")// 根據metricName進行轉換,比如將cpu-usage轉換成cpu/usage_rate// 所以這里需要注意cpu-usage不等于/cpu/usage,一個表示cpu使用率,一個表示cpu使用量convertedMetricName := convertMetricName(metricName)// 獲取請求中的labels,根據是否有指定labels來調用不同的接口labels, err := getLabels(request)if err != nil {response.WriteError(http.StatusBadRequest, err)return}var metrics map[string][]core.TimestampedMetricValueif labels != nil {// 該接口從metricSet.LabeledMetrics中獲取對應的valuemetrics = a.metricSink.GetLabeledMetric(convertedMetricName, labels, []string{key}, start, end)} else {// 該接口先從longStoreMetrics中進行匹配,匹配不到的話再從shortStore中獲取對應的metricValuemetrics = a.metricSink.GetMetric(convertedMetricName, []string{key}, start, end)}// 將獲取到的metricValue轉換成MetricPoint格式的值,會有多組"時間戳+value"converted := exportTimestampedMetricValue(metrics[key])// 將結果進行responseresponse.WriteEntity(converted) }

    OK,大功告成!API的實現也講完了,很多API都是相通的,最終都會調用相同的接口,所以不一一介紹了。
    這里需要注意heapster的API的URI還有多種寫法,比如/api/v1/model/cpu-usage,等價于/api/v1/model/cpu/usage_rate/,別誤理解成/cpu/usage了,這兩個概念不一樣,一個是cpu使用率,一個是cpu使用量。

    上面的提醒告訴我們,沒事多看源碼,很多誤解自然而然就解除了!

    筆者能力有限,看源碼也在于學習提升能力,當然也會有較多不理解或者理解不當的地方,希望各位能予以矯正,多謝多謝!

    擴展

    上面的介紹完了Heapster的實現,我們可以思考下是否可以動手修改源碼,比如增加一些對象的metrics信息。
    筆者考慮是否可以直接支持RC/RS/Deployment的metrics信息,讓業務層可以直接拿到服務的整體信息。

    參考資料

  • Heapster官方資料:https://github.com/kubernetes...

  • InfluxDB github: https://github.com/influxdata...

  • 超強干貨來襲 云風專訪:近40年碼齡,通宵達旦的技術人生

    總結

    以上是生活随笔為你收集整理的Kubernetes监控之Heapster源码分析的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

    97天天综合网 | 四虎在线免费 | 久久曰视频 | 9在线观看免费高清完整 | 久久精品高清视频 | 中文av日韩 | 天堂激情网 | 亚洲第一区在线观看 | 五月开心婷婷网 | 91在线看视频免费 | 日韩伦理片hd | 九九精品久久 | 欧美一级片在线免费观看 | 经典三级一区 | 91av蜜桃 | 日韩免费电影在线观看 | 97视频在线观看网址 | 2019天天干夜夜操 | 欧美黄色软件 | 亚洲桃花综合 | 免费成人av网站 | 九九九视频在线 | 最新av网站在线观看 | 久草视频中文 | 成人网中文字幕 | 亚洲涩综合 | 干干干操操操 | 国产在线精品一区二区不卡了 | 欧美精品国产综合久久 | 91社区国产高清 | 亚洲另类视频在线 | 中文字幕色在线视频 | 欧美日韩视频一区二区三区 | 天天综合天天综合 | 国产一区在线播放 | 91免费观看视频网站 | 9在线观看免费高清完整版在线观看明 | 波多野结衣动态图 | 日韩精品在线看 | 在线观看中文字幕第一页 | 久久韩国免费视频 | 狠狠色狠狠色综合日日小说 | 国产精品99久久久久久宅男 | 久久刺激视频 | 成人试看120秒 | 国产男男gay做爰 | 亚洲精品在线视频观看 | 午夜神马福利 | 精品99在线| 国产精品久久久久永久免费观看 | 伊人亚洲综合 | 国产美女黄网站免费 | 97精品久久人人爽人人爽 | 高清av免费看 | 国产欧美高清 | 久久精品人 | 黄色av电影一级片 | 一区二区三区在线免费观看视频 | 中文资源在线官网 | 日韩字幕在线观看 | 黄色不卡av | 91免费看黄色 | 91污视频在线观看 | 久久精视频 | 午夜精品导航 | 国产精品免费久久久久 | 日韩精品一区二区三区水蜜桃 | 久久任你操 | 免费观看高清 | 最新色站 | 亚州精品天堂中文字幕 | 国产黄色片一级三级 | 天天操天天舔天天爽 | 国产精品嫩草69影院 | 人人干人人做 | 免费精品在线观看 | 国产亚洲成人网 | 国产午夜一区二区 | 亚洲国产精品小视频 | 亚洲精品久久久蜜桃直播 | 天天天色 | 久草视频精品 | 国产亚洲精品久久久久久无几年桃 | 91在线观看高清 | 日韩免费在线 | 亚洲精品美女在线观看播放 | 欧美成人va| 欧美日韩国产一区二 | www.91av在线 | 香蕉一区 | 成年人黄色免费网站 | 日韩在线中文字幕视频 | 亚洲精品tv| 亚洲女欲精品久久久久久久18 | 人人干人人爽 | 97超碰人人澡人人爱学生 | 狠狠狠狠狠狠干 | 久草视频在线新免费 | 草久在线观看 | 精品国产乱码 | 天天激情综合网 | 99免在线观看免费视频高清 | 在线成人av | 国产四虎在线 | 国产.精品.日韩.另类.中文.在线.播放 | 日韩精品免费在线观看视频 | 天天天干天天射天天天操 | 一区二区三区高清在线观看 | 免费在线观看av网站 | 亚州国产精品 | 99综合久久 | 热re99久久精品国产99热 | 美女国产在线 | 久久噜噜少妇网站 | 国产超碰在线观看 | 中文字幕免费成人 | 男女拍拍免费视频 | 日本韩国精品在线 | 狠狠色婷婷丁香六月 | 激情五月亚洲 | 在线成人欧美 | 狠狠操天天干 | 欧美做受高潮1 | 一区二区视 | 色婷婷视频在线 | 国产午夜在线观看视频 | 久久国产精品小视频 | 黄色成人av | 国产精品色婷婷视频 | 曰本免费av| av免费电影网站 | 91视频啊啊啊| 婷婷社区五月天 | 97超碰色偷偷 | 奇米影视999 | 九色琪琪久久综合网天天 | 日韩视频在线不卡 | 97色狠狠 | 一级一片免费观看 | 亚洲精品小视频在线观看 | 可以免费观看的av片 | 国产女人18毛片水真多18精品 | 手机成人av在线 | 国产精品自产拍在线观看蜜 | 欧美精品亚洲精品日韩精品 | 天天色天天干天天色 | 午夜国产福利视频 | 色视频在线观看 | 日av免费| 园产精品久久久久久久7电影 | 天天天干 | 午夜影视一区 | 色视频在线免费观看 | 精品国产激情 | 欧美日韩精品久久久 | 亚洲jizzjizz日本少妇 | 又爽又黄在线观看 | 成人全视频免费观看在线看 | 夜夜爽天天爽 | 国产在线精品一区二区不卡了 | 国内精品久久久久久久影视麻豆 | 五月天六月丁香 | 婷婷久久五月 | 亚洲性少妇性猛交wwww乱大交 | 色综合久久久久综合99 | 国内精品久久久久久久影视麻豆 | 国产精品观看 | 午夜精品一区二区三区在线观看 | 最近免费中文字幕 | 啪啪肉肉污av国网站 | 国产亚洲精品成人av久久影院 | 九九九视频精品 | 日韩精品中文字幕在线观看 | 色综合 久久精品 | 九草视频在线观看 | 中日韩在线视频 | 欧美精品在线一区二区 | 欧美精品一区二区免费 | 国产一区福利在线 | 黄色.com| 精品久久久久久综合日本 | 黄色片网站大全 | 成人免费看片网址 | 四虎在线永久免费观看 | av观看在线观看 | 久久综合久久八八 | 久久电影色| 亚洲高清视频在线观看 | 97超碰在线免费观看 | 夜添久久精品亚洲国产精品 | 亚洲日本一区二区在线 | 四虎成人精品永久免费av | 人人看看人人 | 亚洲日本激情 | 精品人人爽 | 国产精品综合久久久久久 | 91片黄在线观看动漫 | 欧美日一级片 | 亚洲欧美综合精品久久成人 | 六月天综合网 | 日韩精品免费一区二区三区 | 国产免费午夜 | 不卡视频国产 | ww亚洲ww亚在线观看 | 色丁香综合| 久久艹人人 | 丁香花在线观看免费完整版视频 | 99久久精品免费视频 | 色资源网在线观看 | 一区二区三区久久精品 | 欧美a视频 | 亚洲精品在线视频播放 | 国产人成看黄久久久久久久久 | 国产精品国产三级国产不产一地 | 男女啪啪视屏 | 久久成人国产精品一区二区 | 91亚洲精品在线观看 | 欧美 日韩 性 | 亚洲h色精品 | 一区二区国产精品 | 成人动漫视频在线 | 久久久三级视频 | 欧美日韩一级视频 | 午夜久久久影院 | 久久综合久久八八 | 天天·日日日干 | 亚洲激情| 精品中文字幕在线播放 | 国产伦精品一区二区三区在线 | 免费日韩一区 | 亚洲久草网 | 97超碰中文字幕 | 欧美一级裸体视频 | 国产免费又粗又猛又爽 | 国产视频导航 | 国内精品久久久精品电影院 | 91黄色小视频 | 国产精品国产三级国产不产一地 | 丁香五月亚洲综合在线 | 久久综合狠狠综合久久狠狠色综合 | 97视频总站 | 国产精品精品国产 | 久久精品99久久 | 国产一区视频在线 | 日韩黄色网络 | 亚洲国产三级在线观看 | 91av手机在线观看 | 在线视频 一区二区 | 91精品国产自产在线观看 | 久久精品视频18 | 91日韩在线视频 | 久久久亚洲网站 | 激情五月播播久久久精品 | 久久线视频 | 波多野结衣综合网 | 国产精品第一页在线观看 | 天天插日日操 | 国产精品一区二区久久精品爱涩 | 免费观看久久 | a在线一区 | 久久激情小视频 | 国产精品国产三级国产不产一地 | 热re99久久精品国产66热 | 日免费视频 | 人人澡超碰碰97碰碰碰软件 | 狠狠色狠狠色终合网 | 97电影网手机版 | av3级在线 | 人人爱人人爽 | 亚洲aⅴ一区二区三区 | 正在播放国产一区 | 精品成人网 | 日韩成人av在线 | 一本一道久久a久久综合蜜桃 | 国产剧情在线一区 | 五月的婷婷 | 欧美日韩国产高清视频 | av福利第一导航 | 精品国产三级a∨在线欧美 免费一级片在线观看 | 四虎在线观看网址 | 国内精品久久久久影院一蜜桃 | 久久久久成| 国产区第一页 | 欧美在线你懂的 | 亚洲天堂网视频 | 99久久婷婷国产 | 免费亚洲视频 | 人人澡人人模 | 激情欧美在线观看 | 日韩电影在线一区二区 | 四虎最新域名 | 人人插人人爱 | 国产精品激情在线观看 | 国产亚洲精品久 | 久久亚洲综合国产精品99麻豆的功能介绍 | 男女视频久久久 | 在线视频专区 | 亚洲精品成人av在线 | 欧美在线日韩在线 | 国产精品6999成人免费视频 | 日韩免费视频在线观看 | 日韩午夜av电影 | 国产福利精品在线观看 | 中文字幕视频网站 | 婷婷四房综合激情五月 | 成人毛片一区二区三区 | 91女子私密保健养生少妇 | www九九热| 日本久久久久久久久久 | 欧美一级免费在线 | 少妇搡bbbb搡bbb搡aa | 天天激情在线 | 中文字幕一区二区三 | 久久综合婷婷 | 久久成人一区二区 | 久久精品国产第一区二区三区 | 欧美久草视频 | 亚洲成人网在线 | 在线高清av | 中文字幕第一 | 日黄网站 | 色偷偷888欧美精品久久久 | 在线视频观看91 | 国产亚洲视频系列 | 国产激情电影综合在线看 | 久久久国产一区二区三区四区小说 | 91av电影| 激情电影影院 | 欧美伊人网 | 97免费在线视频 | 国产精品黄色影片导航在线观看 | 成人国产网站 | www.天天色| 国产精品美女免费看 | 欧美另类亚洲 | 日本成人黄色片 | www.天天射.com| 国产一级大片在线观看 | 久久久午夜精品理论片中文字幕 | 久久毛片高清国产 | 国产精品久久久久亚洲影视 | 欧美日韩一区久久 | 在线免费av网站 | 最近日本韩国中文字幕 | 免费黄色激情视频 | 国产视频在线免费观看 | 99久久夜色精品国产亚洲 | 久久精品一二三区 | 久久艹欧美 | 久久99偷拍视频 | 成人影片在线免费观看 | 免费一级片在线观看 | 久久国内精品99久久6app | 久久精品在线 | 久久国色夜色精品国产 | 午夜一级免费电影 | 夜夜视频资源 | 玖草在线观看 | av在线播放中文字幕 | 国产视频一区二区在线 | 久久夜色精品国产欧美乱极品 | 日韩av资源在线观看 | 国产成人精品在线播放 | 久久伊人热 | 久久久 激情 | 97av.com| 一区二区三区电影 | 午夜精品久久久久 | 亚洲作爱 | 久久免费成人网 | 五月婷婷影视 | 亚洲午夜精品久久久久久久久久久久 | 亚洲天堂网在线视频 | 超级av在线 | 久久激情影院 | 成人黄性视频 | 日韩色高清 | 香蕉视频在线网站 | 激情综合亚洲精品 | 日夜夜精品视频 | 日本精品视频在线播放 | 中文字幕日韩国产 | 久久精品永久免费 | 四虎最新入口 | 日韩试看 | 一区二区精品在线视频 | 国产永久免费高清在线观看视频 | 99国产在线视频 | 黄色网中文字幕 | 五月婷婷开心 | 偷拍区另类综合在线 | 精品久久五月天 | 亚洲性少妇性猛交wwww乱大交 | 久久久99精品免费观看 | 国产91粉嫩白浆在线观看 | 国产视频1 | 国产 日韩 欧美 中文 在线播放 | 狠狠色噜噜狠狠狠狠2022 | 狠狠网站 | 午夜精品99久久免费 | 国产精品爽爽久久久久久蜜臀 | www.伊人色.com | 色香com. | 亚洲男男gⅴgay双龙 | 欧美视频不卡 | 四虎小视频 | 黄色国产在线 | 亚洲第一av在线播放 | 亚洲一区二区麻豆 | 午夜性生活 | 久久久久久久久久免费视频 | 色婷婷 亚洲 | 99热精品在线观看 | 成年人免费电影 | 黄色片网站免费 | 成人免费视频免费观看 | 一级性av | 九九导航 | 亚洲婷婷综合色高清在线 | 天天干天天操天天做 | 天天干 夜夜操 | 国产精品久久久久久久电影 | 色是在线视频 | 7777xxxx| 91精品免费在线观看 | 亚洲狠狠丁香婷婷综合久久久 | 91在线色| 婷久久 | 亚洲黄色激情小说 | 中文字幕一区三区 | 国产成人精品网站 | 日韩久久精品一区 | 国产精品美女网站 | 国产精品白丝av | 欧美日韩一区二区三区在线观看视频 | 91精品免费在线视频 | 成人一级黄色片 | 亚洲一区二区91 | 亚洲精品欧洲精品 | 麻豆精品视频在线观看免费 | 亚洲一区视频免费观看 | 日韩三级免费观看 | 亚洲欧美国产精品 | 国产成人在线精品 | 天天爱天天舔 | 久久成人免费 | 波多野结衣日韩 | 欧美日韩国产区 | 日韩欧美高清一区二区 | 人人爽人人舔 | 欧美日韩高清在线 | 黄色a在线观看 | 91精品国自产在线 | 久久国产精品一区二区三区 | 午夜av在线电影 | 欧美国产日韩一区二区三区 | 国产亚洲久一区二区 | 亚洲国产成人久久综合 | 久久一精品 | 在线成人高清电影 | 蜜桃视频在线观看一区 | 久草在线一免费新视频 | www.色com| 亚洲视频精品 | 亚洲精品一区二区在线观看 | av免费观看高清 | 久久久人人人 | 在线观看国产福利片 | 99热999| 国产第一福利 | 人人爽人人爽人人片av免 | www.久久久久 | 激情网站网址 | 在线免费日韩 | 亚洲永久精品视频 | 黄色国产区 | 人人看人人做人人澡 | 啪啪激情网 | 四虎影视精品成人 | 九九色综合 | 人人干人人超 | 日本字幕网 | 国产精品一区久久久久 | 欧美另类激情 | 91精品一区国产高清在线gif | 成人欧美日韩国产 | .国产精品成人自产拍在线观看6 | 精品亚洲免费 | 国产精品理论片在线播放 | 丰满少妇在线观看 | .国产精品成人自产拍在线观看6 | 国产亚洲亚洲 | 夜又临在线观看 | 国产精品国产三级国产aⅴ9色 | 五月花婷婷 | 欧美一区二区三区不卡 | 中文字幕一区二区三区乱码在线 | 天天操天天操一操 | 日韩精品无码一区二区三区 | 日韩国产精品一区 | 中文字幕精品在线 | 香蕉在线播放 | 看片网站黄| 婷婷精品进入 | 人人射人人澡 | 亚洲国产精品久久久久久 | 国产精品嫩草55av | 国产视频九色蝌蚪 | 精品a在线 | 在线看片中文字幕 | 奇米影视999 | 欧美日韩精 | 成人在线视频在线观看 | 国产精品中文在线 | 国产成人一区二区三区电影 | 中文字幕在线观看视频免费 | 国产精品网红直播 | 亚洲 成人 欧美 | 国产精品免费久久久久久 | 成人国产精品入口 | 天天操天天射天天爱 | 奇米导航 | 五月天中文在线 | 一区 在线 影院 | 欧美aaa一级 | av免费在线观看1 | 亚洲综合爱 | 69成人在线 | 韩日精品在线观看 | 在线播放日韩 | 日韩在线第一区 | 日本精品免费看 | 成人久久18免费网站麻豆 | 成人午夜电影在线 | 国产精品11 | www在线观看国产 | 日韩黄色在线电影 | 色婷婷亚洲 | 伊人超碰在线 | 亚洲欧洲xxxx | 九九九热| 91av福利视频 | 日韩av免费一区 | 亚洲电影在线看 | a视频免费看 | 日韩欧美v | 一区二区三区视频在线 | 国产xxxx做受性欧美88 | 亚洲午夜精品在线观看 | 亚洲婷婷综合色高清在线 | 超碰人人在 | 国产精品女同一区二区三区久久夜 | 久久综合九色99 | 久久国产露脸精品国产 | 深爱激情综合网 | 久久这里有 | 91片网| av免费看在线 | 久久在线免费观看视频 | 97看片| 国产视频不卡 | 天天玩天天干天天操 | 精品成人久久 | 亚洲成aⅴ人片久久青草影院 | 毛片无卡免费无播放器 | 国产亚洲欧美日韩高清 | 成人精品电影 | 久久久九色精品国产一区二区三区 | 精品二区视频 | 又湿又紧又大又爽a视频国产 | www.天天射.com| 五月开心激情网 | 久久超碰网 | 超碰人人在线观看 | 韩国av一区二区三区 | 五月婷婷综合激情 | 91精品国自产拍天天拍 | 日本精品久久久久影院 | 精品国产欧美一区二区 | 久久99久久99免费视频 | 亚洲第一区在线播放 | 国产一区在线视频观看 | 日本久久久亚洲精品 | 亚洲四虎在线 | 91麻豆精品久久久久久 | 亚洲成人资源在线观看 | 国产精品美女久久久久久久 | 日韩午夜视频在线观看 | 99在线热播精品免费99热 | 亚洲精区二区三区四区麻豆 | 欧产日产国产69 | 婷婷精品国产一区二区三区日韩 | 91在线免费观看国产 | 亚洲日本在线视频观看 | 久久草网站 | 成片人卡1卡2卡3手机免费看 | 日韩在线观看一区二区三区 | 亚洲91在线| 欧美一级性生活视频 | 亚洲精品乱码久久久久久蜜桃不爽 | 欧美日韩一二三四区 | 日日综合 | 亚洲午夜精品福利 | 日韩一区正在播放 | 麻豆视频91 | 欧美精品三级在线观看 | 青青草国产精品视频 | 友田真希av| 精品在线观看一区二区 | 夜夜嗨av色一区二区不卡 | 99精品国产福利在线观看免费 | 久久综合五月天 | 国产一级免费在线观看 | 国产理伦在线 | 91av播放| 在线免费观看麻豆 | 亚洲精品自在在线观看 | 欧美少妇xxxxxx| 色偷偷88欧美精品久久久 | av看片在线 | 国产在线综合视频 | 亚洲 综合 精品 | 伊人天堂网 | 国产亚洲激情视频在线 | 国产xxxxx在线观看 | 91丨九色丨国产丨porny精品 | 人人玩人人添人人澡超碰 | 就操操久久 | 成人在线观看免费 | 亚洲专区欧美 | 日日夜夜免费精品 | 99精品在线免费观看 | 麻豆影视网站 | 久久国产高清视频 | 区一区二区三在线观看 | 日韩电影在线观看一区二区三区 | 在线免费黄色片 | 激情综合色综合久久 | 久久国产精品99久久久久 | 日韩手机在线 | 久久歪歪 | .国产精品成人自产拍在线观看6 | 欧洲成人av | 综合激情婷婷 | 在线看小早川怜子av | 国产精品一区二区久久久 | 日韩中文字幕视频在线 | 在线岛国av| 国产人在线成免费视频 | 欧美成人在线免费 | 九九九热 | 天堂av在线中文在线 | 亚洲免费国产视频 | 日韩视频在线一区 | 亚洲男模gay裸体gay | 91在线免费看片 | 日日夜夜草 | 精品国产一区二区三区久久久蜜臀 | 91黄色影视| 久久精品香蕉视频 | 91网站在线视频 | a天堂最新版中文在线地址 久久99久久精品国产 | 在线视频欧美精品 | 欧美日韩一区二区久久 | 亚洲在线视频免费观看 | 色婷婷狠狠五月综合天色拍 | 一级免费黄色 | 久久精品a| 超碰av在线播放 | 久久成人免费 | 亚洲热视频| 不卡视频一区二区三区 | 亚洲精品成人网 | 香蕉97视频观看在线观看 | 在线国产福利 | 日本91在线 | 国产成人精品久久亚洲高清不卡 | 精品久久久久国产免费第一页 | 成人av网址大全 | 精品视频专区 | 99久久精品国产网站 | 波多野结衣视频在线 | 日韩一区二区三区在线观看 | 在线91精品| 美女视频a美女大全免费下载蜜臀 | 91精品视频观看 | 日韩高清av | 国产视频九色蝌蚪 | 麻豆精品传媒视频 | 丁香五月亚洲综合在线 | 精品视频国产 | 色狠狠综合天天综合综合 | 精品久久久久一区二区国产 | 日韩在线观看免费 | 在线观看视频中文字幕 | 国产中文字幕在线视频 | 久久精品成人欧美大片古装 | 久久九九久久精品 | 国产一区二区免费 | 欧美日一级片 | 欧美九九视频 | 久久久久久久久黄色 | 尤物九九久久国产精品的分类 | 国产精品激情在线观看 | 亚洲视频专区在线 | 日本公乱妇视频 | 日日夜夜91 | 99视频在线免费观看 | 国产精品久久久久久久午夜 | 国产不卡在线观看 | 国产精品视频在线看 | 久久婷五月 | av一级在线 | 4438全国亚洲精品在线观看视频 | 黄色三级免费 | 黄色大片av | 日本黄色大片儿 | 亚洲国产精品视频在线观看 | 色香蕉视频 | 激情婷婷久久 | 国产色视频123区 | 在线免费观看黄色 | 欧美在线观看视频 | 色偷偷88888欧美精品久久久 | 亚洲欧美成人综合 | 亚洲成人av电影在线 | 青草视频在线 | 美女黄网站视频免费 | 欧美精品亚州精品 | 日日夜夜人人天天 | 超碰免费观看 | 国产一级黄色电影 | 免费视频你懂的 | 奇米777777| 四虎影视精品 | 国产成人久久av977小说 | 高潮久久久 | 亚洲经典中文字幕 | 国产91综合一区在线观看 | 欧美有色| 国产极品尤物在线 | 99精品免费久久久久久久久日本 | 日本在线观看中文字幕 | 2018好看的中文在线观看 | 亚洲成成品网站 | 999久久| 久久新视频 | 在线导航av | 成人免费大片黄在线播放 | 久久这里 | 日韩欧美视频免费在线观看 | 天天色天天射天天干 | 中文字幕二区在线观看 | 91免费黄视频 | 四虎5151久久欧美毛片 | 在线观看中文字幕视频 | 96亚洲精品久久 | 99视频99 | 又黄又刺激视频 | 日韩av网址在线 | 99色在线视频 | 国产专区欧美专区 | 欧美狠狠操 | 亚洲综合精品视频 | 视频一区二区国产 | 日韩特级片 | 国产精品久久久久久久av大片 | 欧美日本中文字幕 | 亚洲欧美一区二区三区孕妇写真 | 久久国产精品久久精品 | 日韩视频一区二区三区在线播放免费观看 | 日韩在线视频国产 | 国产精品手机在线观看 | 欧美日韩国产一二 | 欧美天天综合网 | 成人黄色在线电影 | 中文字字幕在线 | 国产又粗又长的视频 | 日日噜噜噜噜夜夜爽亚洲精品 | 久久精品一区 | 国产区在线看 | 国产在线 一区二区三区 | 婷色| 亚洲性xxxx | av电影在线观看完整版一区二区 | 国产在线理论片 | 天天色欧美| 午夜手机电影 | 97精品欧美91久久久久久 | 96视频在线 | 日本精品视频在线观看 | 国产精品美女久久 | 亚洲精品久久视频 | 在线观看av黄色 | 四虎伊人| 久草免费新视频 | 久久婷婷开心 | 国产精品视频地址 | 五月香视频在线观看 | 久久天天综合网 | 99久久久国产精品美女 | 欧美性春潮 | 成人性生活大片 | 日韩一区二区三区视频在线 | 欧美最新另类人妖 | 亚洲 欧美 国产 va在线影院 | 日韩高清免费在线 | 久久久久久久久久久网站 | 国产精品九九视频 | 五月婷婷婷婷婷 | 成人小视频免费在线观看 | 日韩aⅴ视频 | 欧美一级片在线播放 | 欧美视屏一区二区 | 97精品国产97久久久久久粉红 | 欧美黑吊大战白妞欧美 | 91中文字幕一区 | 欧美一级电影免费观看 | 日本久久免费电影 | av色影院 | 九九热免费精品视频 | 国产麻豆剧传媒免费观看 | 久久在线影院 | 六月丁香婷婷久久 | 久久久五月婷婷 | 久久久久亚洲精品成人网小说 | 超碰人在线| 五月婷婷亚洲 | 91在线看免费 | 国产黄免费 | 久久久影院官网 | 久久综合九色综合97婷婷女人 | 久草久草在线观看 | 日本中出在线观看 | 97视频免费播放 | 日韩激情一二三区 | 五月综合网 | 四虎视频 | 777视频在线观看 | 日韩欧美视频在线观看免费 | 亚洲日本在线一区 | 中文字幕在线观看2018 | 久草在线高清 | 久久精品一区二区三区国产主播 | 国产专区一 | 黄色一级网 | 色小说在线 | 九九视频精品免费 | 久久国产网站 | 五月婷婷激情六月 | 亚洲涩涩涩涩涩涩 | 欧美a√大片 | 91爱爱视频 | 特级aaa毛片 | av+在线播放在线播放 | 一级黄色片网站 | 正在播放国产一区二区 | 日韩久久精品一区二区三区下载 | 亚洲综合色激情五月 | 精品美女久久 | 国产精品美女网站 | 久久精品国亚洲 | 国产成人久久精品亚洲 | 国产中文字幕一区二区 | 国产一区福利 | 久草资源在线观看 | 中文字幕日本在线 | 91中文字幕网 | 国产一区播放 | 亚洲精品乱码白浆高清久久久久久 | 国产美女视频网站 | 日b视频在线观看网址 | 日本久久成人中文字幕电影 | 国产精品成人自拍 | 在线观看黄a| 日韩一级片网址 | 国产精品2018 | 免费久久99精品国产婷婷六月 | 日韩影片在线观看 | 狠狠色噜噜狠狠狠 | 亚洲第一香蕉视频 | 国产一级片不卡 | 日韩av手机在线看 | 国产精品扒开做爽爽的视频 | 免费福利小视频 | 久草久草在线 | 日韩三级.com | 色网址99| 日韩大片免费在线观看 | 精品亚洲一区二区 | 日韩欧美精选 | 中文字幕在线观看国产 | 欧美极品少妇xxxx | 天天射天天干 | 久久久久久久久毛片 | 色婷婷99 | 91视频在线观看大全 | 99精品欧美一区二区蜜桃免费 | 久久婷婷国产 | 干干夜夜| 丁香免费视频 | 日韩高清国产精品 | 日韩在线观看视频一区二区三区 | 免费在线观看成年人视频 | 中文字幕有码在线观看 | 亚洲在线资源 | 亚洲精选国产 | 精品一区二区在线观看 | 91看片在线免费观看 | 少妇bbb好爽 | 在线播放 一区 | av一区二区在线观看中文字幕 | 久久免费精品 | a资源在线| 久久97超碰 | 久久久久国产精品免费网站 | 最新av电影网站 | 色婷婷久久一区二区 | 国产精品丝袜在线 | 亚洲精品国产精品乱码不99热 | 999国产精品视频 | 国产一级性生活视频 | 亚洲视频免费视频 | 久久视频这里只有精品 | 国产精国产精品 | 国产人成免费视频 | 蜜臀av在线一区二区三区 | 1区2区3区在线观看 三级动图 | 国产美女精品视频 | 国产精品久久久久久久久久ktv | 国产精品美女久久久 | 天天拍天天干 | 特级毛片在线观看 | 久久久在线 | 99热精品久久 | 日日日爽爽爽 | 日日夜夜av | 国产黄色片免费 | 天天色天天操天天爽 | 999国产 | 有码视频在线观看 | 在线视频一二三 | 欧美日韩国产伦理 | 99精品免费在线观看 | 成人中文字幕在线观看 | 久草网免费| 亚洲色图av | www五月婷婷| 在线免费视频一区 | a在线观看免费视频 | 这里只有精彩视频 | 天天曰天天曰 | 欧美日韩精品网站 | 国产裸体永久免费视频网站 | 96精品高清视频在线观看软件特色 | 精品影院一区二区久久久 | 日韩区欧美久久久无人区 | 国产一级视屏 | 国产精品午夜在线观看 | 人成午夜视频 | 超碰人人91 | 成人黄色电影在线观看 | 婷婷丁香五 | 国产精品久久久久国产精品日日 | 91精品国产自产在线观看永久 | 狠狠色狠狠色合久久伊人 | 最近中文字幕第一页 | 51久久成人国产精品麻豆 | 国产一级在线看 | 欧美孕交vivoestv另类 | av电影中文字幕在线观看 | 国内精自线一二区永久 | 亚洲精品理论 | 国产精品2区 | 亚洲第一伊人 | 日韩av在线一区二区 | 久久精品视频在线看 | av在线网站观看 | 美女视频黄在线观看 | 亚洲天堂在线观看完整版 | 久久久国产精品人人片99精片欧美一 | 国产精品永久 | 国产偷国产偷亚洲清高 | 亚洲成人精品在线 | 九九久久国产精品 | 麻豆久久精品 | 天天摸日日操 | 91视视频在线直接观看在线看网页在线看 | 国产精品久久久久久久久免费 | 免费看的视频 | 色噜噜在线观看视频 | 久久噜噜少妇网站 | 福利av影院| 91日韩在线视频 | 五月婷婷色综合 | 欧美成天堂网地址 | 国内精品福利视频 | 久久国产精品久久w女人spa |