日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Schedulerx2.0分布式计算原理最佳实践

發(fā)布時(shí)間:2024/8/23 编程问答 69 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Schedulerx2.0分布式计算原理最佳实践 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

1. 前言

Schedulerx2.0的客戶端提供分布式執(zhí)行、多種任務(wù)類型、統(tǒng)一日志等框架,用戶只要依賴schedulerx-worker這個(gè)jar包,通過schedulerx2.0提供的編程模型,簡單幾行代碼就能實(shí)現(xiàn)一套高可靠可運(yùn)維的分布式執(zhí)行引擎。

這篇文章重點(diǎn)是介紹基于schedulerx2.0的分布式執(zhí)行引擎原理和最佳實(shí)踐,相信看完這篇文章,大家都能寫出高效率的分布式作業(yè),說不定速度能提升好幾倍:)

2. 可擴(kuò)展的執(zhí)行引擎

Worker總體架構(gòu)參考Yarn的架構(gòu),分為TaskMaster, Container, Processor三層:

  • TaskMaster:類似于yarn的AppMaster,支持可擴(kuò)展的分布式執(zhí)行框架,進(jìn)行整個(gè)jobInstance的生命周期管理、container的資源管理,同時(shí)還有failover等能力。默認(rèn)實(shí)現(xiàn)StandaloneTaskMaster(單機(jī)執(zhí)行),BroadcastTaskMaster(廣播執(zhí)行),MapTaskMaster(并行計(jì)算、內(nèi)存網(wǎng)格、網(wǎng)格計(jì)算),MapReduceTaskMaster(并行計(jì)算、內(nèi)存網(wǎng)格、網(wǎng)格計(jì)算)。
  • Container:執(zhí)行業(yè)務(wù)邏輯的容器框架,支持線程/進(jìn)程/docker/actor等。
  • Processor:業(yè)務(wù)邏輯框架,不同的processor表示不同的任務(wù)類型。

以MapTaskMaster為例,大概的原理如下圖所示:

3. 分布式編程模型之Map模型

Schedulerx2.0提供了多種分布式編程模型,這篇文章主要介紹Map模型(之后的文章還會(huì)介紹MapReduce模型,適用更多的業(yè)務(wù)場(chǎng)景),簡單幾行代碼就可以將海量數(shù)據(jù)分布式到多臺(tái)機(jī)器上進(jìn)行分布式跑批,非常簡單易用。

針對(duì)不同的跑批場(chǎng)景,map模型作業(yè)還提供了并行計(jì)算、內(nèi)存網(wǎng)格、網(wǎng)格計(jì)算三種執(zhí)行方式:

  • 并行計(jì)算:子任務(wù)300以下,有子任務(wù)列表。
  • 內(nèi)存網(wǎng)格:子任務(wù)5W以下,無子任務(wù)列表,速度快。
  • 網(wǎng)格計(jì)算:子任務(wù)100W以下,無子任務(wù)列表。

4. 并行計(jì)算原理

因?yàn)椴⑿腥蝿?wù)具有子任務(wù)列表:


如上圖,子任務(wù)列表可以看到每個(gè)子任務(wù)的狀態(tài)、機(jī)器,還有重跑、查看日志等操作。

因?yàn)椴⑿杏?jì)算要做到子任務(wù)級(jí)別的可視化,并且worker掛了、重啟還能支持手動(dòng)重跑,就需要把task持久化到server端:


如上圖所示:

  • server觸發(fā)jobInstance到某個(gè)worker,選中為master。
  • MapTaskMaster選擇某個(gè)worker執(zhí)行root任務(wù),當(dāng)執(zhí)行map方法時(shí),會(huì)回調(diào)MapTaskMaster。
  • MapTaskMaster收到map方法,會(huì)把task持久化到server端。
  • 同時(shí),MapTaskMaster還有個(gè)pull線程,不停拉取INIT狀態(tài)的task,并派發(fā)給其他worker執(zhí)行。
  • 5. 網(wǎng)格計(jì)算原理

    網(wǎng)格計(jì)算要支持百萬級(jí)別的task,如果所有任務(wù)都往server回寫,server肯定扛不住,所以網(wǎng)格計(jì)算的存儲(chǔ)實(shí)際上是分布式在用戶自己的機(jī)器上的:

    如上圖所示:

  • server觸發(fā)jobInstance到某個(gè)worker,選中為master。
  • MapTaskMaster選擇某個(gè)worker執(zhí)行root任務(wù),當(dāng)執(zhí)行map方法時(shí),會(huì)回調(diào)MapTaskMaster。
  • MapTaskMaster收到map方法,會(huì)把task持久化到本地h2數(shù)據(jù)庫。
  • 同時(shí),MapTaskMaster還有個(gè)pull線程,不停拉取INIT狀態(tài)的task,并派發(fā)給其他worker執(zhí)行。
  • 6. 最佳實(shí)踐

    6.1 需求

    舉個(gè)例子:

  • 讀取A表中status=0的數(shù)據(jù)。
  • 處理這些數(shù)據(jù),插入B表。
  • 把A表中處理過的數(shù)據(jù)的修改status=1。
  • 數(shù)據(jù)量有4億+,希望縮短時(shí)間。
  • 6.2 反面案例

    我們先看下如下代碼是否有問題?

    public class ScanSingleTableProcessor extends MapJobProcessor {private static int pageSize = 1000;@Overridepublic ProcessResult process(JobContext context) {String taskName = context.getTaskName();Object task = context.getTask();if (WorkerConstants.MAP_TASK_ROOT_NAME.equals(taskName)) {int recordCount = queryRecordCount();int pageAmount = recordCount / pageSize;//計(jì)算分頁數(shù)量for(int i = 0 ; i < pageAmount ; i ++) {List<Record> recordList = queryRecord(i);//根據(jù)分頁查詢一頁數(shù)據(jù)map(recordList, "record記錄");//把子任務(wù)分發(fā)出去并行處理}return new ProcessResult(true);//true表示執(zhí)行成功,false表示失敗} else if ("record記錄".equals(taskName)) {//TODOreturn new ProcessResult(true);}return new ProcessResult(false);} }

    如上面的代碼所示,在root任務(wù)中,會(huì)把數(shù)據(jù)庫所有記錄讀取出來,每一行就是一個(gè)Record,然后分發(fā)出去,分布式到不同的worker上去執(zhí)行。邏輯是沒有問題的,但是實(shí)際上性能非常的差。結(jié)合網(wǎng)格計(jì)算原理,我們把上面的代碼繪制成下面這幅圖:

    如上圖所示,root任務(wù)一開始會(huì)全量的讀取A表的數(shù)據(jù),然后會(huì)全量的存到h2中,pull線程還會(huì)全量的從h2讀取一次所有的task,還會(huì)分發(fā)給所有客戶端。所以實(shí)際上對(duì)A表中的數(shù)據(jù):

    • 全量讀2次
    • 全量寫一次
    • 全量傳輸一次

    這個(gè)效率是非常低的。

    6.3 正面案例

    下面給出正面案例的代碼:

    public class ScanSingleTableJobProcessor extends MapJobProcessor {private static final int pageSize = 100;static class PageTask {private int startId;private int endId;public PageTask(int startId, int endId) {this.startId = startId;this.endId = endId;}public int getStartId() {return startId;}public int getEndId() {return endId;}}@Overridepublic ProcessResult process(JobContext context) {String taskName = context.getTaskName();Object task = context.getTask();if (taskName.equals(WorkerConstants.MAP_TASK_ROOT_NAME)) {System.out.println("start root task");Pair<Integer, Integer> idPair = queryMinAndMaxId();int minId = idPair.getFirst();int maxId = idPair.getSecond();List<PageTask> taskList = Lists.newArrayList();int step = (int) ((maxId - minId) / pageSize); //計(jì)算分頁數(shù)量for (int i = minId; i < maxId; i+=step) {taskList.add(new PageTask(i, (i+step > maxId ? maxId : i+step)));}return map(taskList, "Level1Dispatch");} else if (taskName.equals("Level1Dispatch")) {PageTask record = (PageTask)task;long startId = record.getStartId();long endId = record.getEndId();//TODOreturn new ProcessResult(true);}return new ProcessResult(true);}@Overridepublic void postProcess(JobContext context) {//TODOSystem.out.println("all tasks is finished.");}private Pair<Integer, Integer> queryMinAndMaxId() {//TODO select min(id),max(id) from xxxreturn null;} }

    如上面的代碼所示,

    • 每個(gè)task不是整行記錄的record,而是PageTask,里面就2個(gè)字段,startId和endId。
    • root任務(wù),沒有全量的讀取A表,而是讀一下整張表的minId和maxId,然后構(gòu)造PageTask進(jìn)行分頁。比如task1表示PageTask[1,1000],task2表示PageTask[1001,2000]。每個(gè)task處理A表不同的數(shù)據(jù)。
    • 在下一級(jí)task中,如果拿到的是PageTask,再根據(jù)id區(qū)間去A表處理數(shù)據(jù)。

    根據(jù)上面的代碼和網(wǎng)格計(jì)算原理,得出下面這幅圖:

    如上圖所示,

    • A表只需要全量讀取一次。
    • 子任務(wù)數(shù)量比反面案例少了上千、上萬倍。
    • 子任務(wù)的body非常小,如果recod中有大字段,也少了上千、上萬倍。

    綜上,對(duì)A表訪問次數(shù)少了好幾倍,對(duì)h2存儲(chǔ)壓力少了上萬倍,不但執(zhí)行速度可以快很多,還保證不會(huì)把自己本地的h2數(shù)據(jù)庫搞掛。


    原文鏈接
    本文為云棲社區(qū)原創(chuàng)內(nèi)容,未經(jīng)允許不得轉(zhuǎn)載。

    總結(jié)

    以上是生活随笔為你收集整理的Schedulerx2.0分布式计算原理最佳实践的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。