日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

每日一博 - 延时任务的多种实现方式解读

發(fā)布時間:2025/3/21 编程问答 44 豆豆
生活随笔 收集整理的這篇文章主要介紹了 每日一博 - 延时任务的多种实现方式解读 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

文章目錄

  • Pre
  • 延時任務(wù) VS 定時任務(wù)
  • Solutions
    • DB 輪詢
      • 核心思想
      • Demo Code
      • 優(yōu)缺點
    • JDK的Delay Queue
      • 核心思想
      • Demo Code
      • 優(yōu)缺點
    • 時間輪算法
      • 核心思想
      • Demo Code
      • 優(yōu)缺點
    • Redis緩存(zset)
      • 核心思想
      • Demo Code
    • Redis緩存(Keyspace Notifications)
      • 核心思想
    • Redisson 延時隊列
    • 使用消息隊列


Pre

每日一博 - 使用環(huán)形隊列實現(xiàn)高效的延時消息


延時任務(wù) VS 定時任務(wù)

舉個例子,開發(fā)中常見的延時任務(wù)場景:

  • 半小時未支付,取消訂單

延時任務(wù)和定時任務(wù)的幾個小區(qū)別,梳理下:

  • 定時任務(wù)有明確的觸發(fā)時間,延時任務(wù)沒有
  • 定時任務(wù)有執(zhí)行周期,而延時任務(wù)在某事件觸發(fā)后一段時間內(nèi)執(zhí)行,沒有執(zhí)行周期
  • 定時任務(wù)一般執(zhí)行的是批處理操作是多個任務(wù),而延時任務(wù)一般是單個任務(wù)

Solutions

DB 輪詢

核心思想

通過定時任務(wù)掃描,執(zhí)行業(yè)務(wù)邏輯。


Demo Code

參考實現(xiàn)如下:

<dependency><groupId>org.quartz-scheduler</groupId><artifactId>quartz</artifactId><version>2.2.2</version></dependency> import org.quartz.JobBuilder; import org.quartz.JobDetail; import org.quartz.Scheduler; import org.quartz.SchedulerException; import org.quartz.SchedulerFactory; import org.quartz.SimpleScheduleBuilder; import org.quartz.Trigger; import org.quartz.TriggerBuilder; import org.quartz.impl.StdSchedulerFactory; import org.quartz.Job; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException;public class MyJob implements Job {public void execute(JobExecutionContext context)throws JobExecutionException {System.out.println("模擬掃描任務(wù)。。。。。");}public static void main(String[] args) throws Exception {// 創(chuàng)建任務(wù)JobDetail jobDetail = JobBuilder.newJob(MyJob.class).withIdentity("job1", "group1").build();// 創(chuàng)建觸發(fā)器 每3秒鐘執(zhí)行一次Trigger trigger = TriggerBuilder.newTrigger().withIdentity("trigger1", "group3").withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(3).repeatForever()).build();Scheduler scheduler = new StdSchedulerFactory().getScheduler();// 將任務(wù)及其觸發(fā)器放入調(diào)度器scheduler.scheduleJob(jobDetail, trigger);// 調(diào)度器開始調(diào)度任務(wù)scheduler.start();} }

優(yōu)缺點

優(yōu)點: 簡單 (好像也沒有其他的優(yōu)點了 哈哈哈 )

缺點:

  • (1)占用資源,對服務(wù)器內(nèi)存消耗大

  • (2)存在延遲,比如你每隔n分鐘掃描一次,那最壞的延遲時間就是n分鐘

  • (3)如果表的數(shù)據(jù)量較大,每隔幾分鐘這樣掃描一次,性能堪憂,DB壓力較大


JDK的Delay Queue

核心思想

利用JDK自帶的DelayQueue來實現(xiàn), 無界阻塞隊列,該隊列只有在延遲期滿的時候才能從中獲取元素,放入DelayQueue中的對象,必須實現(xiàn)Delayed接口。

  • poll():獲取并移除隊列的超時元素,沒有則返回空
  • take():獲取并移除隊列的超時元素,如果沒有則wait當(dāng)前線程,直到有元素滿足超時條件,返回結(jié)果。

Demo Code

import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit;/*** @author 小工匠* @version 1.0* @description: TODO* @date 2021/9/2 22:50* @mark: show me the code , change the world*/ public class TicketDelay implements Delayed {private String ticketId;private long timeout;public TicketDelay(String ticketId, long timeout) {this.ticketId= ticketId;this.timeout = timeout + System.nanoTime();}@Overridepublic int compareTo(Delayed other) {if (other == this) {return 0;}TicketDelay t = (TicketDelay) other;long d = (getDelay(TimeUnit.NANOSECONDS) - t.getDelay(TimeUnit.NANOSECONDS));return (d == 0) ? 0 : ((d < 0) ? -1 : 1);}/*** 返回距離自定義的超時時間還有多少* @param unit* @return*/@Overridepublic long getDelay(TimeUnit unit) {return unit.convert(timeout - System.nanoTime(),TimeUnit.NANOSECONDS);}void doSomething() {System.out.println(ticketId+" is deleted");} } import java.util.ArrayList; import java.util.List; import java.util.concurrent.DelayQueue; import java.util.concurrent.TimeUnit;/*** @author 小工匠* @version 1.0* @description: TODO* @date 2021/9/2 22:51* @mark: show me the code , change the world*/ public class DelayQueueDemo {public static void main(String[] args) {// 模擬數(shù)據(jù)List<String> list = new ArrayList<>();list.add("Ticket-1");list.add("Ticket-2");list.add("Ticket-3");list.add("Ticket-4");list.add("Ticket-5");// 延時隊列DelayQueue<TicketDelay> queue = new DelayQueue<>();for (int i = 0; i < 5; i++) {long start = System.currentTimeMillis();//延遲2秒取出queue.put(new TicketDelay(list.get(i), TimeUnit.NANOSECONDS.convert(2, TimeUnit.SECONDS)));System.out.println("biubiubiu ~ " + (System.currentTimeMillis() - start) + " MilliSeconds ");try {queue.take().doSomething();System.out.println("biubiubiu " + (System.currentTimeMillis() - start) + " MilliSeconds 取到了數(shù)據(jù),開始后執(zhí)行業(yè)務(wù)操作");} catch (InterruptedException e) {e.printStackTrace();}System.out.println("===========================\n" );}} }

優(yōu)缺點

優(yōu)點:

  • 效率高,任務(wù)觸發(fā)時間延遲低

缺點:

  • 服務(wù)器重啟后,數(shù)據(jù)全部消失,怕宕機

  • 集群擴展相當(dāng)麻煩

  • 因為內(nèi)存條件限制的原因,如數(shù)據(jù)太多,那么很容易就出現(xiàn)OOM異常

  • 代碼復(fù)雜度較高


時間輪算法

每日一博 - 使用環(huán)形隊列實現(xiàn)高效的延時消息

延時消息之時間輪


核心思想

其實本質(zhì)上它就是一個環(huán)形的數(shù)組,如圖所示,假設(shè)我們創(chuàng)建了一個長度為 8 的時間輪。

task0 = 當(dāng)我們需要新建一個 5s 延時消息,則只需要將它放到下標(biāo)為 5 的那個槽中。

task1 = 而如果是一個 10s 的延時消息,則需要將它放到下標(biāo)為 2 的槽中,但同時需要記錄它所對應(yīng)的圈數(shù),不然就和 2 秒的延時消息重復(fù)了。

task2= 當(dāng)創(chuàng)建一個 21s 的延時消息時,它所在的位置就和 task0 相同了,都在下標(biāo)為 5 的槽中,所以為了區(qū)別需要為他加上圈數(shù)為 2。

當(dāng)我們需要取出延時消息時,只需要每秒往下移動這個指針,然后取出該位置的所有任務(wù)即可。

當(dāng)然取出任務(wù)之前還得判斷圈數(shù)是否為 0 ,不為 0 時說明該任務(wù)還得再輪幾圈,同時需要將圈數(shù) -1 。

這樣就可避免輪詢所有的任務(wù),不過如果時間輪的槽比較少,導(dǎo)致某一個槽上的任務(wù)非常多那效率也比較低,這就和 HashMap 的 hash 沖突是一樣的。


時間輪算法可以類比于時鐘, (指針)按某一個方向按固定頻率輪動,每一次跳動稱為一個 tick。這樣可以看出定時輪由個3個重要的屬性參數(shù),ticksPerWheel(一輪的tick數(shù)),tickDuration(一個tick的持續(xù)時間)以及 timeUnit(時間單位)。

例如當(dāng)ticksPerWheel=60,tickDuration=1,timeUnit=秒,這就和現(xiàn)實中的始終的秒針走動完全類似了。

如果當(dāng)前指針指在1上面,我有一個任務(wù)需要4秒以后執(zhí)行,那么這個執(zhí)行的線程回調(diào)或者消息將會被放在5上。那如果需要在20秒之后執(zhí)行怎么辦,由于這個環(huán)形結(jié)構(gòu)槽數(shù)只到8,如果要20秒,指針需要多轉(zhuǎn)2圈。位置是在2圈之后的5上面(20 % 8 + 1)


Demo Code

我們用Netty的HashedWheelTimer來實現(xiàn)

<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.24.Final</version></dependency> import io.netty.util.HashedWheelTimer; import io.netty.util.Timeout; import io.netty.util.Timer; import io.netty.util.TimerTask;import java.util.concurrent.TimeUnit;/*** @author 小工匠* @version 1.0* @description: TODO* @date 2021/9/2 23:30* @mark: show me the code , change the world*/ public class HashedWheelTimerTest {static class MyTimerTask implements TimerTask {boolean flag;public MyTimerTask(boolean flag) {this.flag = flag;}@Overridepublic void run(Timeout timeout) throws Exception {System.out.println("要去執(zhí)行業(yè)務(wù)啦....");this.flag = false;}}public static void main(String[] argv) {MyTimerTask timerTask = new MyTimerTask(true);Timer timer = new HashedWheelTimer();timer.newTimeout(timerTask, 5, TimeUnit.SECONDS);int i = 1;while (timerTask.flag) {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(i + "秒過去了");i++;}} }

優(yōu)缺點

優(yōu)點

  • 效率高,任務(wù)觸發(fā)時間延遲時間比delayQueue低,代碼復(fù)雜度比delayQueue低。

缺點:

  • 服務(wù)器重啟后,數(shù)據(jù)全部消失,怕宕機

  • 集群擴展相當(dāng)麻煩

  • 因為內(nèi)存條件限制的原因,比如數(shù)據(jù)太多,那么很容易就出現(xiàn)OOM異常


Redis緩存(zset)

核心思想

利用redis的zset,zset是一個有序集合,每一個元素(member)都關(guān)聯(lián)了一個score,通過score排序來取集合中的值

# 添加單個元素redis> ZADD page_rank 10 google.com (integer) 1# 添加多個元素redis> ZADD page_rank 9 baidu.com 8 bing.com (integer) 2redis> ZRANGE page_rank 0 -1 WITHSCORES 1) "bing.com" 2) "8" 3) "baidu.com" 4) "9" 5) "google.com" 6) "10"# 查詢元素的score值 redis> ZSCORE page_rank bing.com "8"# 移除單個元素redis> ZREM page_rank google.com (integer) 1redis> ZRANGE page_rank 0 -1 WITHSCORES 1) "bing.com" 2) "8" 3) "baidu.com" 4) "9"

那么如何實現(xiàn)呢?我們將訂單超時時間戳與訂單號分別設(shè)置為score和member,系統(tǒng)掃描第一個元素判斷是否超時

Demo Code

import java.util.Calendar; import java.util.Set;import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.Tuple;/*** @author 小工匠* @version 1.0* @description: TODO* @date 2021/9/4 21:04* @mark: show me the code , change the world*/ public class RedisDelayQueue {private static final String ADDR = "127.0.0.1";private static final int PORT = 6379;private static JedisPool jedisPool = new JedisPool(ADDR, PORT);public static Jedis getJedis() {return jedisPool.getResource();}/*** 生產(chǎn)者,生成5個訂單放進去*/public void productionDelayMessage() {for (int i = 0; i < 5; i++) {//延遲3秒Calendar cal1 = Calendar.getInstance();cal1.add(Calendar.SECOND, 3);int second3later = (int) (cal1.getTimeInMillis() / 1000);RedisDelayQueue.getJedis().zadd("OrderId", second3later, "ARTISAN_ID_" + i);System.out.println(System.currentTimeMillis() + "ms:redis生成了訂單:訂單ID為" + "ARTISAN_ID_" + i);}}/*** 消費者,取訂單*/public void consumerDelayMessage() {Jedis jedis = RedisDelayQueue.getJedis();while (true) {Set<Tuple> items = jedis.zrangeWithScores("OrderId", 0, 1);if (items == null || items.isEmpty()) {System.out.println("當(dāng)前沒有等待的任務(wù)");try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}continue;}int score = (int) ((Tuple) items.toArray()[0]).getScore();Calendar cal = Calendar.getInstance();int nowSecond = (int) (cal.getTimeInMillis() / 1000);if (nowSecond >= score) {String orderId = ((Tuple) items.toArray()[0]).getElement();jedis.zrem("OrderId", orderId);System.out.println(System.currentTimeMillis() + "ms:redis消費了一個任務(wù):消費的訂單Id為" + orderId);}}}public static void main(String[] args) {RedisDelayQueue appTest = new RedisDelayQueue();appTest.productionDelayMessage();appTest.consumerDelayMessage();} }

上面的代碼有個硬傷:在高并發(fā)條件下,多消費者會取到同一個訂單號。

import java.util.concurrent.CountDownLatch;/*** @author 小工匠* @version 1.0* @description: TODO* @date 2021/9/4 21:21* @mark: show me the code , change the world*/ public class MTest {private static final int threadNum = 10;private static CountDownLatch cdl = new CountDownLatch(threadNum);static class DelayMessage implements Runnable {@Overridepublic void run() {try {cdl.await();} catch (InterruptedException e) {e.printStackTrace();}RedisDelayQueue appTest = new RedisDelayQueue();appTest.consumerDelayMessage();}}public static void main(String[] args) {RedisDelayQueue appTest = new RedisDelayQueue();appTest.productionDelayMessage();for (int i = 0; i < threadNum; i++) {new Thread(new DelayMessage()).start();cdl.countDown();}} }

顯然,出現(xiàn)了多個線程消費同一個資源的情況

解決方案

  • (1)用分布式鎖,但是用分布式鎖,性能下降了,不推薦

  • (2)對ZREM的返回值進行判斷,只有大于0的時候,才消費數(shù)據(jù),于是將consumerDelayMessage()方法里的

if(nowSecond >= score){String orderId = ((Tuple)items.toArray()[0]).getElement();jedis.zrem("OrderId", orderId);System.out.println(System.currentTimeMillis()+"ms:redis消費了一個任務(wù):消費的訂單OrderId為"+orderId);}

修改為

if(nowSecond >= score){String orderId = ((Tuple)items.toArray()[0]).getElement();Long num = jedis.zrem("OrderId", orderId);if( num != null && num>0){System.out.println(System.currentTimeMillis()+"ms:redis消費了一個任務(wù):消費的訂單OrderId為"+orderId);}}

Redis緩存(Keyspace Notifications)

核心思想

該方案使用redis的Keyspace Notifications,中文翻譯就是鍵空間機制,就是利用該機制可以在key失效之后,提供一個回調(diào),實際上是redis會給客戶端發(fā)送一個消息。 redis版本2.8以上。

在redis.conf中,加入一條配置

notify-keyspace-events Ex import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPubSub;/*** @author 小工匠* @version 1.0* @description: TODO* @date 2021/9/4 21:38* @mark: show me the code , change the world*/ public class RedisTest {private static final String ADDR = "127.0.0.1";private static final int PORT = 6379;private static JedisPool jedis = new JedisPool(ADDR, PORT);private static RedisSub sub = new RedisSub();public static void init() {new Thread(() -> jedis.getResource().subscribe(sub, "__keyevent@0__:expired")).start();}public static void main(String[] args) throws InterruptedException {init();for (int i = 0; i < 10; i++) {String orderId = "OID000000" + i;jedis.getResource().setex(orderId, 3, orderId);System.out.println(System.currentTimeMillis() + "ms:" + orderId + "訂單生成");}}static class RedisSub extends JedisPubSub {@Overridepublic void onMessage(String channel, String message) {System.out.println(System.currentTimeMillis() + "ms:" + message + "訂單取消");}} }

Redis的發(fā)布/訂閱目前是即發(fā)即棄(fire and forget)模式的,因此無法實現(xiàn)事件的可靠通知。也就是說,如果發(fā)布/訂閱的客戶端斷鏈之后又重連,則在客戶端斷鏈期間的所有事件都丟失了。
因此,Keyspace Notifications不是太推薦。當(dāng)然,如果你對可靠性要求不高,可以使用。


Redisson 延時隊列

使用消息隊列

總結(jié)

以上是生活随笔為你收集整理的每日一博 - 延时任务的多种实现方式解读的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。