Schedulerx2.0分布式计算原理最佳实践
1. 前言
Schedulerx2.0的客戶端提供分布式執(zhí)行、多種任務(wù)類型、統(tǒng)一日志等框架,用戶只要依賴schedulerx-worker這個(gè)jar包,通過schedulerx2.0提供的編程模型,簡單幾行代碼就能實(shí)現(xiàn)一套高可靠可運(yùn)維的分布式執(zhí)行引擎。
這篇文章重點(diǎn)是介紹基于schedulerx2.0的分布式執(zhí)行引擎原理和最佳實(shí)踐,相信看完這篇文章,大家都能寫出高效率的分布式作業(yè),說不定速度能提升好幾倍:)
2. 可擴(kuò)展的執(zhí)行引擎
Worker總體架構(gòu)參考Yarn的架構(gòu),分為TaskMaster, Container, Processor三層:
- TaskMaster:類似于yarn的AppMaster,支持可擴(kuò)展的分布式執(zhí)行框架,進(jìn)行整個(gè)jobInstance的生命周期管理、container的資源管理,同時(shí)還有failover等能力。默認(rèn)實(shí)現(xiàn)StandaloneTaskMaster(單機(jī)執(zhí)行),BroadcastTaskMaster(廣播執(zhí)行),MapTaskMaster(并行計(jì)算、內(nèi)存網(wǎng)格、網(wǎng)格計(jì)算),MapReduceTaskMaster(并行計(jì)算、內(nèi)存網(wǎng)格、網(wǎng)格計(jì)算)。
- Container:執(zhí)行業(yè)務(wù)邏輯的容器框架,支持線程/進(jìn)程/docker/actor等。
- Processor:業(yè)務(wù)邏輯框架,不同的processor表示不同的任務(wù)類型。
以MapTaskMaster為例,大概的原理如下圖所示:
3. 分布式編程模型之Map模型
Schedulerx2.0提供了多種分布式編程模型,這篇文章主要介紹Map模型(之后的文章還會(huì)介紹MapReduce模型,適用更多的業(yè)務(wù)場(chǎng)景),簡單幾行代碼就可以將海量數(shù)據(jù)分布式到多臺(tái)機(jī)器上進(jìn)行分布式跑批,非常簡單易用。
針對(duì)不同的跑批場(chǎng)景,map模型作業(yè)還提供了并行計(jì)算、內(nèi)存網(wǎng)格、網(wǎng)格計(jì)算三種執(zhí)行方式:
- 并行計(jì)算:子任務(wù)300以下,有子任務(wù)列表。
- 內(nèi)存網(wǎng)格:子任務(wù)5W以下,無子任務(wù)列表,速度快。
- 網(wǎng)格計(jì)算:子任務(wù)100W以下,無子任務(wù)列表。
4. 并行計(jì)算原理
因?yàn)椴⑿腥蝿?wù)具有子任務(wù)列表:
如上圖,子任務(wù)列表可以看到每個(gè)子任務(wù)的狀態(tài)、機(jī)器,還有重跑、查看日志等操作。
因?yàn)椴⑿杏?jì)算要做到子任務(wù)級(jí)別的可視化,并且worker掛了、重啟還能支持手動(dòng)重跑,就需要把task持久化到server端:
如上圖所示:
5. 網(wǎng)格計(jì)算原理
網(wǎng)格計(jì)算要支持百萬級(jí)別的task,如果所有任務(wù)都往server回寫,server肯定扛不住,所以網(wǎng)格計(jì)算的存儲(chǔ)實(shí)際上是分布式在用戶自己的機(jī)器上的:
如上圖所示:
6. 最佳實(shí)踐
6.1 需求
舉個(gè)例子:
6.2 反面案例
我們先看下如下代碼是否有問題?
public class ScanSingleTableProcessor extends MapJobProcessor {private static int pageSize = 1000;@Overridepublic ProcessResult process(JobContext context) {String taskName = context.getTaskName();Object task = context.getTask();if (WorkerConstants.MAP_TASK_ROOT_NAME.equals(taskName)) {int recordCount = queryRecordCount();int pageAmount = recordCount / pageSize;//計(jì)算分頁數(shù)量for(int i = 0 ; i < pageAmount ; i ++) {List<Record> recordList = queryRecord(i);//根據(jù)分頁查詢一頁數(shù)據(jù)map(recordList, "record記錄");//把子任務(wù)分發(fā)出去并行處理}return new ProcessResult(true);//true表示執(zhí)行成功,false表示失敗} else if ("record記錄".equals(taskName)) {//TODOreturn new ProcessResult(true);}return new ProcessResult(false);} }如上面的代碼所示,在root任務(wù)中,會(huì)把數(shù)據(jù)庫所有記錄讀取出來,每一行就是一個(gè)Record,然后分發(fā)出去,分布式到不同的worker上去執(zhí)行。邏輯是沒有問題的,但是實(shí)際上性能非常的差。結(jié)合網(wǎng)格計(jì)算原理,我們把上面的代碼繪制成下面這幅圖:
如上圖所示,root任務(wù)一開始會(huì)全量的讀取A表的數(shù)據(jù),然后會(huì)全量的存到h2中,pull線程還會(huì)全量的從h2讀取一次所有的task,還會(huì)分發(fā)給所有客戶端。所以實(shí)際上對(duì)A表中的數(shù)據(jù):
- 全量讀2次
- 全量寫一次
- 全量傳輸一次
這個(gè)效率是非常低的。
6.3 正面案例
下面給出正面案例的代碼:
public class ScanSingleTableJobProcessor extends MapJobProcessor {private static final int pageSize = 100;static class PageTask {private int startId;private int endId;public PageTask(int startId, int endId) {this.startId = startId;this.endId = endId;}public int getStartId() {return startId;}public int getEndId() {return endId;}}@Overridepublic ProcessResult process(JobContext context) {String taskName = context.getTaskName();Object task = context.getTask();if (taskName.equals(WorkerConstants.MAP_TASK_ROOT_NAME)) {System.out.println("start root task");Pair<Integer, Integer> idPair = queryMinAndMaxId();int minId = idPair.getFirst();int maxId = idPair.getSecond();List<PageTask> taskList = Lists.newArrayList();int step = (int) ((maxId - minId) / pageSize); //計(jì)算分頁數(shù)量for (int i = minId; i < maxId; i+=step) {taskList.add(new PageTask(i, (i+step > maxId ? maxId : i+step)));}return map(taskList, "Level1Dispatch");} else if (taskName.equals("Level1Dispatch")) {PageTask record = (PageTask)task;long startId = record.getStartId();long endId = record.getEndId();//TODOreturn new ProcessResult(true);}return new ProcessResult(true);}@Overridepublic void postProcess(JobContext context) {//TODOSystem.out.println("all tasks is finished.");}private Pair<Integer, Integer> queryMinAndMaxId() {//TODO select min(id),max(id) from xxxreturn null;} }如上面的代碼所示,
- 每個(gè)task不是整行記錄的record,而是PageTask,里面就2個(gè)字段,startId和endId。
- root任務(wù),沒有全量的讀取A表,而是讀一下整張表的minId和maxId,然后構(gòu)造PageTask進(jìn)行分頁。比如task1表示PageTask[1,1000],task2表示PageTask[1001,2000]。每個(gè)task處理A表不同的數(shù)據(jù)。
- 在下一級(jí)task中,如果拿到的是PageTask,再根據(jù)id區(qū)間去A表處理數(shù)據(jù)。
根據(jù)上面的代碼和網(wǎng)格計(jì)算原理,得出下面這幅圖:
如上圖所示,
- A表只需要全量讀取一次。
- 子任務(wù)數(shù)量比反面案例少了上千、上萬倍。
- 子任務(wù)的body非常小,如果recod中有大字段,也少了上千、上萬倍。
綜上,對(duì)A表訪問次數(shù)少了好幾倍,對(duì)h2存儲(chǔ)壓力少了上萬倍,不但執(zhí)行速度可以快很多,還保證不會(huì)把自己本地的h2數(shù)據(jù)庫搞掛。
原文鏈接
本文為云棲社區(qū)原創(chuàng)內(nèi)容,未經(jīng)允許不得轉(zhuǎn)載。
總結(jié)
以上是生活随笔為你收集整理的Schedulerx2.0分布式计算原理最佳实践的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 四年从P7到P9,这个阿里小二的秘诀是给
- 下一篇: OpenKruise - 云原生应用自动