日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

js reduce实现中间件_MapReduce 模型

發布時間:2024/4/17 编程问答 33 豆豆
生活随笔 收集整理的這篇文章主要介紹了 js reduce实现中间件_MapReduce 模型 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

MapReduce 模型是 Map 模型的擴展,新增 Reduce接口,需要實現 MapReduceJobProcessor。

注意事項

MapReduce 模型只有一個 Reduce,所有子任務完成后會執行 Reduce 方法,可以在 Reduce 方法中返回該任務示例的執行結果,作為工作流的上下游數據傳遞。如果有子任務失敗,Reduce

不會執行。Reduce 失敗,整個任務示例也失敗。

Scheduler X 不保證子任務一定執行一次,在特殊條件下會 failover,可能會導致子任務重復執行,需要業務方自己實現冪等。

Scheduler X 使用的是 Hessian 序列化框架,目前不支持 LocalDateTime 和 BigDecimal。子任務中如果有如上兩個數據結構,請替換其他的數據結構(特別是 BigDecimal,序列化不會報錯,反序列化會變成 0)。

接口

接口

解釋

是否必選

ProcessResult process(JobContext context)

每個子任務執行業務的入口,需要從 context 里獲取 taskName,自己判斷是哪個子任務,進行相應的邏輯處理。執行完成后,需要返回 ProcessResult。

ProcessResult map(List extends Object> taskList, String taskName)

執行 map 方法可以把一批子任務分布式到多臺機器上執行,可以 map 多次。如果 taskList 是空,返回失敗。執行完成后,需要返回 ProcessResult。

void kill(JobContext context)

前端 kill 任務會觸發該方法,需要用戶自己實現如何中斷業務。

執行方式

并行計算:最多支持 300 任務,有子任務列表。

注意 秒級別任務不要選擇并行計算。

內存網格:基于內存計算,最多支持 50,000 以下子任務,速度快。

網格計算:基于文件計算,最多支持 1,000,000 子任務。

高級配置

發送 500 條消息的 Demo 示例(適用于 MapReduce 模型)

@Component

public class TestMapReduceJobProcessor extends MapReduceJobProcessor {

@Override

public ProcessResult process(JobContext context) throws Exception {

String taskName = context.getTaskName();

int dispatchNum=500;

if (isRootTask(context)) {

System.out.println("start root task");

List msgList = Lists.newArrayList();

for (int i = 0; i <= dispatchNum; i++) {

msgList.add("msg_" + i);

}

return map(msgList, "Level1Dispatch");

} else if (taskName.equals("Level1Dispatch")) {

String task = (String)context.getTask();

System.out.println(task);

return new ProcessResult(true);

}

return new ProcessResult(false);

}

@Override

public ProcessResult reduce(JobContext context) throws Exception {

return new ProcessResult(true, "TestMapReduceJobProcessor.reduce");

}

}

處理單表數據的 Demo 示例(適用于 Map 或 MapReduce 模型)

@Component

public class ScanSingleTableJobProcessor extends MapJobProcessor {

@Service

private XXXService xxxService;

private final int PAGE_SIZE = 500;

static class PageTask {

private long startId;

private long endId;

public PageTask(long startId, long endId) {

this.startId = startId;

this.endId = endId;

}

public long getStartId() {

return startId;

}

public long getEndId() {

return endId;

}

}

@Override

public ProcessResult process(JobContext context) throws Exception {

String tableName = context.getJobParameters(); //多個 Job 后端代碼可以一致,通過控制臺配置 Job 參數表示表名。

String taskName = context.getTaskName();

Object task = context.getTask();

if (isRootTask(context)) {

Pair idPair = queryMinAndMaxId(tableName);

long minId = idPair.getFirst();

long maxId = idPair.getSecond();

List tasks = Lists.newArrayList();

int step = (int) ((maxId - minId) / PAGE_SIZE); //計算分頁數量

for (long i = minId; i < maxId; i+=step) {

tasks.add(new PageTask(i, (i+step > maxId ? maxId : i+step)));

}

return map(tasks, "PageTask");

} else if (taskName.equals("PageTask")) {

PageTask pageTask = (PageTask)task;

long startId = pageTask.getStartId();

long endId = pageTask.getEndId();

List records = queryRecord(tableName, startId, endId);

//TODO handle records

return new ProcessResult(true);

}

return new ProcessResult(false);

}

private Pair queryMinAndMaxId(String tableName) {

//TODO select min(id),max(id) from [tableName]

return new Pair(1L, 10000L);

}

private List queryRecord(String tableName, long startId, long endId) {

List records = Lists.newArrayList();

//TODO select * from [tableName] where id>=[startId] and id

return records;

}

}

處理分庫分表數據的 Demo 示例(適用于 Map 或 MapReduce 模型)

@Component

public class ScanShardingTableJobProcessor extends MapJobProcessor {

@Service

private XXXService xxxService;

private final int PAGE_SIZE = 500;

static class PageTask {

private String tableName;

private long startId;

private long endId;

public PageTask(String tableName, long startId, long endId) {

this.tableName = tableName;

this.startId = startId;

this.endId = endId;

}

public String getTableName() {

return tableName;

}

public long getStartId() {

return startId;

}

public long getEndId() {

return endId;

}

}

@Override

public ProcessResult process(JobContext context) throws Exception {

String taskName = context.getTaskName();

Object task = context.getTask();

if (isRootTask(context)) {

//先分庫

List dbList = getDbList();

return map(dbList, "DbTask");

} else if (taskName.equals("DbTask")) {

//根據分庫去分表

String dbName = (String)task;

List tableList = getTableList(dbName);

return map(tableList, "TableTask");

} else if (taskName.equals("TableTask")) {

//如果一個分表也很大,再分頁

String tableName = (String)task;

Pair idPair = queryMinAndMaxId(tableName);

long minId = idPair.getFirst();

long maxId = idPair.getSecond();

List tasks = Lists.newArrayList();

int step = (int) ((maxId - minId) / PAGE_SIZE); //計算分頁數量

for (long i = minId; i < maxId; i+=step) {

tasks.add(new PageTask(tableName, i, (i+step > maxId ? maxId : i+step)));

}

return map(tasks, "PageTask");

} else if (taskName.equals("PageTask")) {

PageTask pageTask = (PageTask)task;

String tableName = pageTask.getTableName();

long startId = pageTask.getStartId();

long endId = pageTask.getEndId();

List records = queryRecord(tableName, startId, endId);

//TODO handle records

return new ProcessResult(true);

}

return new ProcessResult(false);

}

private List getDbList() {

List dbList = Lists.newArrayList();

//TODO 返回分庫列表

return dbList;

}

private List getTableList(String dbName) {

List tableList = Lists.newArrayList();

//TODO 返回分表列表

return tableList;

}

private Pair queryMinAndMaxId(String tableName) {

//TODO select min(id),max(id) from [tableName]

return new Pair(1L, 10000L);

}

private List queryRecord(String tableName, long startId, long endId) {

List records = Lists.newArrayList();

//TODO select * from [tableName] where id>=[startId] and id

return records;

}

}

處理 50 條消息并且返回子任務結果由 Reduce 匯總的 Demo 示例(適用于 MapReduce 模型)

@Component

public class TestMapReduceJobProcessor extends MapReduceJobProcessor {

@Override

public ProcessResult process(JobContext context) throws Exception {

String taskName = context.getTaskName();

int dispatchNum = 50;

if (context.getJobParameters() != null) {

dispatchNum = Integer.valueOf(context.getJobParameters());

}

if (isRootTask(context)) {

System.out.println("start root task");

List msgList = Lists.newArrayList();

for (int i = 0; i <= dispatchNum; i++) {

msgList.add("msg_" + i);

}

return map(msgList, "Level1Dispatch");

} else if (taskName.equals("Level1Dispatch")) {

String task = (String)context.getTask();

Thread.sleep(2000);

return new ProcessResult(true, task);

}

return new ProcessResult(false);

}

@Override

public ProcessResult reduce(JobContext context) throws Exception {

for (Entry result : context.getTaskResults().entrySet()) {

System.out.println("taskId:" + result.getKey() + ", result:" + result.getValue());

}

return new ProcessResult(true, "TestMapReduceJobProcessor.reduce");

}

}

總結

以上是生活随笔為你收集整理的js reduce实现中间件_MapReduce 模型的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。