日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

ElasticJob‐Lite:自定义作业分片策略

發布時間:2023/12/10 编程问答 33 豆豆
生活随笔 收集整理的這篇文章主要介紹了 ElasticJob‐Lite:自定义作业分片策略 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

Java SPI機制

在上一篇博客中介紹了ElasticJob的作業分片策略:

  • ElasticJob‐Lite:作業分片策略介紹與源碼分析

其中提到了ElasticJob是通過Java提供的SPI機制(ServiceLoader類)加載所有作業分片策略。

ServiceLoader類就是Java提供的SPI,SPI(Service Provider Interface)是JDK內置的一種服務提供發現機制,可以用來啟用框架擴展和替換組件,主要是被框架的開發人員使用,不同廠商可以針對同一接口做出不同的實現,比如java.sql.Driver接口,MySQL和PostgreSQL都提供了對應的實現給用戶使用,而Java的SPI機制可以為某個接口尋找服務實現。Java中SPI機制主要思想是將裝配的控制權移到程序之外,在模塊化設計中這個機制尤其重要,其核心思想就是解耦。

ServiceLoader類正常工作的唯一要求是服務提供類必須具有無參構造函數,以便它們可以在加載期間實例化。通過在資源目錄的META-INF/services中放置服務提供者配置文件來標識服務提供者,文件名是服務類型的完全限定名(比如ElasticJobListener類的完全限定名),該文件包含具體的服務提供者類的完全限定名列表(ElasticJobListener實現類的完全限定名列表),每行一個,每個名稱周圍的空格和制表符以及空行都將被忽略,該文件必須以UTF-8編碼。

自定義作業分片策略

所有可用的作業分片策略在JobShardingStrategyFactory類的靜態塊中被加載(通過ElasticJobServiceLoader類,該類是ElasticJob基于Java SPI機制實現的特定于作業的服務加載器)。

static {ElasticJobServiceLoader.registerTypedService(JobShardingStrategy.class);}

加載的類型是JobShardingStrategy.class,因此自定義的作業分片策略需要實現該接口。

自定義作業分片策略ShuffleJobShardingStrategy類:

package com.kaven.job.my;import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobInstance; import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobShardingStrategy;import java.util.*;public class ShuffleJobShardingStrategy implements JobShardingStrategy {@Overridepublic Map<JobInstance, List<Integer>> sharding(final List<JobInstance> jobInstances, final String jobName, final int shardingTotalCount) {if (jobInstances.isEmpty()) {return Collections.emptyMap();}// 先將作業分片項裝入容器List<Integer> shuffleShardingList = new ArrayList<>(shardingTotalCount);for (int i = 0; i < shardingTotalCount; i++) {shuffleShardingList.add(i);}// 將容器中的作業分片項順序打亂(使用容器的shuffle方法)Collections.shuffle(shuffleShardingList);// 模仿AverageAllocationJobShardingStrategy作業分片策略進行分配Map<JobInstance, List<Integer>> result = shardingShuffle(jobInstances, shardingTotalCount, shuffleShardingList);addShuffle(jobInstances, shardingTotalCount, result, shuffleShardingList);return result;}private Map<JobInstance, List<Integer>> shardingShuffle(final List<JobInstance> shardingUnits,final int shardingTotalCount,final List<Integer> shuffleShardingList) {Map<JobInstance, List<Integer>> result = new LinkedHashMap<>(shardingUnits.size(), 1);// 每個作業服務器最少應該分配的作業分片項數int itemCountPerSharding = shardingTotalCount / shardingUnits.size();int count = 0;for (JobInstance each : shardingUnits) {// 每個作業服務器申請的作業分片項列表(容量為itemCountPerSharding + 1)// itemCountPerSharding + 1為每個作業服務器最多應該分配的作業分片項數List<Integer> shardingItems = new ArrayList<>(itemCountPerSharding + 1);for (int i = count * itemCountPerSharding; i < (count + 1) * itemCountPerSharding; i++) {// 給作業分片項列表添加容器中的第i個作業分片項shardingItems.add(shuffleShardingList.get(i));}// 將作業服務器與它執行的作業分片項列表進行關聯result.put(each, shardingItems);count++;}return result;}private void addShuffle(final List<JobInstance> shardingUnits, final int shardingTotalCount,final Map<JobInstance, List<Integer>> shardingResults,final List<Integer> shuffleShardingList) {// 無法平均分配的分片項數int aliquant = shardingTotalCount % shardingUnits.size();// 已分配的無法平均分配的分片項數int count = 0;for (Map.Entry<JobInstance, List<Integer>> entry : shardingResults.entrySet()) {// 是否還有無法平均分配的分片項if (count < aliquant) {// 分配給序號較小的作業服務器entry.getValue().add(shuffleShardingList.get(shardingTotalCount / shardingUnits.size() * shardingUnits.size() + count));}// 已分配數更新count++;}}// 作業分片策略的標識符@Overridepublic String getType() {return "Shuffle";} }

博主自定義的ShuffleJobShardingStrategy作業分片策略是模仿AverageAllocationJobShardingStrategy作業分片策略(默認的作業分片策略),只是先將作業分片項裝入容器,然后將容器中的作業分片項順序打亂(使用容器的shuffle方法),之后再基于該作業分片項容器使用AverageAllocationJobShardingStrategy作業分片策略給作業服務器分配該容器中的作業分片項,如果不了解AverageAllocationJobShardingStrategy作業分片策略,可以去看看最上面列出的博客。

添加服務實現

在resources的META-INF/services中放置服務提供者配置文件來標識服務提供者,如下圖所示:

測試

作業定義(Simple作業):

package com.kaven.job;import org.apache.shardingsphere.elasticjob.api.ShardingContext; import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;import java.text.SimpleDateFormat; import java.util.Date;public class MySimpleJob implements SimpleJob {private static final SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");@Overridepublic void execute(ShardingContext shardingContext) {String job = shardingContext.getShardingParameter();if(job == null || job.trim().equals("")) {System.out.println("請指定幫[Kaven]執行的任務名稱!");throw new RuntimeException();}System.out.printf("%s 執行任務%d - [%s]!\n", formatter.format(new Date()),shardingContext.getShardingItem(), job);} }

啟動類:

package com.kaven.job;import org.apache.shardingsphere.elasticjob.api.JobConfiguration; import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap; import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter; import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration; import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;public class Application {public static void main(String[] args) {new ScheduleJobBootstrap(createRegistryCenter(), new MySimpleJob(), createJobConfiguration()).schedule();}// 注冊中心private static CoordinatorRegistryCenter createRegistryCenter() {ZookeeperConfiguration zc = new ZookeeperConfiguration("192.168.1.200:9000", "my-job");zc.setConnectionTimeoutMilliseconds(40000);zc.setMaxRetries(5);CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zc);regCenter.init();return regCenter;}// 作業配置private static JobConfiguration createJobConfiguration() {String jobs = "0=看論文,1=做實驗,2=打比賽,3=開組會,4=看書,5=寫博客,6=看源碼";return JobConfiguration.newBuilder("MySimpleJob", 7).cron("30 0/2 * * * ?").shardingItemParameters(jobs)// 使用自定義的作業分片策略.jobShardingStrategyType("Shuffle").overwrite(true).build();} }

啟動三個作業服務器,輸出如下圖所示:



輸出符合預期,因為自定義作業分片策略是模仿AverageAllocationJobShardingStrategy作業分片策略,但自定義作業分片策略中將作業的分片項順序打亂了,因此給每個作業服務器分配的作業分片項可能不是連續的。

修改作業配置(使用默認的作業分片策略):

private static JobConfiguration createJobConfiguration() {String jobs = "0=看論文,1=做實驗,2=打比賽,3=開組會,4=看書,5=寫博客,6=看源碼";return JobConfiguration.newBuilder("MySimpleJob", 7).cron("30 0/2 * * * ?").shardingItemParameters(jobs) // .jobShardingStrategyType("Shuffle").overwrite(true).build();}

啟動三個作業服務器,輸出如下圖所示:



輸出符合AverageAllocationJobShardingStrategy作業分片策略,[0,1,6]、[2,3]、[4,5]很顯然是有序的,而博主自定義的作業分片策略是亂序的。

ElasticJob如何自定義作業分片策略就介紹到這里,如果博主有說錯的地方或者大家有不同的見解,歡迎大家評論補充。

總結

以上是生活随笔為你收集整理的ElasticJob‐Lite:自定义作业分片策略的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。