日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

史上最全的延迟任务实现方式汇总!附代码(强烈推荐)

發布時間:2025/3/11 编程问答 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 史上最全的延迟任务实现方式汇总!附代码(强烈推荐) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

這篇文章的誕生要感謝一位讀者,是他讓這篇優秀的文章有了和大家見面的機會,重點是優秀文章,哈哈。

事情的經過是這樣的...

不用謝我,送人玫瑰,手有余香。相信接下來的內容一定不會讓你失望,因為它將是目前市面上最好的關于“延遲任務”的文章,這也一直是我寫作追求的目標,讓我的每一篇文章都比市面上的好那么一點點。

好了,話不多說,直接進入今天的主題,本文的主要內容如下圖所示:

什么是延遲任務?

顧明思議,我們把需要延遲執行的任務叫做延遲任務

延遲任務的使用場景有以下這些:

  • 紅包 24?小時未被查收,需要延遲執退還業務;
  • 每個月賬單日,需要給用戶發送當月的對賬單;
  • 訂單下單之后 30 分鐘后,用戶如果沒有付錢,系統需要自動取消訂單。
  • 等事件都需要使用延遲任務。

    延遲任務實現思路分析

    延遲任務實現的關鍵是在某個時間節點執行某個任務。基于這個信息我們可以想到實現延遲任務的手段有以下兩個:

  • 自己手寫一個“死循環”一直判斷當前時間節點有沒有要執行的任務;
  • 借助?JDK?或者第三方提供的工具類來實現延遲任務。
  • 而通過?JDK?實現延遲任務我們能想到的關鍵詞是:DelayQueue、ScheduledExecutorService,而第三方提供的延遲任務執行方法就有很多了,例如:Redis、Netty、MQ?等手段。

    延遲任務實現

    下面我們將結合代碼來講解每種延遲任務的具體實現。

    1.無限循環實現延遲任務

    此方式我們需要開啟一個無限循環一直掃描任務,然后使用一個?Map?集合用來存儲任務和延遲執行的時間,實現代碼如下:

    import java.time.Instant; import java.time.LocalDateTime; import java.util.HashMap; import java.util.Iterator; import java.util.Map;/*** 延遲任務執行方法匯總*/ public class DelayTaskExample {// 存放定時任務private static Map<String, Long> _TaskMap = new HashMap<>();public static void main(String[] args) {System.out.println("程序啟動時間:" + LocalDateTime.now());// 添加定時任務_TaskMap.put("task-1", Instant.now().plusSeconds(3).toEpochMilli()); // 延遲 3s// 調用無限循環實現延遲任務loopTask();}/*** 無限循環實現延遲任務*/public static void loopTask() {Long itemLong = 0L;while (true) {Iterator it = _TaskMap.entrySet().iterator();while (it.hasNext()) {Map.Entry entry = (Map.Entry) it.next();itemLong = (Long) entry.getValue();// 有任務需要執行if (Instant.now().toEpochMilli() >= itemLong) {// 延遲任務,業務邏輯執行System.out.println("執行任務:" + entry.getKey() +" ,執行時間:" + LocalDateTime.now());// 刪除任務_TaskMap.remove(entry.getKey());}}}} }

    以上程序執行的結果為:

    程序啟動時間:2020-04-12T18:51:28.188

    執行任務:task-1 ,執行時間:2020-04-12T18:51:31.189

    可以看出任務延遲了 3s?鐘執行了,符合我們的預期。

    2.Java?API 實現延遲任務

    Java API?提供了兩種實現延遲任務的方法:DelayQueue?和?ScheduledExecutorService。

    ①?ScheduledExecutorService?實現延遲任務

    我們可以使用 ScheduledExecutorService?來以固定的頻率一直執行任務,實現代碼如下:

    public class DelayTaskExample {public static void main(String[] args) {System.out.println("程序啟動時間:" + LocalDateTime.now());scheduledExecutorServiceTask();}/*** ScheduledExecutorService 實現固定頻率一直循環執行任務*/public static void scheduledExecutorServiceTask() {ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);executor.scheduleWithFixedDelay(new Runnable() {@Overridepublic void run() {// 執行任務的業務代碼System.out.println("執行任務" +" ,執行時間:" + LocalDateTime.now());}},2, // 初次執行間隔2, // 2s 執行一次TimeUnit.SECONDS);} }

    以上程序執行的結果為:

    程序啟動時間:2020-04-12T21:28:10.416

    執行任務 ,執行時間:2020-04-12T21:28:12.421

    執行任務 ,執行時間:2020-04-12T21:28:14.422

    ......

    可以看出使用?ScheduledExecutorService#scheduleWithFixedDelay(...)?方法之后,會以某個頻率一直循環執行延遲任務。

    ②?DelayQueue?實現延遲任務

    DelayQueue 是一個支持延時獲取元素的無界阻塞隊列,隊列中的元素必須實現 Delayed 接口,并重寫?getDelay(TimeUnit) 和 compareTo(Delayed)?方法,DelayQueue?實現延遲隊列的完整代碼如下:

    public class DelayTest {public static void main(String[] args) throws InterruptedException {DelayQueue delayQueue = new DelayQueue();// 添加延遲任務delayQueue.put(new DelayElement(1000));delayQueue.put(new DelayElement(3000));delayQueue.put(new DelayElement(5000));System.out.println("開始時間:" + DateFormat.getDateTimeInstance().format(new Date()));while (!delayQueue.isEmpty()){// 執行延遲任務System.out.println(delayQueue.take());}System.out.println("結束時間:" + DateFormat.getDateTimeInstance().format(new Date()));}static class DelayElement implements Delayed {// 延遲截止時間(單面:毫秒)long delayTime = System.currentTimeMillis();public DelayElement(long delayTime) {this.delayTime = (this.delayTime + delayTime);}@Override// 獲取剩余時間public long getDelay(TimeUnit unit) {return unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);}@Override// 隊列里元素的排序依據public int compareTo(Delayed o) {if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) {return 1;} else if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) {return -1;} else {return 0;}}@Overridepublic String toString() {return DateFormat.getDateTimeInstance().format(new Date(delayTime));}} }

    以上程序執行的結果為:

    開始時間:2020-4-12 20:40:38

    2020-4-12 20:40:39

    2020-4-12 20:40:41

    2020-4-12 20:40:43

    結束時間:2020-4-12 20:40:43

    3.Redis?實現延遲任務

    使用?Redis?實現延遲任務的方法大體可分為兩類:通過?zset 數據判斷的方式,和通過鍵空間通知的方式

    ①?通過數據判斷的方式

    我們借助?zset?數據類型,把延遲任務存儲在此數據集合中,然后在開啟一個無線循環查詢當前時間的所有任務進行消費,實現代碼如下(需要借助?Jedis?框架):

    import redis.clients.jedis.Jedis; import utils.JedisUtils; import java.time.Instant; import java.util.Set;public class DelayQueueExample {// zset keyprivate static final String _KEY = "myDelayQueue";public static void main(String[] args) throws InterruptedException {Jedis jedis = JedisUtils.getJedis();// 延遲 30s 執行(30s 后的時間)long delayTime = Instant.now().plusSeconds(30).getEpochSecond();jedis.zadd(_KEY, delayTime, "order_1");// 繼續添加測試數據jedis.zadd(_KEY, Instant.now().plusSeconds(2).getEpochSecond(), "order_2");jedis.zadd(_KEY, Instant.now().plusSeconds(2).getEpochSecond(), "order_3");jedis.zadd(_KEY, Instant.now().plusSeconds(7).getEpochSecond(), "order_4");jedis.zadd(_KEY, Instant.now().plusSeconds(10).getEpochSecond(), "order_5");// 開啟延遲隊列doDelayQueue(jedis);}/*** 延遲隊列消費* @param jedis Redis 客戶端*/public static void doDelayQueue(Jedis jedis) throws InterruptedException {while (true) {// 當前時間Instant nowInstant = Instant.now();long lastSecond = nowInstant.plusSeconds(-1).getEpochSecond(); // 上一秒時間long nowSecond = nowInstant.getEpochSecond();// 查詢當前時間的所有任務Set<String> data = jedis.zrangeByScore(_KEY, lastSecond, nowSecond);for (String item : data) {// 消費任務System.out.println("消費:" + item);}// 刪除已經執行的任務jedis.zremrangeByScore(_KEY, lastSecond, nowSecond);Thread.sleep(1000); // 每秒輪詢一次}} }

    ②?通過鍵空間通知

    默認情況下?Redis?服務器端是不開啟鍵空間通知的,需要我們通過 config set notify-keyspace-events Ex?的命令手動開啟,開啟鍵空間通知后,我們就可以拿到每個鍵值過期的事件,我們利用這個機制實現了給每個人開啟一個定時任務的功能,實現代碼如下:

    import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPubSub; import utils.JedisUtils;public class TaskExample {public static final String _TOPIC = "__keyevent@0__:expired"; // 訂閱頻道名稱public static void main(String[] args) {Jedis jedis = JedisUtils.getJedis();// 執行定時任務doTask(jedis);}/*** 訂閱過期消息,執行定時任務* @param jedis Redis 客戶端*/public static void doTask(Jedis jedis) {// 訂閱過期消息jedis.psubscribe(new JedisPubSub() {@Overridepublic void onPMessage(String pattern, String channel, String message) {// 接收到消息,執行定時任務System.out.println("收到消息:" + message);}}, _TOPIC);} }

    4.Netty 實現延遲任務

    Netty 是由 JBOSS 提供的一個 Java 開源框架,它是一個基于 NIO 的客戶、服務器端的編程框架,使用 Netty 可以確保你快速和簡單的開發出一個網絡應用,例如實現了某種協議的客戶、服務端應用。Netty 相當于簡化和流線化了網絡應用的編程開發過程,例如:基于 TCP 和 UDP 的 socket 服務開發。

    可以使用?Netty?提供的工具類?HashedWheelTimer?來實現延遲任務,實現代碼如下。

    首先在項目中添加?Netty?引用,配置如下:

    <!-- https://mvnrepository.com/artifact/io.netty/netty-common --> <dependency><groupId>io.netty</groupId><artifactId>netty-common</artifactId><version>4.1.48.Final</version> </dependency>

    Netty?實現的完整代碼如下:

    public class DelayTaskExample {public static void main(String[] args) {System.out.println("程序啟動時間:" + LocalDateTime.now());NettyTask();}/*** 基于 Netty 的延遲任務*/private static void NettyTask() {// 創建延遲任務實例HashedWheelTimer timer = new HashedWheelTimer(3, // 時間間隔TimeUnit.SECONDS,100); // 時間輪中的槽數// 創建一個任務TimerTask task = new TimerTask() {@Overridepublic void run(Timeout timeout) throws Exception {System.out.println("執行任務" +" ,執行時間:" + LocalDateTime.now());}};// 將任務添加到延遲隊列中timer.newTimeout(task, 0, TimeUnit.SECONDS);} }

    以上程序執行的結果為:

    程序啟動時間:2020-04-13T10:16:23.033

    執行任務 ,執行時間:2020-04-13T10:16:26.118

    HashedWheelTimer 是使用定時輪實現的,定時輪其實就是一種環型的數據結構,可以把它想象成一個時鐘,分成了許多格子,每個格子代表一定的時間,在這個格子上用一個鏈表來保存要執行的超時任務,同時有一個指針一格一格的走,走到那個格子時就執行格子對應的延遲任務,如下圖所示: (圖片來源于網絡)

    以上的圖片可以理解為,時間輪大小為 8,某個時間轉一格(例如 1s),每格指向一個鏈表,保存著待執行的任務。

    5.MQ 實現延遲任務

    如果專門開啟一個?MQ?中間件來執行延遲任務,就有點殺雞用宰牛刀般的奢侈了,不過已經有了?MQ?環境的話,用它來實現延遲任務的話,還是可取的。

    幾乎所有的?MQ?中間件都可以實現延遲任務,在這里更準確的叫法應該叫延隊列。本文就使用?RabbitMQ?為例,來看它是如何實現延遲任務的。

    RabbitMQ?實現延遲隊列的方式有兩種:

    • 通過消息過期后進入死信交換器,再由交換器轉發到延遲消費隊列,實現延遲功能;
    • 使用 rabbitmq-delayed-message-exchange 插件實現延遲功能。

    注意: 延遲插件 rabbitmq-delayed-message-exchange 是在 RabbitMQ 3.5.7 及以上的版本才支持的,依賴 Erlang/OPT 18.0 及以上運行環境。

    由于使用死信交換器比較麻煩,所以推薦使用第二種實現方式 rabbitmq-delayed-message-exchange 插件的方式實現延遲隊列的功能。

    首先,我們需要下載并安裝?rabbitmq-delayed-message-exchange 插件,下載地址:http://www.rabbitmq.com/community-plugins.html

    選擇相應的對應的版本進行下載,然后拷貝到 RabbitMQ 服務器目錄,使用命令 rabbitmq-plugins enable rabbitmq_delayed_message_exchange?開啟插件,在使用命令 rabbitmq-plugins list?查詢安裝的所有插件,安裝成功如下圖所示:

    最后重啟 RabbitMQ 服務,使插件生效。

    首先,我們先要配置消息隊列,實現代碼如下:

    import com.example.rabbitmq.mq.DirectConfig; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map;@Configuration public class DelayedConfig {final static String QUEUE_NAME = "delayed.goods.order";final static String EXCHANGE_NAME = "delayedec";@Beanpublic Queue queue() {return new Queue(DelayedConfig.QUEUE_NAME);}// 配置默認的交換機@BeanCustomExchange customExchange() {Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "direct");//參數二為類型:必須是x-delayed-messagereturn new CustomExchange(DelayedConfig.EXCHANGE_NAME, "x-delayed-message", true, false, args);}// 綁定隊列到交換器@BeanBinding binding(Queue queue, CustomExchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(DelayedConfig.QUEUE_NAME).noargs();} }

    然后添加增加消息的代碼,具體實現如下:

    import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.text.SimpleDateFormat; import java.util.Date;@Component public class DelayedSender {@Autowiredprivate AmqpTemplate rabbitTemplate;public void send(String msg) {SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");System.out.println("發送時間:" + sf.format(new Date()));rabbitTemplate.convertAndSend(DelayedConfig.EXCHANGE_NAME, DelayedConfig.QUEUE_NAME, msg, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setHeader("x-delay", 3000);return message;}});} }

    再添加消費消息的代碼:

    import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.text.SimpleDateFormat; import java.util.Date;@Component @RabbitListener(queues = "delayed.goods.order") public class DelayedReceiver {@RabbitHandlerpublic void process(String msg) {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");System.out.println("接收時間:" + sdf.format(new Date()));System.out.println("消息內容:" + msg);} }

    最后,我們使用代碼測試一下:

    import com.example.rabbitmq.RabbitmqApplication; import com.example.rabbitmq.mq.delayed.DelayedSender; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner;import java.text.SimpleDateFormat; import java.util.Date;@RunWith(SpringRunner.class) @SpringBootTest public class DelayedTest {@Autowiredprivate DelayedSender sender;@Testpublic void Test() throws InterruptedException {SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd");sender.send("Hi Admin.");Thread.sleep(5 * 1000); //等待接收程序執行之后,再退出測試} }

    以上程序的執行結果如下:

    發送時間:2020-04-13 20:47:51

    接收時間:2020-04-13 20:47:54

    消息內容:Hi Admin.

    從結果可以看出,以上程序執行符合延遲任務的實現預期。

    6.使用?Spring?定時任務

    如果你使用的是?Spring?或?SpringBoot?的項目的話,可以使用借助 Scheduled?來實現,本文將使用?SpringBoot?項目來演示?Scheduled?的實現,實現我們需要聲明開啟?Scheduled,實現代碼如下:

    @SpringBootApplication @EnableScheduling public class Application {public static void main(String[] args) {SpringApplication.run(Application.class, args);} }

    然后添加延遲任務,實現代碼如下:

    @Component public class ScheduleJobs {@Scheduled(fixedDelay = 2 * 1000)public void fixedDelayJob() throws InterruptedException {System.out.println("任務執行,時間:" + LocalDateTime.now());} }

    此時當我們啟動項目之后就可以看到任務以延遲了 2s?的形式一直循環執行,結果如下:

    任務執行,時間:2020-04-13T14:07:53.349

    任務執行,時間:2020-04-13T14:07:55.350

    任務執行,時間:2020-04-13T14:07:57.351

    ...

    我們也可以使用 Corn 表達式來定義任務執行的頻率,例如使用 @Scheduled(cron = "0/4 * * * * ?")?。

    7.Quartz 實現延遲任務

    Quartz 是一款功能強大的任務調度器,可以實現較為復雜的調度功能,它還支持分布式的任務調度。

    我們使用?Quartz?來實現一個延遲任務,首先定義一個執行任務代碼如下:

    import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; import org.springframework.scheduling.quartz.QuartzJobBean;import java.time.LocalDateTime;public class SampleJob extends QuartzJobBean {@Overrideprotected void executeInternal(JobExecutionContext jobExecutionContext)throws JobExecutionException {System.out.println("任務執行,時間:" + LocalDateTime.now());} }

    在定義一個?JobDetail 和 Trigger?實現代碼如下:

    import org.quartz.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;@Configuration public class SampleScheduler {@Beanpublic JobDetail sampleJobDetail() {return JobBuilder.newJob(SampleJob.class).withIdentity("sampleJob").storeDurably().build();}@Beanpublic Trigger sampleJobTrigger() {// 3s 后執行SimpleScheduleBuilder scheduleBuilder =SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(3).withRepeatCount(1);return TriggerBuilder.newTrigger().forJob(sampleJobDetail()).withIdentity("sampleTrigger").withSchedule(scheduleBuilder).build();} }

    最后在?SpringBoot?項目啟動之后開啟延遲任務,實現代碼如下:

    import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.scheduling.quartz.SchedulerFactoryBean;/*** SpringBoot 項目啟動后執行*/ public class MyStartupRunner implements CommandLineRunner {@Autowiredprivate SchedulerFactoryBean schedulerFactoryBean;@Autowiredprivate SampleScheduler sampleScheduler;@Overridepublic void run(String... args) throws Exception {// 啟動定時任務schedulerFactoryBean.getScheduler().scheduleJob(sampleScheduler.sampleJobTrigger());} }

    以上程序的執行結果如下:

    2020-04-13 19:02:12.331 ?INFO 17768 --- [ ?restartedMain] com.example.demo.DemoApplication ? ? ? ? : Started DemoApplication in 1.815 seconds (JVM running for 3.088)

    任務執行,時間:2020-04-13T19:02:15.019

    從結果可以看出在項目啟動 3s?之后執行了延遲任務。

    總結

    本文講了延遲任務的使用場景,以及延遲任務的 10 種實現方式:

  • 手動無線循環;
  • ScheduledExecutorService;
  • DelayQueue;
  • Redis zset 數據判斷的方式;
  • Redis 鍵空間通知的方式;
  • Netty 提供的 HashedWheelTimer 工具類;
  • RabbitMQ 死信隊列;
  • RabbitMQ 延遲消息插件 rabbitmq-delayed-message-exchange;
  • Spring Scheduled;
  • Quartz。
  • 最后的話

    俗話說:臺上一分鐘,臺下十年功。本文的所有內容皆為作者多年工作積累的結晶,以及熬夜嘔心瀝血的整理,如果覺得本文有幫助到你,請幫我分享出去,讓更多的人看到,謝謝你。

    更多精彩內容,請關注微信公眾號「Java中文社群」

    創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎

    總結

    以上是生活随笔為你收集整理的史上最全的延迟任务实现方式汇总!附代码(强烈推荐)的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。