日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > windows >内容正文

windows

Volcano 原理、源码分析(二)

發布時間:2024/1/3 windows 54 coder
生活随笔 收集整理的這篇文章主要介紹了 Volcano 原理、源码分析(二) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
  • 0. 總結前置
  • 1. 概述
  • 2. 尋找調度器中的 PodGroup
    • 2.1 從 PodGroup 到 JobInfo 的封裝
    • 2.2 從 Pod 到 TaskInfo 的封裝
  • 3. 控制器中 PodGroup 和 Pod 的創建邏輯
    • 3.1 從 main 開始尋找 SyncJob 的蹤跡
    • 3.2 SyncJob 過程如何創建 PodGroup 和 Pod
      • 3.2.1 創建 PodGroup
      • 3.2.2 創建 Pods
  • 4. 總結
  • 5. 最后

0. 總結前置

你也可以選擇直接跳到1. 概述開始閱讀。

今天我們先順著 Volcano Scheduler 部分的代碼找到了 PodGroup 的處理邏輯,看到了 Scheduler 拿到 PodGroup 后會組裝 JobInfo 對象;拿到 Pod 后會組裝 TaskInfo 對象(這里根據 Pod 的注解中指定的 PodGroup 名字來將 TaskInfo 和 JobInfo 關聯,也就是 Pod 和 PodGroup 的關聯。

接著我們又從 Volcano Controller(Job Controller)中找到了 PodGroupPod 的創建邏輯。在 Job 對象創建后,控制器會根據 Job 的信息創建一個唯一對應的 PodGroup,然后根據 Job 中的 Tasks 信息創建一系列的 pods,這些 pods 會帶上 PodGroup 名字(在注解里)

至此,我們知道了 Volcano 中調度器和控制器的職責分層,進一步也就能夠理解 Volcano 如何和 kubeflow 等其他框架結合完成復雜任務的批調度過程了。(上層框架創建 PodGroup 和 Pods,Volcano 根據 PodGroup 信息和 Pods 注解信息完成批調度過程。

1. 概述

話接上回,在《Volcano 原理、源碼分析(一)》中我們聊到了 Volcano Scheduler 部分的主要工作邏輯,發現 Volcano Scheduler 是圍繞了 Job 和 Job 里面的 Tasks 在調度。但是理論上 Volcano Scheduler 應該以 PodGroup 為基礎單元執行調度邏輯,這里的 gap 出現在哪里呢?

后來在文末我提到了 Scheduler 部分的 Job 就是 PodGroup wrapper,Task 就是 Pod wrapper,這樣邏輯上才說得通。今天我準備接著這條路,分析 PodGroup 的“調諧”邏輯

2. 尋找調度器中的 PodGroup

根據經驗(其實我也沒有啥 Volcano 的經驗,不過早幾年看過不少 K8s 里的控制器和調度器相關代碼,Volcano 既然在 K8s 體系內抽象控制器和調度器,那實現邏輯就應該類似),Volcano 的控制器部分應該負責根據 Job 資源配置來創建相應的 PodGroup 資源對象實例,然后調度器部分應該通過相應的 Informer 能力拿到 PodGroup 資源對象實例創建事件,接著進行相應的調度邏輯。(盲猜的,接著從代碼里順著這個思路看能不能找到相應邏輯。)

2.1 從 PodGroup 到 JobInfo 的封裝

pkg/scheduler 目錄內搜 PodGroup 相關代碼,一個 PodGroup 相關的 EventHandler 邏輯出現在我眼前。K8s 自定義控制器開發的主要工作之一就是定義 “Resource Event Handlers”

  • pkg/scheduler/cache/cache.go:669
sc.podGroupInformerV1beta1.Informer().AddEventHandler(
    cache.FilteringResourceEventHandler{
        FilterFunc: func(obj interface{}) bool {
            var pg *vcv1beta1.PodGroup
            switch v := obj.(type) {
            case *vcv1beta1.PodGroup:
                pg = v
            // ......

            return responsibleForPodGroup(pg, mySchedulerPodName, c)
        },
        Handler: cache.ResourceEventHandlerFuncs{
            AddFunc:    sc.AddPodGroupV1beta1,
            UpdateFunc: sc.UpdatePodGroupV1beta1,
            DeleteFunc: sc.DeletePodGroupV1beta1,
        },
    })

順著這里的代碼接著看 AddPodGroupV1beta1 方法的實現:

  • pkg/scheduler/cache/event_handlers.go:707
func (sc *SchedulerCache) AddPodGroupV1beta1(obj interface{}) {
	ss, ok := obj.(*schedulingv1beta1.PodGroup)
	// ......

	podgroup := scheduling.PodGroup{}
	if err := scheme.Scheme.Convert(ss, &podgroup, nil); err != nil {
		klog.Errorf("Failed to convert podgroup from %T to %T", ss, podgroup)
		return
	}

	pg := &schedulingapi.PodGroup{PodGroup: podgroup, Version: schedulingapi.PodGroupVersionV1Beta1}
	// ......

	if err := sc.setPodGroup(pg); err != nil {
		klog.Errorf("Failed to add PodGroup %s into cache: %v", ss.Name, err)
		return
	}
}

這里定義了一個 PodGroup 類型(不是 CRD 里的 PodGroup):

type PodGroup struct {
	scheduling.PodGroup
	Version string
}

然后將這個包含 CR PodGroup + Version 的 pg 傳給了 sc.setPodGroup(pg) 方法。sc 的類型是 *SchedulerCache

接著來看 setPodGroup 方法的實現:

  • pkg/scheduler/cache/event_handlers.go:668
func (sc *SchedulerCache) setPodGroup(ss *schedulingapi.PodGroup) error {
    // 這里的 job 是一個字符串,內容是 PodGroup 的 namespace/name
	job := getJobID(ss)
	if _, found := sc.Jobs[job]; !found {
        // Jobs 這個 map 的 key 就是 pg 的 namespace/name,value 是一個新的類型 *JobInfo
		sc.Jobs[job] = schedulingapi.NewJobInfo(job)
	}

    // 這里存的 *JobInfo 類型的 Job 中很多屬性都來自于 ss 這個 pg
	sc.Jobs[job].SetPodGroup(ss)

	// ......
	return nil
}

到這里,PodGroup 的信息就被轉存到了 JobInfo 中,JobInfo 也就對應一個 PodGroup 在 Scheduler 內的 wrapper。

2.2 從 Pod 到 TaskInfo 的封裝

PodGroup 這個 CR 中其實不包含 Pod 的具體信息。在 PodGroup 的 Spec 定義中,我們可以看到如下字段:

  • volcano.sh/apis@v1.8.0/pkg/apis/scheduling/types.go:166
type PodGroupSpec struct {
	MinMember int32
	MinTaskMember map[string]int32
	Queue string
	PriorityClassName string
	MinResources *v1.ResourceList
}

換言之,通過 PodGroup 資源對象實例其實找不到相應的 pods 信息,也就是說 spec 里沒有類似 Pods 這樣的字段。那么 Pod 和 PodGroup 如何關聯呢?既然沒有直接綁定,那么 Pod 中就一定會保存 PodGroup 的信息,比如通過在 Pod 的 annotation 中保存所屬 PodGroup 的 id 之類的方式,然后在處理 Pod 變更事件對應的控制邏輯中完成 Pod 和 PodGroup 的關聯過程。

好,下一步理所當然看一下當 Pod 相關的 events 產生的時候,Scheduler 里相應的 handlers 是什么。

  • pkg/scheduler/cache/cache.go:616
sc.podInformer.Informer().AddEventHandler(
    cache.FilteringResourceEventHandler{
        FilterFunc: func(obj interface{}) bool {
            switch v := obj.(type) {
            case *v1.Pod:
                if !responsibleForPod(v, schedulerNames, mySchedulerPodName, c) {
                    if len(v.Spec.NodeName) == 0 {
                        return false
                    }
                    if !responsibleForNode(v.Spec.NodeName, mySchedulerPodName, c) {
                        return false
                    }
                }
                return true
            // ......
        },
        Handler: cache.ResourceEventHandlerFuncs{
            AddFunc:    sc.AddPod,
            UpdateFunc: sc.UpdatePod,
            DeleteFunc: sc.DeletePod,
        },
    })

在 Filter 過程中,主要是根據 pod.Spec.SchedulerName 來判斷這個 Pod 是不是應該被當前調度器調度。順著繼續看 AddPod 方法的實現:

  • pkg/scheduler/cache/event_handlers.go:362
func (sc *SchedulerCache) AddPod(obj interface{}) {
	pod, ok := obj.(*v1.Pod)
	// ......

	err := sc.addPod(pod)
	// ......
}

再看 addPod 方法:

  • pkg/scheduler/cache/event_handlers.go:237
func (sc *SchedulerCache) addPod(pod *v1.Pod) error {
	pi, err := sc.NewTaskInfo(pod)
	// ......

	return sc.addTask(pi)
}

這里干了2件事:

  1. 先拿著 pod 信息創建一個 *TaskInfo 類型的 pi,這里的 TaskInfo 也就是一個 Pod 信息的 wrapper,和前面的 JobInfo 封裝 PodGroup 邏輯非常接近。
  2. SchedulerCache 的 addTask 方法將 TaskInfo 加到 JobInfo.Tasks 屬性中。這里的 Tasks 類型是 map[TaskID]*TaskInfo

另外還需要關注 NewTaskInfo 方法里的一個細節:

  • pkg/scheduler/cache/event_handlers.go:226
func (sc *SchedulerCache) NewTaskInfo(pod *v1.Pod) (*schedulingapi.TaskInfo, error) {
	taskInfo := schedulingapi.NewTaskInfo(pod)
	// ......
	return taskInfo, nil
}

這里調用了 schedulingapi.NewTaskInfo(pod),繼續往里:

  • pkg/scheduler/api/job_info.go:162
func NewTaskInfo(pod *v1.Pod) *TaskInfo {
	initResReq := GetPodResourceRequest(pod)
	resReq := initResReq
	bestEffort := initResReq.IsEmpty()
	preemptable := GetPodPreemptable(pod)
	revocableZone := GetPodRevocableZone(pod)
	topologyInfo := GetPodTopologyInfo(pod)

	jobID := getJobID(pod)

	ti := &TaskInfo{
		UID:           TaskID(pod.UID),
		Job:           jobID,
		Name:          pod.Name,
		Namespace:     pod.Namespace,
		Priority:      1,
		Pod:           pod,
		Resreq:        resReq,
		InitResreq:    initResReq,
		// ......
	}

	// ......

	return ti
}

注意到這里設置了一個 jobID 到 TaskInfo 里,而這個 getJobID() 方法的實現就很有意思了:

  • pkg/scheduler/api/job_info.go:141
func getJobID(pod *v1.Pod) JobID {
	if gn, found := pod.Annotations[v1beta1.KubeGroupNameAnnotationKey]; found && len(gn) != 0 {
		// Make sure Pod and PodGroup belong to the same namespace.
		jobID := fmt.Sprintf("%s/%s", pod.Namespace, gn)
		return JobID(jobID)
	}

	return ""
}

這里嘗試從 pod 中尋找 key 為 "scheduling.k8s.io/group-name" 的 annotation,假如這個 value 是 pg1,那么 JobID 就是 "pod-namespace/pg1",其實也就是 PodGroup 的標識。于是到這里,表示 Pod 的 TaskInfo 也就關聯上了表示 PodGroup 的 TaskInfo。

3. 控制器中 PodGroup 和 Pod 的創建邏輯

到這里,我們知道了 Scheduler 中是如何處理 PodGroup 和 Pod,將其轉換成 jobs 和 job.tasks 然后進一步執行調度邏輯的。那么控制器層面是如何創建 PodGroup 和 Pod 的呢?

在 Volcano 中有一個自定義資源 Job,按理說這個 Job 類型的資源對象被創建后,相應的 Controller 應該負責完成 Job 對應的 PodGroup 和 pods 的創建,并且打上合適的 annotation。同理其他框架,比如 kubeflow 里的 operator 也應該是類似的邏輯,負責創建 PodGroup 以及 pods(也可能只創建 pods),然后和 Volcano Scheduler 協作完成批調度流程。

總之,接著先看下 Volcano 中的控制器部分是如何倒騰 PodGroup 和 Pod 的。

3.1 從 main 開始尋找 SyncJob 的蹤跡

接著我們從主函數入手,尋找當 Job 被創建后,Controller 對應的 worker 邏輯。

  • cmd/controller-manager/main.go:45
func main() {
	// ......

	if err := app.Run(s); err != nil {
		fmt.Fprintf(os.Stderr, "%v\n", err)
		os.Exit(1)
	}
}

這里的主要邏輯就是調用 Run() 方法,在 Run() 方法內有一個 startControllers(config, opt) 調用,startControllers(config, opt) 方法內又有一個 c.Run(ctx.Done()) 調用,這幾層函數基本都是框架性質的邏輯,這里不贅述。

c.Run(ctx.Done()) 方法對應的 JobController 的啟動方法是:

  • pkg/controllers/job/job_controller.go:238
func (cc *jobcontroller) Run(stopCh <-chan struct{}) {
	cc.informerFactory.Start(stopCh)
	cc.vcInformerFactory.Start(stopCh)

	// ......

	go wait.Until(cc.handleCommands, 0, stopCh)
	var i uint32
	for i = 0; i < cc.workers; i++ {
		go func(num uint32) {
			wait.Until(
				func() {
					cc.worker(num)
				},
				time.Second,
				stopCh)
		}(i)
	}

	go cc.cache.Run(stopCh)

	// Re-sync error tasks.
	go wait.Until(cc.processResyncTask, 0, stopCh)

	klog.Infof("JobController is running ...... ")
}

從這里就能看到 worker() 方法的調用入口,繼續往后跟 worker() 方法肯定能找到一個 processNextReq() 方法:

func (cc *jobcontroller) worker(i uint32) {
	klog.Infof("worker %d start ...... ", i)

	for cc.processNextReq(i) {
	}
}

processNextReq() 方法中就開始有“干貨邏輯”了:

  • pkg/controllers/job/job_controller.go:310
func (cc *jobcontroller) processNextReq(count uint32) bool {
	queue := cc.queueList[count]
	obj, shutdown := queue.Get()
	// ......

	jobInfo, err := cc.cache.Get(key)
	// ......

	st := state.NewState(jobInfo)
	// ......

	if err := st.Execute(action); err != nil {
		// ......
	}

	queue.Forget(req)

	return true
}

這里的代碼主要邏輯就上面這幾行,我們來關注 st.Execute(action) 的邏輯。

首先 state.NewState(jobInfo) 調用返回了一個 st,這個 st 是什么呢?

  • pkg/controllers/job/state/factory.go:62
func NewState(jobInfo *apis.JobInfo) State {
	job := jobInfo.Job
	switch job.Status.State.Phase {
	case vcbatch.Pending:
		return &pendingState{job: jobInfo}
	case vcbatch.Running:
		return &runningState{job: jobInfo}
	case vcbatch.Restarting:
		return &restartingState{job: jobInfo}
	case vcbatch.Terminated, vcbatch.Completed, vcbatch.Failed:
		return &finishedState{job: jobInfo}
	case vcbatch.Terminating:
		return &terminatingState{job: jobInfo}
	case vcbatch.Aborting:
		return &abortingState{job: jobInfo}
	case vcbatch.Aborted:
		return &abortedState{job: jobInfo}
	case vcbatch.Completing:
		return &completingState{job: jobInfo}
	}

	// It's pending by default.
	return &pendingState{job: jobInfo}
}

盲猜 State 這時候對應一個 *pendingState 類型。所以我們接著找 *pendingState 對象的 Execute() 方法實現:

  • pkg/controllers/job/state/pending.go:29
func (ps *pendingState) Execute(action v1alpha1.Action) error {
	switch action {
	// ......
	default:
		return SyncJob(ps.job, func(status *vcbatch.JobStatus) bool {
			if ps.job.Job.Spec.MinAvailable <= status.Running+status.Succeeded+status.Failed {
				status.State.Phase = vcbatch.Running
				return true
			}
			return false
		})
	}
}

SyncJob() 函數在這里出現了。

3.2 SyncJob 過程如何創建 PodGroup 和 Pod

繼續來看 SyncJob 的具體實現。上一節找到的 SyncJob() 函數中調用到了 *jobcontroller.syncJob() 方法,sync job 的具體邏輯就在這個 syncJob() 方法中實現。

這里主要有2個過程:

  1. initiateJob 方法創建 PodGroup;
  2. 創建 pods;
  • pkg/controllers/job/job_controller_actions.go:224
func (cc *jobcontroller) syncJob(jobInfo *apis.JobInfo, updateStatus state.UpdateStatusFn) error {
    // ......
}

這個方法有點長,哎,一言難盡。我想提個 pr 給它拆分一下…… Anyway,這個方法里主要關注2個過程,我們直接來看吧。

3.2.1 創建 PodGroup

創建 PodGroup 的邏輯在 initiateJob() 方法中:

  • pkg/controllers/job/job_controller_actions.go:166
func (cc *jobcontroller) initiateJob(job *batch.Job) (*batch.Job, error) {
	// ......

	if err := cc.createOrUpdatePodGroup(newJob); err != nil {
		cc.recorder.Event(job, v1.EventTypeWarning, string(batch.PodGroupError),
			fmt.Sprintf("Failed to create PodGroup, err: %v", err))
		return nil, err
	}

	return newJob, nil
}

這里拿著 job 信息調用了一個 createOrUpdatePodGroup() 方法來完成和 Job 對應的 PodGroup 的創建。這個方法里 pg 實例化的主要邏輯是:

pg := &scheduling.PodGroup{
    ObjectMeta: metav1.ObjectMeta{
        Namespace: job.Namespace,
        // 這個 pgName 內容是 job.Name + "-" + string(job.UID)
        Name:        pgName,
        Annotations: job.Annotations,
        Labels:      job.Labels,
        OwnerReferences: []metav1.OwnerReference{
            *metav1.NewControllerRef(job, helpers.JobKind),
        },
    },
    Spec: scheduling.PodGroupSpec{
        MinMember:         job.Spec.MinAvailable,
        MinTaskMember:     minTaskMember,
        Queue:             job.Spec.Queue,
        MinResources:      cc.calcPGMinResources(job),
        PriorityClassName: job.Spec.PriorityClassName,
    },
}

換言之 PodGroup 和 Job 是一一對應的關系。

3.2.2 創建 Pods

繼續來看創建 pods 的過程:

  • pkg/controllers/job/job_controller_actions.go:335
for _, ts := range job.Spec.Tasks {
    ts.Template.Name = ts.Name
    tc := ts.Template.DeepCopy()
    name := ts.Template.Name

    pods, found := jobInfo.Pods[name]
    if !found {
        pods = map[string]*v1.Pod{}
    }

    var podToCreateEachTask []*v1.Pod
    // 每個 Task 對應一組 pods,所以這里有一個循環
    for i := 0; i < int(ts.Replicas); i++ {
        podName := fmt.Sprintf(jobhelpers.PodNameFmt, job.Name, name, i)
        if pod, found := pods[podName]; !found {
            // 這個 createJobPod 只是組裝 Pod 資源對象,類型是 *v1.Pod
            newPod := createJobPod(job, tc, ts.TopologyPolicy, i, jobForwarding)
            if err := cc.pluginOnPodCreate(job, newPod); err != nil {
                return err
            }
            // 加到隊列中
            podToCreateEachTask = append(podToCreateEachTask, newPod)
            waitCreationGroup.Add(1)
        } else {
            // ......
        }
    }
    podToCreate[ts.Name] = podToCreateEachTask
    // ......
}

這一輪循環負責解析 Job 中的所有 Tasks,然后給每個 Task 創建對應的 pods,加入到 podToCreateEachTask 這個切片中,進而得到 podToCreate (類型是 map[string][]*v1.Pod)這個 map,map 的 key 是 Task 的 Name,value 是每個 Task 對應的需要創建的 pods 列表。

createJobPod() 方法中有這樣幾行和 annotation 相關的代碼:

  • pkg/controllers/job/job_controller_util.go:100
index := strconv.Itoa(ix)
pod.Annotations[batch.TaskIndex] = index
pod.Annotations[batch.TaskSpecKey] = tsKey
pgName := job.Name + "-" + string(job.UID)
pod.Annotations[schedulingv2.KubeGroupNameAnnotationKey] = pgName
pod.Annotations[batch.JobNameKey] = job.Name
pod.Annotations[batch.QueueNameKey] = job.Spec.Queue
pod.Annotations[batch.JobVersion] = fmt.Sprintf("%d", job.Status.Version)
pod.Annotations[batch.PodTemplateKey] = fmt.Sprintf("%s-%s", job.Name, template.Name)

可以看到 Pod 的 annotation 里有一個 KubeGroupNameAnnotationKey = pgName,也就是 scheduling.k8s.io/group-name=pg-name,和前面我們在調度器里找到 annotation 匹配邏輯就對應上了。

然后來到第二個循環:

  • pkg/controllers/job/job_controller_actions.go:373
// 遍歷剛才組裝的 podToCreate map
for taskName, podToCreateEachTask := range podToCreate {
    if len(podToCreateEachTask) == 0 {
        continue
    }
    go func(taskName string, podToCreateEachTask []*v1.Pod) {
        // ......

        for _, pod := range podToCreateEachTask {
            go func(pod *v1.Pod) {
                defer waitCreationGroup.Done()
                // 創建 Pods
                newPod, err := cc.kubeClient.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
                // ......
            }(pod)
        }
    }(taskName, podToCreateEachTask)
}

4. 總結

今天我們先順著 Volcano Scheduler 部分的代碼找到了 PodGroup 的處理邏輯,看到了 Scheduler 拿到 PodGroup 后會組裝 JobInfo 對象;拿到 Pod 后會組裝 TaskInfo 對象(這里根據 Pod 的注解中指定的 PodGroup 名字來將 TaskInfo 和 JobInfo 關聯,也就是 Pod 和 PodGroup 的關聯。

接著我們又從 Volcano Controller(Job Controller)中找到了 PodGroupPod 的創建邏輯。在 Job 對象創建后,控制器會根據 Job 的信息創建一個唯一對應的 PodGroup,然后根據 Job 中的 Tasks 信息創建一系列的 pods,這些 pods 會帶上 PodGroup 名字(在注解里)

至此,我們知道了 Volcano 中調度器和控制器的職責分層,進一步也就能夠理解 Volcano 如何和 kubeflow 等其他框架結合完成復雜任務的批調度過程了。(上層框架創建 PodGroup 和 Pods,Volcano 根據 PodGroup 信息和 Pods 注解信息完成批調度過程。

5. 最后

下一步?我也沒想好。

看了幾天 Volcano 的源碼,整體感覺還是比較酣暢淋漓。一開始被調度器里的 Job 和 Task 概念帶坑里,感覺代碼很混亂;但是理解了 wrapper 的用意,知道了“調度”領域的 job 和 task 有不一樣的含義后,今天再刷就很輕松了。

Volcano 源碼整體還是 K8s 的“控制器+調度器”的邏輯,然后加上“任務調度領域”內的各種算法組成,代碼質量總的來說還可以,就是部分函數過長,循環嵌套過多,加上注釋和文檔的缺失,對于新人并不友好。

下一步我嘗試參與下 Volcano 社區,看能不能在“代碼可讀性”方向出一份力吧。

總結

以上是生活随笔為你收集整理的Volcano 原理、源码分析(二)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。