日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

每日一博 - 延时任务的多种实现方式解读

發布時間:2025/3/21 编程问答 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 每日一博 - 延时任务的多种实现方式解读 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

文章目錄

  • Pre
  • 延時任務 VS 定時任務
  • Solutions
    • DB 輪詢
      • 核心思想
      • Demo Code
      • 優缺點
    • JDK的Delay Queue
      • 核心思想
      • Demo Code
      • 優缺點
    • 時間輪算法
      • 核心思想
      • Demo Code
      • 優缺點
    • Redis緩存(zset)
      • 核心思想
      • Demo Code
    • Redis緩存(Keyspace Notifications)
      • 核心思想
    • Redisson 延時隊列
    • 使用消息隊列


Pre

每日一博 - 使用環形隊列實現高效的延時消息


延時任務 VS 定時任務

舉個例子,開發中常見的延時任務場景:

  • 半小時未支付,取消訂單

延時任務和定時任務的幾個小區別,梳理下:

  • 定時任務有明確的觸發時間,延時任務沒有
  • 定時任務有執行周期,而延時任務在某事件觸發后一段時間內執行,沒有執行周期
  • 定時任務一般執行的是批處理操作是多個任務,而延時任務一般是單個任務

Solutions

DB 輪詢

核心思想

通過定時任務掃描,執行業務邏輯。


Demo Code

參考實現如下:

<dependency><groupId>org.quartz-scheduler</groupId><artifactId>quartz</artifactId><version>2.2.2</version></dependency> import org.quartz.JobBuilder; import org.quartz.JobDetail; import org.quartz.Scheduler; import org.quartz.SchedulerException; import org.quartz.SchedulerFactory; import org.quartz.SimpleScheduleBuilder; import org.quartz.Trigger; import org.quartz.TriggerBuilder; import org.quartz.impl.StdSchedulerFactory; import org.quartz.Job; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException;public class MyJob implements Job {public void execute(JobExecutionContext context)throws JobExecutionException {System.out.println("模擬掃描任務。。。。。");}public static void main(String[] args) throws Exception {// 創建任務JobDetail jobDetail = JobBuilder.newJob(MyJob.class).withIdentity("job1", "group1").build();// 創建觸發器 每3秒鐘執行一次Trigger trigger = TriggerBuilder.newTrigger().withIdentity("trigger1", "group3").withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(3).repeatForever()).build();Scheduler scheduler = new StdSchedulerFactory().getScheduler();// 將任務及其觸發器放入調度器scheduler.scheduleJob(jobDetail, trigger);// 調度器開始調度任務scheduler.start();} }

優缺點

優點: 簡單 (好像也沒有其他的優點了 哈哈哈 )

缺點:

  • (1)占用資源,對服務器內存消耗大

  • (2)存在延遲,比如你每隔n分鐘掃描一次,那最壞的延遲時間就是n分鐘

  • (3)如果表的數據量較大,每隔幾分鐘這樣掃描一次,性能堪憂,DB壓力較大


JDK的Delay Queue

核心思想

利用JDK自帶的DelayQueue來實現, 無界阻塞隊列,該隊列只有在延遲期滿的時候才能從中獲取元素,放入DelayQueue中的對象,必須實現Delayed接口。

  • poll():獲取并移除隊列的超時元素,沒有則返回空
  • take():獲取并移除隊列的超時元素,如果沒有則wait當前線程,直到有元素滿足超時條件,返回結果。

Demo Code

import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit;/*** @author 小工匠* @version 1.0* @description: TODO* @date 2021/9/2 22:50* @mark: show me the code , change the world*/ public class TicketDelay implements Delayed {private String ticketId;private long timeout;public TicketDelay(String ticketId, long timeout) {this.ticketId= ticketId;this.timeout = timeout + System.nanoTime();}@Overridepublic int compareTo(Delayed other) {if (other == this) {return 0;}TicketDelay t = (TicketDelay) other;long d = (getDelay(TimeUnit.NANOSECONDS) - t.getDelay(TimeUnit.NANOSECONDS));return (d == 0) ? 0 : ((d < 0) ? -1 : 1);}/*** 返回距離自定義的超時時間還有多少* @param unit* @return*/@Overridepublic long getDelay(TimeUnit unit) {return unit.convert(timeout - System.nanoTime(),TimeUnit.NANOSECONDS);}void doSomething() {System.out.println(ticketId+" is deleted");} } import java.util.ArrayList; import java.util.List; import java.util.concurrent.DelayQueue; import java.util.concurrent.TimeUnit;/*** @author 小工匠* @version 1.0* @description: TODO* @date 2021/9/2 22:51* @mark: show me the code , change the world*/ public class DelayQueueDemo {public static void main(String[] args) {// 模擬數據List<String> list = new ArrayList<>();list.add("Ticket-1");list.add("Ticket-2");list.add("Ticket-3");list.add("Ticket-4");list.add("Ticket-5");// 延時隊列DelayQueue<TicketDelay> queue = new DelayQueue<>();for (int i = 0; i < 5; i++) {long start = System.currentTimeMillis();//延遲2秒取出queue.put(new TicketDelay(list.get(i), TimeUnit.NANOSECONDS.convert(2, TimeUnit.SECONDS)));System.out.println("biubiubiu ~ " + (System.currentTimeMillis() - start) + " MilliSeconds ");try {queue.take().doSomething();System.out.println("biubiubiu " + (System.currentTimeMillis() - start) + " MilliSeconds 取到了數據,開始后執行業務操作");} catch (InterruptedException e) {e.printStackTrace();}System.out.println("===========================\n" );}} }

優缺點

優點:

  • 效率高,任務觸發時間延遲低

缺點:

  • 服務器重啟后,數據全部消失,怕宕機

  • 集群擴展相當麻煩

  • 因為內存條件限制的原因,如數據太多,那么很容易就出現OOM異常

  • 代碼復雜度較高


時間輪算法

每日一博 - 使用環形隊列實現高效的延時消息

延時消息之時間輪


核心思想

其實本質上它就是一個環形的數組,如圖所示,假設我們創建了一個長度為 8 的時間輪。

task0 = 當我們需要新建一個 5s 延時消息,則只需要將它放到下標為 5 的那個槽中。

task1 = 而如果是一個 10s 的延時消息,則需要將它放到下標為 2 的槽中,但同時需要記錄它所對應的圈數,不然就和 2 秒的延時消息重復了。

task2= 當創建一個 21s 的延時消息時,它所在的位置就和 task0 相同了,都在下標為 5 的槽中,所以為了區別需要為他加上圈數為 2。

當我們需要取出延時消息時,只需要每秒往下移動這個指針,然后取出該位置的所有任務即可。

當然取出任務之前還得判斷圈數是否為 0 ,不為 0 時說明該任務還得再輪幾圈,同時需要將圈數 -1 。

這樣就可避免輪詢所有的任務,不過如果時間輪的槽比較少,導致某一個槽上的任務非常多那效率也比較低,這就和 HashMap 的 hash 沖突是一樣的。


時間輪算法可以類比于時鐘, (指針)按某一個方向按固定頻率輪動,每一次跳動稱為一個 tick。這樣可以看出定時輪由個3個重要的屬性參數,ticksPerWheel(一輪的tick數),tickDuration(一個tick的持續時間)以及 timeUnit(時間單位)。

例如當ticksPerWheel=60,tickDuration=1,timeUnit=秒,這就和現實中的始終的秒針走動完全類似了。

如果當前指針指在1上面,我有一個任務需要4秒以后執行,那么這個執行的線程回調或者消息將會被放在5上。那如果需要在20秒之后執行怎么辦,由于這個環形結構槽數只到8,如果要20秒,指針需要多轉2圈。位置是在2圈之后的5上面(20 % 8 + 1)


Demo Code

我們用Netty的HashedWheelTimer來實現

<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.24.Final</version></dependency> import io.netty.util.HashedWheelTimer; import io.netty.util.Timeout; import io.netty.util.Timer; import io.netty.util.TimerTask;import java.util.concurrent.TimeUnit;/*** @author 小工匠* @version 1.0* @description: TODO* @date 2021/9/2 23:30* @mark: show me the code , change the world*/ public class HashedWheelTimerTest {static class MyTimerTask implements TimerTask {boolean flag;public MyTimerTask(boolean flag) {this.flag = flag;}@Overridepublic void run(Timeout timeout) throws Exception {System.out.println("要去執行業務啦....");this.flag = false;}}public static void main(String[] argv) {MyTimerTask timerTask = new MyTimerTask(true);Timer timer = new HashedWheelTimer();timer.newTimeout(timerTask, 5, TimeUnit.SECONDS);int i = 1;while (timerTask.flag) {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(i + "秒過去了");i++;}} }

優缺點

優點

  • 效率高,任務觸發時間延遲時間比delayQueue低,代碼復雜度比delayQueue低。

缺點:

  • 服務器重啟后,數據全部消失,怕宕機

  • 集群擴展相當麻煩

  • 因為內存條件限制的原因,比如數據太多,那么很容易就出現OOM異常


Redis緩存(zset)

核心思想

利用redis的zset,zset是一個有序集合,每一個元素(member)都關聯了一個score,通過score排序來取集合中的值

# 添加單個元素redis> ZADD page_rank 10 google.com (integer) 1# 添加多個元素redis> ZADD page_rank 9 baidu.com 8 bing.com (integer) 2redis> ZRANGE page_rank 0 -1 WITHSCORES 1) "bing.com" 2) "8" 3) "baidu.com" 4) "9" 5) "google.com" 6) "10"# 查詢元素的score值 redis> ZSCORE page_rank bing.com "8"# 移除單個元素redis> ZREM page_rank google.com (integer) 1redis> ZRANGE page_rank 0 -1 WITHSCORES 1) "bing.com" 2) "8" 3) "baidu.com" 4) "9"

那么如何實現呢?我們將訂單超時時間戳與訂單號分別設置為score和member,系統掃描第一個元素判斷是否超時

Demo Code

import java.util.Calendar; import java.util.Set;import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.Tuple;/*** @author 小工匠* @version 1.0* @description: TODO* @date 2021/9/4 21:04* @mark: show me the code , change the world*/ public class RedisDelayQueue {private static final String ADDR = "127.0.0.1";private static final int PORT = 6379;private static JedisPool jedisPool = new JedisPool(ADDR, PORT);public static Jedis getJedis() {return jedisPool.getResource();}/*** 生產者,生成5個訂單放進去*/public void productionDelayMessage() {for (int i = 0; i < 5; i++) {//延遲3秒Calendar cal1 = Calendar.getInstance();cal1.add(Calendar.SECOND, 3);int second3later = (int) (cal1.getTimeInMillis() / 1000);RedisDelayQueue.getJedis().zadd("OrderId", second3later, "ARTISAN_ID_" + i);System.out.println(System.currentTimeMillis() + "ms:redis生成了訂單:訂單ID為" + "ARTISAN_ID_" + i);}}/*** 消費者,取訂單*/public void consumerDelayMessage() {Jedis jedis = RedisDelayQueue.getJedis();while (true) {Set<Tuple> items = jedis.zrangeWithScores("OrderId", 0, 1);if (items == null || items.isEmpty()) {System.out.println("當前沒有等待的任務");try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}continue;}int score = (int) ((Tuple) items.toArray()[0]).getScore();Calendar cal = Calendar.getInstance();int nowSecond = (int) (cal.getTimeInMillis() / 1000);if (nowSecond >= score) {String orderId = ((Tuple) items.toArray()[0]).getElement();jedis.zrem("OrderId", orderId);System.out.println(System.currentTimeMillis() + "ms:redis消費了一個任務:消費的訂單Id為" + orderId);}}}public static void main(String[] args) {RedisDelayQueue appTest = new RedisDelayQueue();appTest.productionDelayMessage();appTest.consumerDelayMessage();} }

上面的代碼有個硬傷:在高并發條件下,多消費者會取到同一個訂單號。

import java.util.concurrent.CountDownLatch;/*** @author 小工匠* @version 1.0* @description: TODO* @date 2021/9/4 21:21* @mark: show me the code , change the world*/ public class MTest {private static final int threadNum = 10;private static CountDownLatch cdl = new CountDownLatch(threadNum);static class DelayMessage implements Runnable {@Overridepublic void run() {try {cdl.await();} catch (InterruptedException e) {e.printStackTrace();}RedisDelayQueue appTest = new RedisDelayQueue();appTest.consumerDelayMessage();}}public static void main(String[] args) {RedisDelayQueue appTest = new RedisDelayQueue();appTest.productionDelayMessage();for (int i = 0; i < threadNum; i++) {new Thread(new DelayMessage()).start();cdl.countDown();}} }

顯然,出現了多個線程消費同一個資源的情況

解決方案

  • (1)用分布式鎖,但是用分布式鎖,性能下降了,不推薦

  • (2)對ZREM的返回值進行判斷,只有大于0的時候,才消費數據,于是將consumerDelayMessage()方法里的

if(nowSecond >= score){String orderId = ((Tuple)items.toArray()[0]).getElement();jedis.zrem("OrderId", orderId);System.out.println(System.currentTimeMillis()+"ms:redis消費了一個任務:消費的訂單OrderId為"+orderId);}

修改為

if(nowSecond >= score){String orderId = ((Tuple)items.toArray()[0]).getElement();Long num = jedis.zrem("OrderId", orderId);if( num != null && num>0){System.out.println(System.currentTimeMillis()+"ms:redis消費了一個任務:消費的訂單OrderId為"+orderId);}}

Redis緩存(Keyspace Notifications)

核心思想

該方案使用redis的Keyspace Notifications,中文翻譯就是鍵空間機制,就是利用該機制可以在key失效之后,提供一個回調,實際上是redis會給客戶端發送一個消息。 redis版本2.8以上。

在redis.conf中,加入一條配置

notify-keyspace-events Ex import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPubSub;/*** @author 小工匠* @version 1.0* @description: TODO* @date 2021/9/4 21:38* @mark: show me the code , change the world*/ public class RedisTest {private static final String ADDR = "127.0.0.1";private static final int PORT = 6379;private static JedisPool jedis = new JedisPool(ADDR, PORT);private static RedisSub sub = new RedisSub();public static void init() {new Thread(() -> jedis.getResource().subscribe(sub, "__keyevent@0__:expired")).start();}public static void main(String[] args) throws InterruptedException {init();for (int i = 0; i < 10; i++) {String orderId = "OID000000" + i;jedis.getResource().setex(orderId, 3, orderId);System.out.println(System.currentTimeMillis() + "ms:" + orderId + "訂單生成");}}static class RedisSub extends JedisPubSub {@Overridepublic void onMessage(String channel, String message) {System.out.println(System.currentTimeMillis() + "ms:" + message + "訂單取消");}} }

Redis的發布/訂閱目前是即發即棄(fire and forget)模式的,因此無法實現事件的可靠通知。也就是說,如果發布/訂閱的客戶端斷鏈之后又重連,則在客戶端斷鏈期間的所有事件都丟失了。
因此,Keyspace Notifications不是太推薦。當然,如果你對可靠性要求不高,可以使用。


Redisson 延時隊列

使用消息隊列

總結

以上是生活随笔為你收集整理的每日一博 - 延时任务的多种实现方式解读的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。