
An In-Depth Analysis of the Kubernetes CRD Development Pattern and Its Source Code (Kubernetes in Production)


This blog series focuses on demystifying core big data and container cloud technologies, and can provide full-stack big data + cloud native platform consulting. Please keep following the series. For any academic exchange, feel free to get in touch by leaving a message on the 《數據雲技術社區》 (Data Cloud Technology Community) official account.

1 CRD Resource Extension

  • CRD, short for CustomResourceDefinition, is the resource extension mechanism that Kubernetes strongly recommends.
  • With CRDs, users can register custom resources with the Kubernetes API and then create, view, modify, and delete them exactly like native resources (such as Pod or StatefulSet), which amounts to a plugin-style way of extending the system (see the sketch below).
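To give a feel for what "just like a native resource" means in code, here is a minimal sketch, assuming the typed clientset and v1alpha1 API types generated in the steps below; `int32Ptr` is a hypothetical one-line helper, and the non-context `Create` signature matches the client-go generation of this article's era:

```go
// Sketch: creating a Foo through the generated typed clientset, the same way
// one would create a native resource. Assumes the code-generated packages
// from section 2; int32Ptr is a hypothetical helper returning *int32.
foo := &v1alpha1.Foo{
	ObjectMeta: metav1.ObjectMeta{Name: "example-foo"},
	Spec: v1alpha1.FooSpec{
		DeploymentName: "example-foo",
		Replicas:       int32Ptr(1),
	},
}
if _, err := exampleClient.SamplecontrollerV1alpha1().Foos("default").Create(foo); err != nil {
	klog.Fatalf("Error creating Foo: %s", err.Error())
}
```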

2 CRD Development Steps

1.根據go.mod設定,建立目錄$GOPATH/src/k8s.io module k8s.io/sample-controller 2.設置代理export GOPROXY=https://goproxy.io 3.加載依賴包,自動歸檔到vendor目錄go mod vendor -v 4.設置update-codegen.sh"${CODEGEN_PKG}"/generate-groups.sh "deepcopy,client,informer,lister" \k8s.io/sample-controller/pkg/client k8s.io/sample-controller/pkg/apis \samplecontroller:v1alpha1 \--output-base "$(dirname "${BASH_SOURCE[0]}")/../../.." \--go-header-file "${SCRIPT_ROOT}"/hack/boilerplate.go.txt5.自定義控制器及main.go6.編譯sample-controller生成可執行程序sample-controllergo build -o sample-controller .7.運行sample-controller./sample-controller -kubeconfig=$HOME/.kube/congig8.注冊 CRD 資源 crd.yamlapiVersion: apiextensions.k8s.io/v1beta1kind: CustomResourceDefinitionmetadata:name: foos.samplecontroller.k8s.iospec:group: samplecontroller.k8s.ioversion: v1alpha1names:kind: Fooplural: foosscope: Namespacedkubectl create -f artifacts/examples/crd.yaml 9.創建 CRD 資源 example-foo.yamlapiVersion: samplecontroller.k8s.io/v1alpha1kind: Foometadata:name: example-foospec:deploymentName: example-fooreplicas: 1kubectl create -f artifacts/examples/example-foo.yaml10.查看 Foo 的部署情況kubectl get deployments 復制代碼

3 Developing a Custom Controller for a CRD Project (using sample-controller as the example)

3.1 Developing main.go

  • Read the kubeconfig configuration
  • Initialize kubeClient to watch events on built-in resources
  • Initialize exampleClient to watch Foo events
  • Initialize kubeInformerFactory and exampleInformerFactory
  • Initialize the custom Controller
  • Start the Controller
```go
func main() {
	flag.Parse()

	// set up signals so we handle the first shutdown signal gracefully
	stopCh := signals.SetupSignalHandler()

	// Read the kubeconfig configuration.
	cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig)
	if err != nil {
		klog.Fatalf("Error building kubeconfig: %s", err.Error())
	}

	// Client for watching events on built-in resources.
	kubeClient, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		klog.Fatalf("Error building kubernetes clientset: %s", err.Error())
	}

	// Client for watching Foo events.
	exampleClient, err := clientset.NewForConfig(cfg)
	if err != nil {
		klog.Fatalf("Error building example clientset: %s", err.Error())
	}

	kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)
	exampleInformerFactory := informers.NewSharedInformerFactory(exampleClient, time.Second*30)

	// Build the custom Controller from the clients (kubeClient, exampleClient)
	// and informers; it watches both Deployment and Foo resources.
	controller := NewController(kubeClient, exampleClient,
		kubeInformerFactory.Apps().V1().Deployments(),
		exampleInformerFactory.Samplecontroller().V1alpha1().Foos())

	// notice that there is no need to run Start methods in a separate goroutine (i.e. go kubeInformerFactory.Start(stopCh)).
	// Start method is non-blocking and runs all registered informers in a dedicated goroutine.
	kubeInformerFactory.Start(stopCh)
	exampleInformerFactory.Start(stopCh)

	// Start the Controller.
	if err = controller.Run(2, stopCh); err != nil {
		klog.Fatalf("Error running controller: %s", err.Error())
	}
}
```
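signals.SetupSignalHandler comes from the sample-controller repository's pkg/signals package. A simplified sketch of how it works (the upstream version additionally guards against being called twice):

```go
package signals

import (
	"os"
	"os/signal"
	"syscall"
)

// SetupSignalHandler returns a channel that is closed on the first SIGINT or
// SIGTERM, letting the controller shut down gracefully; a second signal
// terminates the process immediately.
func SetupSignalHandler() (stopCh <-chan struct{}) {
	stop := make(chan struct{})
	c := make(chan os.Signal, 2)
	signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-c
		close(stop) // first signal: ask all workers to stop
		<-c
		os.Exit(1) // second signal: exit directly
	}()
	return stop
}
```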

3.2 The Controller's Event-Processing Logic

  • The Controller's key members are the two listers (appslisters.DeploymentLister and listers.FooLister); both are initialized from the arguments main.go passes into NewController.
  • To buffer event handling, incoming events are staged in a queue, the workqueue.RateLimitingInterface member (a standalone sketch of its contract follows the code below); record.EventRecorder is used to record Events.
```go
// Controller is the controller implementation for Foo resources
type Controller struct {
	kubeclientset   kubernetes.Interface
	sampleclientset clientset.Interface

	deploymentsLister appslisters.DeploymentLister
	deploymentsSynced cache.InformerSynced
	foosLister        listers.FooLister
	foosSynced        cache.InformerSynced

	// workqueue is a rate limited work queue. This is used to queue work to be
	// processed instead of performing it as soon as a change happens. This
	// means we can ensure we only process a fixed amount of resources at a
	// time, and makes it easy to ensure we are never processing the same item
	// simultaneously in two different workers.
	workqueue workqueue.RateLimitingInterface
	// recorder is an event recorder for recording Event resources to the
	// Kubernetes API.
	recorder record.EventRecorder
}

// controller.go: constructing the Controller
controller := &Controller{
	kubeclientset:     kubeclientset,
	sampleclientset:   sampleclientset,
	deploymentsLister: deploymentInformer.Lister(),
	deploymentsSynced: deploymentInformer.Informer().HasSynced,
	foosLister:        fooInformer.Lister(),
	foosSynced:        fooInformer.Informer().HasSynced,
	workqueue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Foos"),
	recorder:          recorder,
}

// NewController's signature; its arguments are supplied by main.go
func NewController(
	kubeclientset kubernetes.Interface,
	sampleclientset clientset.Interface,
	deploymentInformer appsinformers.DeploymentInformer,
	fooInformer informers.FooInformer) *Controller
```
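The rate-limited workqueue deserves a closer look. Here is a standalone sketch of its contract using real client-go workqueue calls: failed items are re-added with exponential backoff, and Forget resets that backoff once an item finally succeeds.

```go
// Sketch: the workqueue contract that the Controller relies on.
q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Foos")

q.AddRateLimited("default/example-foo") // enqueue, respecting per-item backoff

key, shutdown := q.Get() // blocks until an item is available or ShutDown() is called
if !shutdown {
	// ... process the key ...
	q.Forget(key) // success: clear the per-item backoff counter
	q.Done(key)   // always: mark the item as no longer in flight
}
```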

3.3 The Custom Controller's Event Handlers

  • Register the event handlers for Foo resource changes (Add and Update are both routed through enqueueFoo).
  • Register the event handlers for Deployment resource changes (Add, Update, and Delete are all routed through handleObject).
  • This leads to the implementations of enqueueFoo and handleObject.
```go
// Set up an event handler for when Foo resources change
fooInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
	AddFunc: controller.enqueueFoo,
	UpdateFunc: func(old, new interface{}) {
		controller.enqueueFoo(new)
	},
})
// Set up an event handler for when Deployment resources change. This
// handler will lookup the owner of the given Deployment, and if it is
// owned by a Foo resource will enqueue that Foo resource for
// processing. This way, we don't need to implement custom logic for
// handling Deployment resources. More info on this pattern:
deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
	AddFunc: controller.handleObject,
	UpdateFunc: func(old, new interface{}) {
		newDepl := new.(*appsv1.Deployment)
		oldDepl := old.(*appsv1.Deployment)
		if newDepl.ResourceVersion == oldDepl.ResourceVersion {
			// Periodic resync will send update events for all known Deployments.
			// Two different versions of the same Deployment will always have different RVs.
			return
		}
		controller.handleObject(new)
	},
	DeleteFunc: controller.handleObject,
})
```

3.4 Implementing enqueueFoo and handleObject

  • enqueueFoo simply serializes a Foo resource into a namespace/name string and puts it on the work queue (a round-trip sketch of this key format follows the code below).
  • handleObject receives every resource implementing metav1.Object, but filters for those whose owner is a Foo; it resolves the owning Foo and enqueues it as namespace/name. Objects not owned by a Foo are ignored.
```go
// enqueueFoo takes a Foo resource and converts it into a namespace/name
// string which is then put onto the work queue. This method should *not* be
// passed resources of any type other than Foo.
func (c *Controller) enqueueFoo(obj interface{}) {
	var key string
	var err error
	if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
		utilruntime.HandleError(err)
		return
	}
	c.workqueue.AddRateLimited(key)
}

// handleObject will take any resource implementing metav1.Object and attempt
// to find the Foo resource that 'owns' it. It does this by looking at the
// objects metadata.ownerReferences field for an appropriate OwnerReference.
// It then enqueues that Foo resource to be processed. If the object does not
// have an appropriate OwnerReference, it will simply be skipped.
func (c *Controller) handleObject(obj interface{}) {
	var object metav1.Object
	var ok bool
	if object, ok = obj.(metav1.Object); !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("error decoding object, invalid type"))
			return
		}
		object, ok = tombstone.Obj.(metav1.Object)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("error decoding object tombstone, invalid type"))
			return
		}
		klog.V(4).Infof("Recovered deleted object '%s' from tombstone", object.GetName())
	}
	klog.V(4).Infof("Processing object: %s", object.GetName())
	if ownerRef := metav1.GetControllerOf(object); ownerRef != nil {
		// If this object is not owned by a Foo, we should not do anything more
		// with it.
		if ownerRef.Kind != "Foo" {
			return
		}

		foo, err := c.foosLister.Foos(object.GetNamespace()).Get(ownerRef.Name)
		if err != nil {
			klog.V(4).Infof("ignoring orphaned object '%s' of foo '%s'", object.GetSelfLink(), ownerRef.Name)
			return
		}

		c.enqueueFoo(foo)
		return
	}
}
```
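The namespace/name key format round-trips through two client-go cache helpers; a minimal sketch:

```go
// Sketch: serializing an object to a key and splitting it back.
key, err := cache.MetaNamespaceKeyFunc(foo) // -> "default/example-foo"
if err == nil {
	namespace, name, _ := cache.SplitMetaNamespaceKey(key)
	fmt.Println(namespace, name) // "default" "example-foo"
}
```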

3.5 The Controller's Core Run Function

  • Started from main.go, Run waits for the informer caches to finish syncing, then runs runWorker concurrently to process the events in the queue.
  • runWorker leads to processNextWorkItem, and processNextWorkItem leads to syncHandler.
```go
// Run will set up the event handlers for types we are interested in, as well
// as syncing informer caches and starting workers. It will block until stopCh
// is closed, at which point it will shutdown the workqueue and wait for
// workers to finish processing their current work items.
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
	defer utilruntime.HandleCrash()
	defer c.workqueue.ShutDown()

	// Start the informer factories to begin populating the informer caches
	klog.Info("Starting Foo controller")

	// Wait for the caches to be synced before starting workers
	klog.Info("Waiting for informer caches to sync")
	if ok := cache.WaitForCacheSync(stopCh, c.deploymentsSynced, c.foosSynced); !ok {
		return fmt.Errorf("failed to wait for caches to sync")
	}

	klog.Info("Starting workers")
	// Launch two workers to process Foo resources
	for i := 0; i < threadiness; i++ {
		go wait.Until(c.runWorker, time.Second, stopCh)
	}

	klog.Info("Started workers")
	<-stopCh
	klog.Info("Shutting down workers")

	return nil
}

// runWorker is a long-running function that will continually call the
// processNextWorkItem function in order to read and process a message on the
// workqueue.
func (c *Controller) runWorker() {
	for c.processNextWorkItem() {
	}
}

// processNextWorkItem will read a single work item off the workqueue and
// attempt to process it, by calling the syncHandler.
func (c *Controller) processNextWorkItem() bool {
	obj, shutdown := c.workqueue.Get()

	if shutdown {
		return false
	}

	// We wrap this block in a func so we can defer c.workqueue.Done.
	err := func(obj interface{}) error {
		// We call Done here so the workqueue knows we have finished
		// processing this item. We also must remember to call Forget if we
		// do not want this work item being re-queued. For example, we do
		// not call Forget if a transient error occurs, instead the item is
		// put back on the workqueue and attempted again after a back-off
		// period.
		defer c.workqueue.Done(obj)
		var key string
		var ok bool
		// We expect strings to come off the workqueue. These are of the
		// form namespace/name. We do this as the delayed nature of the
		// workqueue means the items in the informer cache may actually be
		// more up to date than when the item was initially put onto the
		// workqueue.
		if key, ok = obj.(string); !ok {
			// As the item in the workqueue is actually invalid, we call
			// Forget here else we'd go into a loop of attempting to
			// process a work item that is invalid.
			c.workqueue.Forget(obj)
			utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
			return nil
		}
		// Run the syncHandler, passing it the namespace/name string of the
		// Foo resource to be synced.
		if err := c.syncHandler(key); err != nil {
			// Put the item back on the workqueue to handle any transient errors.
			c.workqueue.AddRateLimited(key)
			return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
		}
		// Finally, if no error occurs we Forget this item so it does not
		// get queued again until another change happens.
		c.workqueue.Forget(obj)
		klog.Infof("Successfully synced '%s'", key)
		return nil
	}(obj)

	if err != nil {
		utilruntime.HandleError(err)
		return true
	}

	return true
}
```

3.6 syncHandler: The Controller's Core Custom Processing Logic

  • syncHandler's processing logic is roughly as follows:
  • Fetch the Foo resource by its namespace/name key.
  • From the Foo, read its Deployment name and fetch that Deployment (creating it if it does not exist).
  • If the Foo's Replicas does not match the Deployment's Replicas, update the Deployment's Replicas from the Foo's.
  • Update the Foo's status to the latest Deployment status (in practice, its AvailableReplicas).
  • From this it is clear that the entity realizing a Foo is in fact a Deployment (the helper functions syncHandler relies on are sketched after the code below).
```go
// syncHandler compares the actual state with the desired, and attempts to
// converge the two. It then updates the Status block of the Foo resource
// with the current status of the resource.
func (c *Controller) syncHandler(key string) error {
	// Convert the namespace/name string into a distinct namespace and name
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
		return nil
	}

	// Get the Foo resource with this namespace/name
	foo, err := c.foosLister.Foos(namespace).Get(name)
	if err != nil {
		// The Foo resource may no longer exist, in which case we stop
		// processing.
		if errors.IsNotFound(err) {
			utilruntime.HandleError(fmt.Errorf("foo '%s' in work queue no longer exists", key))
			return nil
		}
		return err
	}

	deploymentName := foo.Spec.DeploymentName
	if deploymentName == "" {
		// We choose to absorb the error here as the worker would requeue the
		// resource otherwise. Instead, the next time the resource is updated
		// the resource will be queued again.
		utilruntime.HandleError(fmt.Errorf("%s: deployment name must be specified", key))
		return nil
	}

	// Get the deployment with the name specified in Foo.spec
	deployment, err := c.deploymentsLister.Deployments(foo.Namespace).Get(deploymentName)
	// If the resource doesn't exist, we'll create it
	if errors.IsNotFound(err) {
		deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Create(newDeployment(foo))
	}

	// If an error occurs during Get/Create, we'll requeue the item so we can
	// attempt processing again later. This could have been caused by a
	// temporary network failure, or any other transient reason.
	if err != nil {
		return err
	}

	// If the Deployment is not controlled by this Foo resource, we should log
	// a warning to the event recorder and return an error.
	if !metav1.IsControlledBy(deployment, foo) {
		msg := fmt.Sprintf(MessageResourceExists, deployment.Name)
		c.recorder.Event(foo, corev1.EventTypeWarning, ErrResourceExists, msg)
		return fmt.Errorf(msg)
	}

	// If the number of replicas on the Foo resource is specified, and the
	// number does not equal the current desired replicas on the Deployment, we
	// should update the Deployment resource.
	if foo.Spec.Replicas != nil && *foo.Spec.Replicas != *deployment.Spec.Replicas {
		klog.V(4).Infof("Foo %s replicas: %d, deployment replicas: %d", name, *foo.Spec.Replicas, *deployment.Spec.Replicas)
		deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Update(newDeployment(foo))
	}

	// If an error occurs during Update, we'll requeue the item so we can
	// attempt processing again later. This could have been caused by a
	// temporary network failure, or any other transient reason.
	if err != nil {
		return err
	}

	// Finally, we update the status block of the Foo resource to reflect the
	// current state of the world
	err = c.updateFooStatus(foo, deployment)
	if err != nil {
		return err
	}

	c.recorder.Event(foo, corev1.EventTypeNormal, SuccessSynced, MessageResourceSynced)
	return nil
}
```
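syncHandler references several helpers the article does not show: the Event constants, newDeployment (which is also where the Foo OwnerReference consumed by handleObject gets set), and updateFooStatus. A sketch of all three, based on the upstream sample-controller:

```go
const (
	// SuccessSynced is used as part of the Event 'reason' when a Foo is synced
	SuccessSynced = "Synced"
	// ErrResourceExists is used as part of the Event 'reason' when a Foo fails
	// to sync due to a Deployment of the same name already existing.
	ErrResourceExists = "ErrResourceExists"

	// MessageResourceExists is the message used for Events when a resource
	// fails to sync due to a Deployment already existing.
	MessageResourceExists = "Resource %q already exists and is not managed by Foo"
	// MessageResourceSynced is the message used for an Event fired when a Foo
	// is synced successfully.
	MessageResourceSynced = "Foo synced successfully"
)

// newDeployment creates a new Deployment for a Foo resource. It sets the
// appropriate OwnerReference so that handleObject can discover the Foo
// resource that 'owns' it.
func newDeployment(foo *samplev1alpha1.Foo) *appsv1.Deployment {
	labels := map[string]string{
		"app":        "nginx",
		"controller": foo.Name,
	}
	return &appsv1.Deployment{
		ObjectMeta: metav1.ObjectMeta{
			Name:      foo.Spec.DeploymentName,
			Namespace: foo.Namespace,
			OwnerReferences: []metav1.OwnerReference{
				*metav1.NewControllerRef(foo, samplev1alpha1.SchemeGroupVersion.WithKind("Foo")),
			},
		},
		Spec: appsv1.DeploymentSpec{
			Replicas: foo.Spec.Replicas,
			Selector: &metav1.LabelSelector{MatchLabels: labels},
			Template: corev1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{Labels: labels},
				Spec: corev1.PodSpec{
					Containers: []corev1.Container{
						{Name: "nginx", Image: "nginx:latest"},
					},
				},
			},
		},
	}
}

// updateFooStatus copies the Deployment's AvailableReplicas into the Foo's
// status and writes it back through the generated clientset.
func (c *Controller) updateFooStatus(foo *samplev1alpha1.Foo, deployment *appsv1.Deployment) error {
	// NEVER modify objects from the informer cache; DeepCopy first.
	fooCopy := foo.DeepCopy()
	fooCopy.Status.AvailableReplicas = deployment.Status.AvailableReplicas
	_, err := c.sampleclientset.SamplecontrollerV1alpha1().Foos(foo.Namespace).Update(fooCopy)
	return err
}
```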

4 Registering the Controller's Core Types

  • pkg/apis/samplecontroller/v1alpha1/register.go (handles the type Scheme)
```go
// SchemeGroupVersion is group version used to register these objects
var SchemeGroupVersion = schema.GroupVersion{Group: samplecontroller.GroupName, Version: "v1alpha1"}

// Kind takes an unqualified kind and returns back a Group qualified GroupKind
func Kind(kind string) schema.GroupKind {
	return SchemeGroupVersion.WithKind(kind).GroupKind()
}

// Resource takes an unqualified resource and returns a Group qualified GroupResource
func Resource(resource string) schema.GroupResource {
	return SchemeGroupVersion.WithResource(resource).GroupResource()
}

var (
	SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)
	AddToScheme   = SchemeBuilder.AddToScheme
)

// Adds the list of known types to Scheme.
func addKnownTypes(scheme *runtime.Scheme) error {
	scheme.AddKnownTypes(SchemeGroupVersion,
		&Foo{},
		&FooList{},
	)
	metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
	return nil
}
```
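SchemeGroupVersion above references samplecontroller.GroupName, which lives one package up; a sketch based on the upstream layout:

```go
// pkg/apis/samplecontroller/register.go (sketch)
package samplecontroller

// GroupName is the API group used by the Foo types.
const GroupName = "samplecontroller.k8s.io"
```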

5 Summary

The development ran into all kinds of problems along the way: package structure and relative paths, the Go proxy, the functions provided by the official Kubernetes CRD tooling, and so on. Through repeated practice I finally mastered the whole Kubernetes CRD development workflow, and I am recording it here for future reference.


Reposted from: https://juejin.im/post/5d57f750f265da03b94ff11a
