分布式定时任务框架Elastic-Job的使用
為什么80%的碼農(nóng)都做不了架構(gòu)師?>>> ??
一、前言
? ? Elastic-Job是一個(gè)優(yōu)秀的分布式作業(yè)調(diào)度框架。
????Elastic-Job是一個(gè)分布式調(diào)度解決方案,由兩個(gè)相互獨(dú)立的子項(xiàng)目Elastic-Job-Lite和Elastic-Job-Cloud組成。
????Elastic-Job-Lite定位為輕量級(jí)無(wú)中心化解決方案,使用jar包的形式提供分布式任務(wù)的協(xié)調(diào)服務(wù)。
????Elastic-Job-Cloud使用Mesos + Docker的解決方案,額外提供資源治理、應(yīng)用分發(fā)以及進(jìn)程隔離等服務(wù)。
1. Elastic-Job-Lite
-
分布式調(diào)度協(xié)調(diào)
-
彈性擴(kuò)容縮容
-
失效轉(zhuǎn)移
-
錯(cuò)過(guò)執(zhí)行作業(yè)重觸發(fā)
-
作業(yè)分片一致性,保證同一分片在分布式環(huán)境中僅一個(gè)執(zhí)行實(shí)例
-
自診斷并修復(fù)分布式不穩(wěn)定造成的問(wèn)題
-
支持并行調(diào)度
-
支持作業(yè)生命周期操作
-
豐富的作業(yè)類(lèi)型
-
Spring整合以及命名空間提供
-
運(yùn)維平臺(tái)
2. Elastic-Job-Cloud
-
應(yīng)用自動(dòng)分發(fā)
-
基于Fenzo的彈性資源分配
-
分布式調(diào)度協(xié)調(diào)
-
彈性擴(kuò)容縮容
-
失效轉(zhuǎn)移
-
錯(cuò)過(guò)執(zhí)行作業(yè)重觸發(fā)
-
作業(yè)分片一致性,保證同一分片在分布式環(huán)境中僅一個(gè)執(zhí)行實(shí)例
-
支持并行調(diào)度
-
支持作業(yè)生命周期操作
-
豐富的作業(yè)類(lèi)型
-
Spring整合
-
運(yùn)維平臺(tái)
-
基于Docker的進(jìn)程隔離(TBD)
二、導(dǎo)讀
? ? 1、Elastic-Job的核心思想
? ? 2、Elastic-Job的基本使用
三、Elastic-Job的核心思想
? ? 對(duì)于分布式計(jì)算而言,分片是最基本的思想,Elastic-Job也是沿用了這個(gè)思想,每個(gè)job跑部分?jǐn)?shù)據(jù),所有job執(zhí)行完成,便是全量數(shù)據(jù),官網(wǎng)給出的SimpleJob例子如下:
public class MyElasticJob implements SimpleJob {@Overridepublic void execute(ShardingContext context) {switch (context.getShardingItem()) {case 0: // do something by sharding item 0break;case 1: // do something by sharding item 1break;case 2: // do something by sharding item 2break;// case n: ...}} }? ? 用switch case循環(huán)來(lái)對(duì)應(yīng)分片的業(yè)務(wù)邏輯,case分片的index,進(jìn)入業(yè)務(wù)邏輯執(zhí)行。當(dāng)然這里也有不適應(yīng)的場(chǎng)景,類(lèi)似于MapReduce需要shuffle的場(chǎng)景就不適合了,比方說(shuō),要根據(jù)某一個(gè)字段全局分組聚合求結(jié)果,這時(shí)候怎么分片都可能會(huì)不合理,因?yàn)槊總€(gè)分片只能處理N分之一的數(shù)據(jù),沒(méi)辦法shuffle再聚合,這一點(diǎn),也要根據(jù)具體的業(yè)務(wù)來(lái)使用。
? ?那么ShardingContext可以拿到那些信息呢?源碼如下
????
public final class ShardingContext {/*** 作業(yè)名稱(chēng).*/private final String jobName;/*** 作業(yè)任務(wù)ID.*/private final String taskId;/*** 分片總數(shù).*/private final int shardingTotalCount;/*** 作業(yè)自定義參數(shù).* 可以配置多個(gè)相同的作業(yè), 但是用不同的參數(shù)作為不同的調(diào)度實(shí)例.*/private final String jobParameter;/*** 分配于本作業(yè)實(shí)例的分片項(xiàng).*/private final int shardingItem;/*** 分配于本作業(yè)實(shí)例的分片參數(shù).*/private final String shardingParameter;public ShardingContext(final ShardingContexts shardingContexts, final int shardingItem) {jobName = shardingContexts.getJobName();taskId = shardingContexts.getTaskId();shardingTotalCount = shardingContexts.getShardingTotalCount();jobParameter = shardingContexts.getJobParameter();this.shardingItem = shardingItem;shardingParameter = shardingContexts.getShardingItemParameters().get(shardingItem);} }? ? 以上代碼,jobParameter和shardingItem是最有用的參數(shù),shardingItem決定switch case循環(huán)的走向,shardingParameter可以用業(yè)務(wù)的查詢(xún)條件,也可以用字符串拼接的方式組裝很復(fù)雜的參數(shù)用于特定的業(yè)務(wù)。
四、Elastic-Job的基本使用
? ? 1、Job配置項(xiàng)
public class ElasticJobConfig {private static CoordinatorRegistryCenter createRegistryCenter() {ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration("127.0.0.1:2181", "elastic-job");CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);regCenter.init();return regCenter;}private static LiteJobConfiguration createJobConfiguration() {JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder("jobdemo", "0/5 * * * * ?", 3).shardingItemParameters("0=A,1=A,2=B").failover(true).misfire(true).build();SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig,MyElasticJob.class.getCanonicalName());LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).overwrite(true).build();return simpleJobRootConfig;}public static void main(String[] args) {new JobScheduler(createRegistryCenter(), createJobConfiguration()).init();} }? ? 幾點(diǎn)說(shuō)明:
? ? 注冊(cè)中心配置項(xiàng),設(shè)置zookeeper集群地址,我這里用的本地單節(jié)點(diǎn),所以只有一個(gè),當(dāng)然可以配置任務(wù)名稱(chēng),命名空間(namespace,本質(zhì)上會(huì)在zk里生成一個(gè)目錄),超時(shí)時(shí)間,最大重試次數(shù)等等
????LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).overwrite(true).build()中,overwrite參數(shù)非常重要,設(shè)置這個(gè)參數(shù)為true,修改過(guò)job配置信息才會(huì)覆蓋zookeeper里的數(shù)據(jù),要不然不會(huì)生效。
? ? 2、SimpleJob的實(shí)現(xiàn)
public class MyElasticJob implements SimpleJob {@Overridepublic void execute(ShardingContext shardingContext) {switch (shardingContext.getShardingItem()) {case 0: {System.out.println("當(dāng)前分片:" + shardingContext.getShardingItem() + "=====" + "參數(shù):"+ shardingContext.getShardingParameter() + " =====" + Thread.currentThread());break;}case 1: {System.out.println("當(dāng)前分片:" + shardingContext.getShardingItem() + "=====" + "參數(shù):"+ shardingContext.getShardingParameter() + " =====" + Thread.currentThread());break;}case 2: {System.out.println("當(dāng)前分片:" + shardingContext.getShardingItem() + "=====" + "參數(shù):"+ shardingContext.getShardingParameter() + " =====" + Thread.currentThread());break;}default: {System.out.println("當(dāng)前分片:" + shardingContext.getShardingItem() + "=====" + "參數(shù):"+ shardingContext.getShardingParameter() + " =====" + Thread.currentThread());break;}}} }? ? 上面設(shè)置每5秒鐘執(zhí)行一次,執(zhí)行ElasticJobConfig的main方法,執(zhí)行結(jié)果如下:
????
? ? 從上面的結(jié)果,可以看出,執(zhí)行每個(gè)分片的任務(wù),其實(shí)是放到一個(gè)線(xiàn)程池去執(zhí)行的,對(duì)應(yīng)的分片信息和參數(shù)信息在shardingContext可以拿到,實(shí)現(xiàn)業(yè)務(wù)非常方便。
? ? 最后,如果啟動(dòng)多個(gè)JVM,那么這些任務(wù)就分散到各個(gè)節(jié)點(diǎn)里,如果一個(gè)節(jié)點(diǎn)宕機(jī),下次觸發(fā)任務(wù)時(shí),將把該分片任務(wù)丟到健康機(jī)器執(zhí)行,這里做到了節(jié)點(diǎn)容錯(cuò)。但是某個(gè)分片的任務(wù)在執(zhí)行過(guò)程中失敗了,那么這里是不會(huì)重新觸發(fā)改分片任務(wù)的執(zhí)行的。
?
?
????
轉(zhuǎn)載于:https://my.oschina.net/u/1778239/blog/3018941
總結(jié)
以上是生活随笔為你收集整理的分布式定时任务框架Elastic-Job的使用的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 做梦梦到下雨鞋湿是什么
- 下一篇: border-radius 涨知识的