func main() {
	flag.Parse()

	// set up signals so we handle the first shutdown signal gracefully
	stopCh := signals.SetupSignalHandler()

	// read the kubeconfig configuration
	cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig)
	if err != nil {
		klog.Fatalf("Error building kubeconfig: %s", err.Error())
	}

	// clientset for the built-in Kubernetes resources
	kubeClient, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		klog.Fatalf("Error building kubernetes clientset: %s", err.Error())
	}

	// clientset for watching Foo events
	exampleClient, err := clientset.NewForConfig(cfg)
	if err != nil {
		klog.Fatalf("Error building example clientset: %s", err.Error())
	}

	kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)
	exampleInformerFactory := informers.NewSharedInformerFactory(exampleClient, time.Second*30)

	// initialize the custom Controller from the clients (kubeClient, exampleClient)
	// and informers, watching for changes to Deployment and Foo resources
	controller := NewController(kubeClient, exampleClient,
		kubeInformerFactory.Apps().V1().Deployments(),
		exampleInformerFactory.Samplecontroller().V1alpha1().Foos())

	// notice that there is no need to run Start methods in a separate goroutine
	// (i.e. go kubeInformerFactory.Start(stopCh)): Start is non-blocking and runs
	// each registered informer in a dedicated goroutine.
	kubeInformerFactory.Start(stopCh)
	exampleInformerFactory.Start(stopCh)

	// start the Controller
	if err = controller.Run(2, stopCh); err != nil {
		klog.Fatalf("Error running controller: %s", err.Error())
	}
}
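signals.SetupSignalHandler turns the first SIGINT/SIGTERM into a channel close, so everything blocking on stopCh can shut down gracefully. Below is a minimal sketch of that helper, modeled on the sample-controller's pkg/signals package (the exact upstream code may differ slightly):

package signals

import (
	"os"
	"os/signal"
	"syscall"
)

var onlyOneSignalHandler = make(chan struct{})

// SetupSignalHandler returns a channel that is closed on the first
// SIGINT/SIGTERM; a second signal terminates the process immediately.
// Closing onlyOneSignalHandler panics on a second call, enforcing that
// only one handler owns the signals.
func SetupSignalHandler() (stopCh <-chan struct{}) {
	close(onlyOneSignalHandler) // panics when called twice

	stop := make(chan struct{})
	c := make(chan os.Signal, 2)
	signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-c
		close(stop) // first signal: trigger graceful shutdown
		<-c
		os.Exit(1) // second signal: exit directly
	}()

	return stop
}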
3.2 Controller Event-Handling Logic
The Controller's key members are the two listers, appslisters.DeploymentLister and listers.FooLister; both are initialized from the arguments that main passes into NewController.
// Controller is the controller implementation for Foo resources
type Controller struct {
	kubeclientset   kubernetes.Interface
	sampleclientset clientset.Interface

	deploymentsLister appslisters.DeploymentLister
	deploymentsSynced cache.InformerSynced
	foosLister        listers.FooLister
	foosSynced        cache.InformerSynced

	// workqueue is a rate limited work queue. This is used to queue work to be
	// processed instead of performing it as soon as a change happens. This
	// means we can ensure we only process a fixed amount of resources at a
	// time, and makes it easy to ensure we are never processing the same item
	// simultaneously in two different workers.
	workqueue workqueue.RateLimitingInterface
	// recorder is an event recorder for recording Event resources to the
	// Kubernetes API.
	recorder record.EventRecorder
} // controller.go

// inside NewController: the struct literal wiring the fields above
controller := &Controller{
	kubeclientset:     kubeclientset,
	sampleclientset:   sampleclientset,
	deploymentsLister: deploymentInformer.Lister(),
	deploymentsSynced: deploymentInformer.Informer().HasSynced,
	foosLister:        fooInformer.Lister(),
	foosSynced:        fooInformer.Informer().HasSynced,
	workqueue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Foos"),
	recorder:          recorder,
}

// main.go supplies the initialization arguments for this constructor
func NewController(
	kubeclientset kubernetes.Interface,
	sampleclientset clientset.Interface,
	deploymentInformer appsinformers.DeploymentInformer,
	fooInformer informers.FooInformer) *Controller
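The recorder field referenced above is created inside NewController before the struct literal; the excerpt omits that setup. A sketch based on the upstream sample-controller (controllerAgentName is assumed to be a package constant such as "sample-controller"):

// Register the Foo types so the recorder can reference them, then wire an
// event broadcaster that logs events and writes them to the API server.
utilruntime.Must(samplescheme.AddToScheme(scheme.Scheme))
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(klog.Infof)
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})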
// Set up an event handler for when Foo resources change
fooInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
	AddFunc: controller.enqueueFoo,
	UpdateFunc: func(old, new interface{}) {
		controller.enqueueFoo(new)
	},
})
// Set up an event handler for when Deployment resources change. This
// handler will look up the owner of the given Deployment, and if it is
// owned by a Foo resource will enqueue that Foo resource for
// processing. This way, we don't need to implement custom logic for
// handling Deployment resources. More info on this pattern:
deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
	AddFunc: controller.handleObject,
	UpdateFunc: func(old, new interface{}) {
		newDepl := new.(*appsv1.Deployment)
		oldDepl := old.(*appsv1.Deployment)
		if newDepl.ResourceVersion == oldDepl.ResourceVersion {
			// Periodic resync will send update events for all known Deployments.
			// Two different versions of the same Deployment will always have different RVs.
			return
		}
		controller.handleObject(new)
	},
	DeleteFunc: controller.handleObject,
})
// enqueueFoo takes a Foo resource and converts it into a namespace/name
// string which is then put onto the work queue. This method should *not* be
// passed resources of any type other than Foo.
func (c *Controller) enqueueFoo(obj interface{}) {
	var key string
	var err error
	if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
		utilruntime.HandleError(err)
		return
	}
	c.workqueue.AddRateLimited(key)
}

// handleObject will take any resource implementing metav1.Object and attempt
// to find the Foo resource that 'owns' it. It does this by looking at the
// object's metadata.ownerReferences field for an appropriate OwnerReference.
// It then enqueues that Foo resource to be processed. If the object does not
// have an appropriate OwnerReference, it will simply be skipped.
func (c *Controller) handleObject(obj interface{}) {
	var object metav1.Object
	var ok bool
	if object, ok = obj.(metav1.Object); !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("error decoding object, invalid type"))
			return
		}
		object, ok = tombstone.Obj.(metav1.Object)
		if !ok {
			utilruntime.HandleError(fmt.Errorf("error decoding object tombstone, invalid type"))
			return
		}
		klog.V(4).Infof("Recovered deleted object '%s' from tombstone", object.GetName())
	}
	klog.V(4).Infof("Processing object: %s", object.GetName())
	if ownerRef := metav1.GetControllerOf(object); ownerRef != nil {
		// If this object is not owned by a Foo, we should not do anything more
		// with it.
		if ownerRef.Kind != "Foo" {
			return
		}

		foo, err := c.foosLister.Foos(object.GetNamespace()).Get(ownerRef.Name)
		if err != nil {
			klog.V(4).Infof("ignoring orphaned object '%s' of foo '%s'", object.GetSelfLink(), ownerRef.Name)
			return
		}

		c.enqueueFoo(foo)
		return
	}
}
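The keys that enqueueFoo puts on the queue are plain "namespace/name" strings. A hypothetical, self-contained illustration (the Deployment here is made up purely for demonstration):

package main

import (
	"fmt"

	appsv1 "k8s.io/api/apps/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

func main() {
	// Any object carrying ObjectMeta produces a key the same way.
	d := &appsv1.Deployment{
		ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "example-foo"},
	}
	key, err := cache.MetaNamespaceKeyFunc(d)
	fmt.Println(key, err) // prints: default/example-foo <nil>
}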
// Run will set up the event handlers for types we are interested in, as well
// as syncing informer caches and starting workers. It will block until stopCh
// is closed, at which point it will shut down the workqueue and wait for
// workers to finish processing their current work items.
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
	defer utilruntime.HandleCrash()
	defer c.workqueue.ShutDown()

	// Start the informer factories to begin populating the informer caches
	klog.Info("Starting Foo controller")

	// Wait for the caches to be synced before starting workers
	klog.Info("Waiting for informer caches to sync")
	if ok := cache.WaitForCacheSync(stopCh, c.deploymentsSynced, c.foosSynced); !ok {
		return fmt.Errorf("failed to wait for caches to sync")
	}

	klog.Info("Starting workers")
	// Launch two workers to process Foo resources
	for i := 0; i < threadiness; i++ {
		go wait.Until(c.runWorker, time.Second, stopCh)
	}

	klog.Info("Started workers")
	<-stopCh
	klog.Info("Shutting down workers")

	return nil
}

// runWorker is a long-running function that will continually call the
// processNextWorkItem function in order to read and process a message on the
// workqueue.
func (c *Controller) runWorker() {
	for c.processNextWorkItem() {
	}
}

// processNextWorkItem will read a single work item off the workqueue and
// attempt to process it, by calling the syncHandler.
func (c *Controller) processNextWorkItem() bool {
	obj, shutdown := c.workqueue.Get()

	if shutdown {
		return false
	}

	// We wrap this block in a func so we can defer c.workqueue.Done.
	err := func(obj interface{}) error {
		// We call Done here so the workqueue knows we have finished
		// processing this item. We also must remember to call Forget if we
		// do not want this work item being re-queued. For example, we do
		// not call Forget if a transient error occurs, instead the item is
		// put back on the workqueue and attempted again after a back-off
		// period.
		defer c.workqueue.Done(obj)
		var key string
		var ok bool
		// We expect strings to come off the workqueue. These are of the
		// form namespace/name. We do this as the delayed nature of the
		// workqueue means the items in the informer cache may actually be
		// more up to date than when the item was initially put onto the
		// workqueue.
		if key, ok = obj.(string); !ok {
			// As the item in the workqueue is actually invalid, we call
			// Forget here else we'd go into a loop of attempting to
			// process a work item that is invalid.
			c.workqueue.Forget(obj)
			utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
			return nil
		}
		// Run the syncHandler, passing it the namespace/name string of the
		// Foo resource to be synced.
		if err := c.syncHandler(key); err != nil {
			// Put the item back on the workqueue to handle any transient errors.
			c.workqueue.AddRateLimited(key)
			return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
		}
		// Finally, if no error occurs we Forget this item so it does not
		// get queued again until another change happens.
		c.workqueue.Forget(obj)
		klog.Infof("Successfully synced '%s'", key)
		return nil
	}(obj)

	if err != nil {
		utilruntime.HandleError(err)
		return true
	}

	return true
}
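The back-off applied by AddRateLimited comes from the rate limiter chosen when the queue was created in NewController. As a rough sketch of what workqueue.DefaultControllerRateLimiter combines (the specific numbers mirror the client-go source as I understand it; treat them as assumptions):

package main

import (
	"time"

	"golang.org/x/time/rate"
	"k8s.io/client-go/util/workqueue"
)

func main() {
	// Per-item exponential back-off (5ms doubling up to 1000s) combined with
	// an overall 10 qps / burst 100 token bucket; for any given item the
	// longer of the two delays wins.
	rl := workqueue.NewMaxOfRateLimiter(
		workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
		&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
	)
	queue := workqueue.NewNamedRateLimitingQueue(rl, "Foos")
	defer queue.ShutDown()

	queue.AddRateLimited("default/example-foo") // re-queued with back-off
}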
// syncHandler compares the actual state with the desired, and attempts to
// converge the two. It then updates the Status block of the Foo resource
// with the current status of the resource.
func (c *Controller) syncHandler(key string) error {
	// Convert the namespace/name string into a distinct namespace and name
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
		return nil
	}

	// Get the Foo resource with this namespace/name
	foo, err := c.foosLister.Foos(namespace).Get(name)
	if err != nil {
		// The Foo resource may no longer exist, in which case we stop
		// processing.
		if errors.IsNotFound(err) {
			utilruntime.HandleError(fmt.Errorf("foo '%s' in work queue no longer exists", key))
			return nil
		}
		return err
	}

	deploymentName := foo.Spec.DeploymentName
	if deploymentName == "" {
		// We choose to absorb the error here as the worker would requeue the
		// resource otherwise. Instead, the next time the resource is updated
		// the resource will be queued again.
		utilruntime.HandleError(fmt.Errorf("%s: deployment name must be specified", key))
		return nil
	}

	// Get the deployment with the name specified in Foo.spec
	deployment, err := c.deploymentsLister.Deployments(foo.Namespace).Get(deploymentName)
	// If the resource doesn't exist, we'll create it
	if errors.IsNotFound(err) {
		deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Create(newDeployment(foo))
	}

	// If an error occurs during Get/Create, we'll requeue the item so we can
	// attempt processing again later. This could have been caused by a
	// temporary network failure, or any other transient reason.
	if err != nil {
		return err
	}

	// If the Deployment is not controlled by this Foo resource, we should log
	// a warning to the event recorder and return an error.
	if !metav1.IsControlledBy(deployment, foo) {
		msg := fmt.Sprintf(MessageResourceExists, deployment.Name)
		c.recorder.Event(foo, corev1.EventTypeWarning, ErrResourceExists, msg)
		return fmt.Errorf(msg)
	}

	// If the number of replicas on the Foo resource is specified, and the
	// number does not equal the current desired replicas on the Deployment, we
	// should update the Deployment resource.
	if foo.Spec.Replicas != nil && *foo.Spec.Replicas != *deployment.Spec.Replicas {
		klog.V(4).Infof("Foo %s replicas: %d, deployment replicas: %d", name, *foo.Spec.Replicas, *deployment.Spec.Replicas)
		deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Update(newDeployment(foo))
	}

	// If an error occurs during Update, we'll requeue the item so we can
	// attempt processing again later. This could have been caused by a
	// temporary network failure, or any other transient reason.
	if err != nil {
		return err
	}

	// Finally, we update the status block of the Foo resource to reflect the
	// current state of the world
	err = c.updateFooStatus(foo, deployment)
	if err != nil {
		return err
	}

	c.recorder.Event(foo, corev1.EventTypeNormal, SuccessSynced, MessageResourceSynced)
	return nil
}
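syncHandler relies on newDeployment(foo) and updateFooStatus, which the excerpt above does not show. A sketch modeled on the upstream sample-controller (the nginx image and label keys are its conventions, not requirements):

// newDeployment builds the Deployment a Foo wants. The OwnerReference set
// via metav1.NewControllerRef is what lets handleObject map a Deployment
// event back to its owning Foo.
func newDeployment(foo *samplev1alpha1.Foo) *appsv1.Deployment {
	labels := map[string]string{
		"app":        "nginx",
		"controller": foo.Name,
	}
	return &appsv1.Deployment{
		ObjectMeta: metav1.ObjectMeta{
			Name:      foo.Spec.DeploymentName,
			Namespace: foo.Namespace,
			OwnerReferences: []metav1.OwnerReference{
				*metav1.NewControllerRef(foo, samplev1alpha1.SchemeGroupVersion.WithKind("Foo")),
			},
		},
		Spec: appsv1.DeploymentSpec{
			Replicas: foo.Spec.Replicas,
			Selector: &metav1.LabelSelector{MatchLabels: labels},
			Template: corev1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{Labels: labels},
				Spec: corev1.PodSpec{
					Containers: []corev1.Container{
						{Name: "nginx", Image: "nginx:latest"},
					},
				},
			},
		},
	}
}

// updateFooStatus copies the Deployment's availability into Foo.Status. It
// works on a DeepCopy because lister objects are shared cache state and must
// never be mutated in place.
func (c *Controller) updateFooStatus(foo *samplev1alpha1.Foo, deployment *appsv1.Deployment) error {
	fooCopy := foo.DeepCopy()
	fooCopy.Status.AvailableReplicas = deployment.Status.AvailableReplicas
	_, err := c.sampleclientset.SamplecontrollerV1alpha1().Foos(foo.Namespace).Update(fooCopy)
	return err
}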
// SchemeGroupVersion is group version used to register these objects
var SchemeGroupVersion = schema.GroupVersion{Group: samplecontroller.GroupName, Version: "v1alpha1"}

// Kind takes an unqualified kind and returns back a Group qualified GroupKind
func Kind(kind string) schema.GroupKind {
	return SchemeGroupVersion.WithKind(kind).GroupKind()
}

// Resource takes an unqualified resource and returns a Group qualified GroupResource
func Resource(resource string) schema.GroupResource {
	return SchemeGroupVersion.WithResource(resource).GroupResource()
}

var (
	SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)
	AddToScheme   = SchemeBuilder.AddToScheme
)

// Adds the list of known types to Scheme.
func addKnownTypes(scheme *runtime.Scheme) error {
	scheme.AddKnownTypes(SchemeGroupVersion,
		&Foo{},
		&FooList{},
	)
	metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
	return nil
}
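With AddToScheme exported, any package can register the Foo types into a runtime.Scheme, which is exactly what the event-recorder setup does against client-go's global scheme. A hypothetical, self-contained illustration:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/runtime"

	samplev1alpha1 "k8s.io/sample-controller/pkg/apis/samplecontroller/v1alpha1"
)

func main() {
	// Register the Foo/FooList kinds into a fresh scheme so generic
	// machinery (codecs, the event recorder, etc.) can recognize them.
	scheme := runtime.NewScheme()
	if err := samplev1alpha1.AddToScheme(scheme); err != nil {
		panic(err)
	}

	// The scheme can now enumerate the kinds it knows about.
	for gvk := range scheme.AllKnownTypes() {
		fmt.Println(gvk)
	}
}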