Schedulerx2.0分布式计算原理最佳实践
1. 前言
Schedulerx2.0的客戶端提供分布式執(zhí)行、多種任務類型、統(tǒng)一日志等框架,用戶只要依賴schedulerx-worker這個jar包,通過schedulerx2.0提供的編程模型,簡單幾行代碼就能實現(xiàn)一套高可靠可運維的分布式執(zhí)行引擎。
這篇文章重點是介紹基于schedulerx2.0的分布式執(zhí)行引擎原理和最佳實踐,相信看完這篇文章,大家都能寫出高效率的分布式作業(yè),說不定速度能提升好幾倍:)
2. 可擴展的執(zhí)行引擎
Worker總體架構參考Yarn的架構,分為TaskMaster, Container, Processor三層:
- TaskMaster:類似于yarn的AppMaster,支持可擴展的分布式執(zhí)行框架,進行整個jobInstance的生命周期管理、container的資源管理,同時還有failover等能力。默認實現(xiàn)StandaloneTaskMaster(單機執(zhí)行),BroadcastTaskMaster(廣播執(zhí)行),MapTaskMaster(并行計算、內存網格、網格計算),MapReduceTaskMaster(并行計算、內存網格、網格計算)。
- Container:執(zhí)行業(yè)務邏輯的容器框架,支持線程/進程/docker/actor等。
- Processor:業(yè)務邏輯框架,不同的processor表示不同的任務類型。
以MapTaskMaster為例,大概的原理如下圖所示:
3. 分布式編程模型之Map模型
Schedulerx2.0提供了多種分布式編程模型,這篇文章主要介紹Map模型(之后的文章還會介紹MapReduce模型,適用更多的業(yè)務場景),簡單幾行代碼就可以將海量數(shù)據(jù)分布式到多臺機器上進行分布式跑批,非常簡單易用。
針對不同的跑批場景,map模型作業(yè)還提供了并行計算、內存網格、網格計算三種執(zhí)行方式:
- 并行計算:子任務300以下,有子任務列表。
- 內存網格:子任務5W以下,無子任務列表,速度快。
- 網格計算:子任務100W以下,無子任務列表。
4. 并行計算原理
因為并行任務具有子任務列表:
如上圖,子任務列表可以看到每個子任務的狀態(tài)、機器,還有重跑、查看日志等操作。
因為并行計算要做到子任務級別的可視化,并且worker掛了、重啟還能支持手動重跑,就需要把task持久化到server端:
如上圖所示:
5. 網格計算原理
網格計算要支持百萬級別的task,如果所有任務都往server回寫,server肯定扛不住,所以網格計算的存儲實際上是分布式在用戶自己的機器上的:
如上圖所示:
6. 最佳實踐
6.1 需求
舉個例子:
6.2 反面案例
我們先看下如下代碼是否有問題?
public class ScanSingleTableProcessor extends MapJobProcessor {private static int pageSize = 1000;@Overridepublic ProcessResult process(JobContext context) {String taskName = context.getTaskName();Object task = context.getTask();if (WorkerConstants.MAP_TASK_ROOT_NAME.equals(taskName)) {int recordCount = queryRecordCount();int pageAmount = recordCount / pageSize;//計算分頁數(shù)量for(int i = 0 ; i < pageAmount ; i ++) {List<Record> recordList = queryRecord(i);//根據(jù)分頁查詢一頁數(shù)據(jù)map(recordList, "record記錄");//把子任務分發(fā)出去并行處理}return new ProcessResult(true);//true表示執(zhí)行成功,false表示失敗} else if ("record記錄".equals(taskName)) {//TODOreturn new ProcessResult(true);}return new ProcessResult(false);} }如上面的代碼所示,在root任務中,會把數(shù)據(jù)庫所有記錄讀取出來,每一行就是一個Record,然后分發(fā)出去,分布式到不同的worker上去執(zhí)行。邏輯是沒有問題的,但是實際上性能非常的差。結合網格計算原理,我們把上面的代碼繪制成下面這幅圖:
如上圖所示,root任務一開始會全量的讀取A表的數(shù)據(jù),然后會全量的存到h2中,pull線程還會全量的從h2讀取一次所有的task,還會分發(fā)給所有客戶端。所以實際上對A表中的數(shù)據(jù):
- 全量讀2次
- 全量寫一次
- 全量傳輸一次
這個效率是非常低的。
6.3 正面案例
下面給出正面案例的代碼:
public class ScanSingleTableJobProcessor extends MapJobProcessor {private static final int pageSize = 100;static class PageTask {private int startId;private int endId;public PageTask(int startId, int endId) {this.startId = startId;this.endId = endId;}public int getStartId() {return startId;}public int getEndId() {return endId;}}@Overridepublic ProcessResult process(JobContext context) {String taskName = context.getTaskName();Object task = context.getTask();if (taskName.equals(WorkerConstants.MAP_TASK_ROOT_NAME)) {System.out.println("start root task");Pair<Integer, Integer> idPair = queryMinAndMaxId();int minId = idPair.getFirst();int maxId = idPair.getSecond();List<PageTask> taskList = Lists.newArrayList();int step = (int) ((maxId - minId) / pageSize); //計算分頁數(shù)量for (int i = minId; i < maxId; i+=step) {taskList.add(new PageTask(i, (i+step > maxId ? maxId : i+step)));}return map(taskList, "Level1Dispatch");} else if (taskName.equals("Level1Dispatch")) {PageTask record = (PageTask)task;long startId = record.getStartId();long endId = record.getEndId();//TODOreturn new ProcessResult(true);}return new ProcessResult(true);}@Overridepublic void postProcess(JobContext context) {//TODOSystem.out.println("all tasks is finished.");}private Pair<Integer, Integer> queryMinAndMaxId() {//TODO select min(id),max(id) from xxxreturn null;} }如上面的代碼所示,
- 每個task不是整行記錄的record,而是PageTask,里面就2個字段,startId和endId。
- root任務,沒有全量的讀取A表,而是讀一下整張表的minId和maxId,然后構造PageTask進行分頁。比如task1表示PageTask[1,1000],task2表示PageTask[1001,2000]。每個task處理A表不同的數(shù)據(jù)。
- 在下一級task中,如果拿到的是PageTask,再根據(jù)id區(qū)間去A表處理數(shù)據(jù)。
根據(jù)上面的代碼和網格計算原理,得出下面這幅圖:
如上圖所示,
- A表只需要全量讀取一次。
- 子任務數(shù)量比反面案例少了上千、上萬倍。
- 子任務的body非常小,如果recod中有大字段,也少了上千、上萬倍。
綜上,對A表訪問次數(shù)少了好幾倍,對h2存儲壓力少了上萬倍,不但執(zhí)行速度可以快很多,還保證不會把自己本地的h2數(shù)據(jù)庫搞掛。
原文鏈接
本文為云棲社區(qū)原創(chuàng)內容,未經允許不得轉載。
總結
以上是生活随笔為你收集整理的Schedulerx2.0分布式计算原理最佳实践的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 四年从P7到P9,这个阿里小二的秘诀是给
- 下一篇: OpenKruise - 云原生应用自动