浅谈 Kubernetes Scheduling-Framework 插件的实现
最近幾個月一直在研究 kubernetes 的 scheduling-framework 調度框架,發現還是十分有意思的,我自己也實現了一個基于 scheduling-framework 調度框架的自定義調度器,希望感興趣的同學一起學習:https://github.com/NJUPT-ISL/Yoda-Scheduler
Scheduling-framework 調度框架
Kubernetes 的 scheduling-framework 調度框架(以下簡稱調度框架)是針對當前 kubernetes 調度器的增強,它不同于之前的 scheduler-extender,用戶可以編寫多個插件,這些插件可以在調度的不同階段作為原有調度器的擴展,并且這些插件會和 kubernetes 原有的調度器源代碼會一起編譯到調度程序中。
調度框架設計目標
增強 kubernetes 原有調度器的可擴展性。
通過將調度程序的某些功能移至插件,可以簡化調度程序核心。
調度框架中可設置多個擴展點。
調度框架通過插件機制來接收插件結果,并根據接收到的結果繼續或中止。
提出一種處理錯誤并將其與插件進行通信的機制。
Proposal
調度框架在 kubernetes 調度器中定義了很多 Go 的接口和 Go API,用于用戶設計插件使用。這些用戶設計的插件將會被添加到調度程序中,并在編譯時包含在內。可以通過配置調度程序的 ComponentConfig 將允許啟用、禁用和重新排序插件。自定義調度程序可以“ 在樹外 ” 編寫其插件并編譯包含其自己的插件的調度程序二進制文件。
調度周期和綁定周期
調度器調度一個 Pod 的過程分為兩個階段:調度周期和綁定周期。
在調度周期中,調度器會為 Pod 選擇一個最合適它運行的節點,然后調度過程將進入綁定周期。
在綁定周期中,調度器會檢測調度周期中選中的那個“最合適的節點”是不是真的可以讓這個 Pod 穩定的運行(比如檢測 PV、檢測是否有端口沖突等),或者需不需要做一些初始化操作(比如設置這個節點上的 FPGA 板子的狀態、設置 GPU 顯卡的驅動版本、CUDA 的版本等)。
擴展點
kubernetes 調度框架在調度周期和綁定周期都為我們提供了豐富的擴展點,這些擴展點可以“插上”我們自己設計的調度插件,一個插件可以在多個擴展點注冊以執行更復雜或有狀態的任務,實現我們想要的調度功能:
下面闡述下各個擴展點可以實現的功能。
Sort 排序
排序擴展點,由于調度器是按照 FIFO 的順序調度 Pod 的,因此當隊列里出現多個等待調度的 Pod 時,可以對這些 Pod 的先后順序進行排序,把我們想要的 Pod(可能優先級比較高)往出隊方向移動,讓它可以更快地被調度。
目前的 Sort 擴展點只能啟用一個,不可以啟用多個 Sort 擴展插件。
我們可以看下 Sort 的接口,代碼位于 kubernetes 項目的 /pkg/scheduler/framework/interface.go 中:
type?QueueSortPlugin?interface?{Plugin//?Less?are?used?to?sort?pods?in?the?scheduling?queue.Less(*PodInfo,?*PodInfo)?bool }也就是只需要實現 Less 方法即可,比如如下的實現:
func?Less(podInfo1,?podInfo2?*framework.PodInfo)?bool?{return?GetPodPriority(podInfo1)?>?GetPodPriority(podInfo2) }Pre-filter 預過濾
該擴展點用于預處理有關 Pod 的信息,或檢查集群或 Pod 必須滿足的某些條件。預過濾器插件應實現 PreFilter 函數,如果 PreFilter 返回錯誤,則調度周期將中止。注意,在每個調度周期中,只會調用一次 PreFilter。
Pre-filter 插件可以選擇實現 PreFilterExtensions 接口,這個接口定義了 AddPod 和 RemovePod 方法以增量方式修改其預處理信息。
type?PreFilterPlugin?interface?{PluginPreFilter(ctx?context.Context,?state?*CycleState,?p?*v1.Pod)?*StatusPreFilterExtensions()?PreFilterExtensions }這里的 CycleState ,表示調度的上下文,其實是一個 map 的封裝,結構體內部通過讀寫鎖實現了并發安全,開發者可以通過 CycleState 來實現多個調度插件直接的數據傳遞,也就是多個插件可以共享狀態或通過此機制進行通信。
?type?CycleState?struct?{mx??????sync.RWMutexstorage?map[StateKey]StateDatarecordFrameworkMetrics?bool//?該值為?true,?則調度框架會記錄此次調度周期的數據}這里的 StateKey 是 string 類型,StateData 是一個接口類型:
?type?StateData?interface?{//?Clone?is?an?interface?to?make?a?copy?of?StateData.?For?performance?reasons,//?clone?should?make?shallow?copies?for?members?(e.g.,?slices?or?maps)?that?are?not//?impacted?by?PreFilter's?optional?AddPod/RemovePod?methods.Clone()?StateData}我們可以做一個簡單的接口實現,來實現 StateData:
?type?Data?struct?{Value?int64}func?(s?*Data)?Clone()?framework.StateData?{c?:=?&Data{Value:?s.Value,}return?c}那么當插件在該擴展點想傳遞數據時就可以使用如下類似的代碼實現數據的傳遞:
?Max?:=?Data{Value:?0}state.Lock()state.Write(framework.StateKey("Max"),?&Max)defer?state.Unlock()Filter 過濾
用于過濾不能滿足當前被調度 Pod 運行需求的節點。對于每個節點,調度程序將按配置的順序調用該類插件。如果有任何過濾器插件將節點標記為不可行,則不會為該節點調用其余插件。可以同時評估節點,并且在同一調度周期中可以多次調用 Filter 插件。這塊其實是調度器會啟動多個 go 協程以實現對多個節點并發調用 filter,來提高過濾效率。過濾插件其實類似于上一代 Kubernetes 調度器中的預選環節,即 Predicates。
我們看下接口定義:
type?FilterPlugin?interface?{PluginFilter(ctx?context.Context,?state?*CycleState,?pod?*v1.Pod,?nodeInfo?*schedulernodeinfo.NodeInfo)?*Status }我們可以對應的實現,比如我這里需要做 GPU 的調度,我需要檢查每個節點的 GPU 是否滿足 Pod 的運行要求:
func?(y?*Yoda)?Filter(ctx?context.Context,?state?*framework.CycleState,?pod?*v1.Pod,?node?*nodeinfo.NodeInfo)?*framework.Status?{klog.V(3).Infof("filter?pod:?%v,?node:?%v",?pod.Name,?node.Node().Name)//?檢查節點?GPU?的健康狀態if?ok,?msg?:=?filter.CheckGPUHealth(node);?ok?{//?節點的?GPU?是否符合Pod?運行等級if?!filter.PodFitsLevel(pod,?node)?{return?framework.NewStatus(framework.Unschedulable,?"Node:"+node.Node().Name+"?GPU?Level?Not?Fit")}//?節點的?GPU?顯存是否符合?Pod?運行if?!filter.PodFitsMemory(pod,?node)?{return?framework.NewStatus(framework.Unschedulable,?"Node:"+node.Node().Name+"?GPU?Memory?Not?Fit")}//?節點的?GPU?數量是否符合?Pod?運行if?!filter.PodFitsNumber(pod,?node)?{return?framework.NewStatus(framework.Unschedulable,?"Node:"+node.Node().Name+"?GPU?Number?Not?Fit")}return?framework.NewStatus(framework.Success,?"")}?else?{return?framework.NewStatus(framework.Unschedulable,?"Node:"+node.Node().Name+msg)} }Pre-Score 預打分 (v1alpha1 版本稱為 Post-Filter)
注意:Pre-Score 從 v1alpha2 開始可用。
該擴展點將使用通過 Filter 階段的節點列表來調用插件。插件可以使用此數據來更新內部狀態或生成日志、指標。比如可以通過該擴展點收集各個節點中性能指標,所有節點中最大的內存的節點,性能最好的 CPU 節點等。
我們繼續來看接口里長什么樣子(我這里是v1alpha1):
type?PostFilterPlugin?interface?{PluginPostFilter(ctx?context.Context,?state?*CycleState,?pod?*v1.Pod,?nodes?[]*v1.Node,?filteredNodesStatuses?NodeToStatusMap)?*Status }針對這個擴展點,通過傳遞的參數可以看出,接口傳入了節點的切片,因此開發者可以通過啟動多個并發協程來獲取數據,并且可以把這些數據存在 CycleState 中,給之后的插件擴展點使用:
func?(y?*Yoda)?PostFilter(ctx?context.Context,?state?*framework.CycleState,?pod?*v1.Pod,?nodes?[]*v1.Node,?filteredNodesStatuses?framework.NodeToStatusMap)?*framework.Status?{klog.V(3).Infof("collect?info?for?scheduling??pod:?%v",?pod.Name)return?collection.ParallelCollection(collection.Workers,?state,?nodes,?filteredNodesStatuses) }并發這塊我們也可以參考 1.13 調度器中經常使用的經典并發模型:
func?ParallelCollection(workers?int,?state?*framework.CycleState,?nodes?[]*v1.Node,?filteredNodesStatuses?framework.NodeToStatusMap)?*framework.Status?{var?(stop?<-chan?struct{}mx???sync.RWMutexmsg??=?"")//?數據存入管道pieces?:=?len(Sum)toProcess?:=?make(chan?string,?pieces)for?_,?v?:=?range?Sum?{toProcess?<-?v}close(toProcess)//?并發協程數限制if?pieces?<?workers?{workers?=?pieces}wg?:=?sync.WaitGroup{}wg.Add(workers)for?i?:=?0;?i?<?workers;?i++?{go?func()?{//?協程消費管道數據for?value?:=?range?toProcess?{select?{case?<-stop:returndefault://?state?并發安全,調用的時候可以不用加鎖if?re?:=?CollectMaxValue(value,?state,?nodes,?filteredNodesStatuses);?!re.IsSuccess()?{klog.V(3).Infof(re.Message())mx.Lock()//?message非并發安全,加鎖msg?+=?re.Message()mx.Unlock()}}}wg.Done()}()}wg.Wait()if?msg?!=?""?{return?framework.NewStatus(framework.Error,?msg)}return?framework.NewStatus(framework.Success,?"") }Score 打分
Score 擴展點和上一代的調度器的優選流程很像,它分為兩個階段:
第一階段稱為 “打分”,用于對已通過過濾階段的節點進行排名。調度程序將為 Score 每個節點調用每個計分插件。
第二階段是 “歸一化”,用于在調度程序計算節點的最終排名之前修改分數,可以不實現, 但是需要保證 Score 插件的輸出必須是 [MinNodeScore,MaxNodeScore]([0-100]) 范圍內的整數 。如果不是,則調度器會報錯,你需要實現 NormalizeScore 來保證最后的得分范圍。如果不實現 NormalizeScore,則 Score 的輸出必須在此范圍內。調度程序將根據配置的插件權重合并所有插件的節點分數。
看看接口的定義:
type?ScorePlugin?interface?{Plugin//?Score?is?called?on?each?filtered?node.?It?must?return?success?and?an?integer//?indicating?the?rank?of?the?node.?All?scoring?plugins?must?return?success?or//?the?pod?will?be?rejected.Score(ctx?context.Context,?state?*CycleState,?p?*v1.Pod,?nodeName?string)?(int64,?*Status)//?ScoreExtensions?returns?a?ScoreExtensions?interface?if?it?implements?one,?or?nil?if?does?not.ScoreExtensions()?ScoreExtensions }我們也可以做如下簡單的實現:
func?(y?*Yoda)?Score(ctx?context.Context,?state?*framework.CycleState,?p?*v1.Pod,?nodeName?string)?(int64,?*framework.Status)?{nodeInfo,?err?:=?y.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)if?err?!=?nil?{return?0,?framework.NewStatus(framework.Error,?fmt.Sprintf("getting?node?%q?from?Snapshot:?%v",?nodeName,?err))}s,?err?:=?score.Score(state,?nodeInfo)if?err?!=?nil?{return?0,?framework.NewStatus(framework.Error,?fmt.Sprintf("Score?Node?Error:?%v",?err))}klog.V(3).Infof("node?:?%v?yoda-score:?%v",nodeName,s)return?s,?framework.NewStatus(framework.Success,?"") }如果最后的分數不在范圍內,我們可能需要實現 NormalizeScore 函數做進一步處理:
func?(y?*Yoda)?NormalizeScore(ctx?context.Context,?state?*framework.CycleState,?p?*v1.Pod,?scores?framework.NodeScoreList)?*framework.Status?{var?(highest?int64?=?0)//?歸一化?for?i,?nodeScore?:=?range?scores?{scores[i].Score?=?nodeScore.Score?*?framework.MaxNodeScore?/?highest}return?framework.NewStatus(framework.Success,?"") }Reserve 保留
為給定的 Pod 保留節點上的資源時,維護運行時狀態的插件可以應實現此擴展點,以由調度程序通知。這是在調度程序實際將 Pod 綁定到 Node 之前發生的,它的存在是為了防止在調度程序等待綁定成功時發生爭用情況。
type?ReservePlugin?interface?{Plugin//?Reserve?is?called?by?the?scheduling?framework?when?the?scheduler?cache?is//?updated.Reserve(ctx?context.Context,?state?*CycleState,?p?*v1.Pod,?nodeName?string)?*Status }這里和上面的 Score 類似,函數并沒有提供 nodeInfo 接口,我們可以通過調用 handle.SnapshotSharedLister 來獲取節點的信息。
nodeInfo,?err?:=?y.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)那么以上就是調度周期的插件與實現,其實綁定周期的插件實現和上述的方法也都類似,實現相關的函數即可。
插件注冊
每個插件必須定義一個構造函數,并將其添加到硬編碼的注冊表中。
type?PluginFactory?=?func(runtime.Unknown,?FrameworkHandle)?(Plugin,?error)type?Registry?map[string]PluginFactoryfunc?NewRegistry()?Registry?{return?Registry{fooplugin.Name:?fooplugin.New,barplugin.Name:?barplugin.New,//?New?plugins?are?registered?here.} }那么在編譯的時候,編譯器會將我們的插件和調度源碼一起編譯成我們的自定義調度器。
在聲明插件的時候也需要實現構造函數和對應的方法:
type?Yoda?struct?{args???*Argshandle?framework.FrameworkHandle }func?(y?*Yoda)?Name()?string?{return?Name }func?New(configuration?*runtime.Unknown,?f?framework.FrameworkHandle)?(framework.Plugin,?error)?{args?:=?&Args{}if?err?:=?framework.DecodeInto(configuration,?args);?err?!=?nil?{return?nil,?err}klog.V(3).Infof("get?plugin?config?args:?%+v",?args)return?&Yoda{args:???args,handle:?f,},?nil }編譯小技巧
由于最終的調度器還是以容器的方式運行的,我們可以寫一個 Makefile 來簡化編譯流程:
all:?locallocal:GOOS=linux?GOARCH=amd64?go?build??-o=my-scheduler?./cmd/schedulerbuild:sudo?docker?build?--no-cache?.?-t?registry.cn-hangzhou.aliyuncs.com/my/schedulerpush:sudo?docker?push?registry.cn-hangzhou.aliyuncs.com/my/schedulerformat:sudo?gofmt?-l?-w?. clean:sudo?rm?-f?my-scheduler編寫調度器的Dockerfile:
FROM?debian:stretch-slimWORKDIR?/COPY?my-scheduler?/usr/local/binCMD?["my-scheduler"]那么編譯 -> 構建就可以三步走了:
編譯
構建鏡像
上傳鏡像
自定義調度器的配置
首先需要設置一個 ConfigMap ,用于存放調度器的配置文件:
apiVersion:?v1 kind:?ConfigMap metadata:name:?scheduler-confignamespace:?kube-system data:scheduler-config.yaml:?|apiVersion:?kubescheduler.config.k8s.io/v1alpha1kind:?KubeSchedulerConfigurationschedulerName:?yoda-schedulerleaderElection:leaderElect:?truelockObjectName:?yoda-schedulerlockObjectNamespace:?kube-systemplugins:queueSort:enabled:-?name:?"yoda"filter:enabled:-?name:?"yoda"score:enabled:-?name:?"yoda"postFilter:enabled:-?name:?"yoda"pluginConfig:-?name:?"yoda"args:?{"master":?"master",?"kubeconfig":?"kubeconfig"}這里主要需要修改的就是 schedulerName 字段的調度器名稱和 plugins 字段中各個擴展點的插件名稱,enable 才能保證該擴展點運行了你的插件。
接著為調度器創建 RBAC:
kind:?ClusterRole apiVersion:?rbac.authorization.k8s.io/v1 metadata:name:?yoda-cr rules:-?apiGroups:-?""resources:-?endpoints-?eventsverbs:-?create-?get-?update-?apiGroups:-?""resourceNames:-?yoda-schedulerresources:-?endpointsverbs:-?delete-?get-?patch-?update-?apiGroups:-?""resources:-?nodesverbs:-?get-?list-?watch-?apiGroups:-?""resources:-?podsverbs:-?delete-?get-?list-?watch-?update-?apiGroups:-?""resources:-?bindings-?pods/bindingverbs:-?create-?apiGroups:-?""resources:-?pods/statusverbs:-?patch-?update-?apiGroups:-?""resources:-?replicationcontrollers-?servicesverbs:-?get-?list-?watch-?apiGroups:-?apps-?extensionsresources:-?replicasetsverbs:-?get-?list-?watch-?apiGroups:-?appsresources:-?statefulsetsverbs:-?get-?list-?watch-?apiGroups:-?policyresources:-?poddisruptionbudgetsverbs:-?get-?list-?watch-?apiGroups:-?""resources:-?persistentvolumeclaims-?persistentvolumesverbs:-?get-?list-?watch-?apiGroups:-?""resources:-?configmapsverbs:-?get-?list-?watch-?apiGroups:-?"storage.k8s.io"resources:-?storageclasses-?csinodesverbs:-?watch-?list-?get-?apiGroups:-?"coordination.k8s.io"resources:-?leasesverbs:-?create-?get-?list-?update-?apiGroups:-?"events.k8s.io"resources:-?eventsverbs:-?create-?patch-?update --- apiVersion:?v1 kind:?ServiceAccount metadata:name:?yoda-sanamespace:?kube-system --- kind:?ClusterRoleBinding apiVersion:?rbac.authorization.k8s.io/v1 metadata:name:?yoda-crbnamespace:?kube-system roleRef:apiGroup:?rbac.authorization.k8s.iokind:?ClusterRolename:?yoda-cr subjects:-?kind:?ServiceAccountname:?yoda-sanamespace:?kube-system最后配置調度器的 Deployment:
apiVersion:?apps/v1 kind:?Deployment metadata:name:?yoda-schedulernamespace:?kube-systemlabels:component:?yoda-scheduler spec:replicas:?1selector:matchLabels:component:?yoda-schedulertemplate:metadata:labels:component:?yoda-schedulerspec:serviceAccount:?yoda-sapriorityClassName:?system-cluster-criticalvolumes:-?name:?scheduler-configconfigMap:name:?scheduler-configcontainers:-?name:?yoda-schedulerimage:?registry.cn-hangzhou.aliyuncs.com/geekcloud/yoda-schedulerimagePullPolicy:?Alwaysargs:-?yoda-scheduler-?--config=/scheduler/scheduler-config.yaml-?--v=3resources:requests:cpu:?"50m"volumeMounts:-?name:?scheduler-configmountPath:?/scheduler隨著云計算技術的不斷發展,kubernetes scheduler 也在根據各種復雜的需求不斷進化,未來也會涌現更多各種各樣的豐富的、支持不同功能的調度器在不同的生產環境中發揮著更多強勁的作用,一起期待吧!
作者介紹
李俊江
kubernetes & istio member
南京郵電大學物聯網學院研究生,熱衷于 Kubernetes 與云原生相關技術。
微信:FUNKY-STARS 歡迎交流!
參考
Scheduling Framework
enhancements/624
scheduler-framework-sample
kubernetes 1.13 源碼分析
致謝
感謝 Scheduler-SIG Leader HuangWei 大佬在 kubecon 2018 的 Q&A 和指導!
感謝張磊、車漾大佬在 kubecon 2018 的分享和討論!
直播活動
ServiceMesher 社區聯合 MOSN 社區推出的《云原生網絡代理 MOSN 多協議機解析》直播,教你如何在 MOSN 中接入新的協議,實現不同 RPC 協議的代理,以方便 Service Mesh 擴展。查看詳情:https://mosn.io/zh/blog/news/mosn-channel-1/
點擊?閱讀原文?查看更多
總結
以上是生活随笔為你收集整理的浅谈 Kubernetes Scheduling-Framework 插件的实现的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 使用.Net Core编写命令行工具(C
- 下一篇: Angular SPA基于Ocelot