日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

java业务场景-实现订单超时关闭等延时队列操作的几种方式

發布時間:2023/12/16 编程问答 25 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java业务场景-实现订单超时关闭等延时队列操作的几种方式 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

ScheduledThreadPoolExecutor 線程池

鏈接: ScheduledThreadPoolExecutor.

import java.util.concurrent.Future;public class Entity {/*** 訂單到期時間*/private String orderExpirationTime;/*** 定時器Future*/private Future future;/*** @param orderExpirationTime 訂單到期時間* @param future 定時器*/public Entity(String orderExpirationTime, Future future) {this.orderExpirationTime = orderExpirationTime;this.future = future;}/*** 獲取值*/public String getOrderExpirationTime() {return orderExpirationTime;}/*** 獲取Future對象*/public Future getFuture() {return future;} } import cn.hutool.core.date.DateField; import cn.hutool.core.date.DateUtil; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.concurrent.BasicThreadFactory;import java.util.Date; import java.util.concurrent.*;/*** 取消訂單定時器緩存*/ public class CancelOrderTimer {/*** key為orderNumber*/private final static ConcurrentHashMap<String, Entity> map = new ConcurrentHashMap<>();/*** 線程池大小*/private static final int POOL_SIZE = 5;/*** 過期時間:一分鐘*/public static final int EXPIRE = 1;private final static ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(POOL_SIZE,new BasicThreadFactory.Builder().namingPattern("cancelOrder-schedule-pool-%d").daemon(true).build());public static ScheduledExecutorService getExecutor() {return executor;}/*** 讀取緩存*/public static String get(String key) {Entity entity = map.get(key);return entity == null ? null : entity.getOrderExpirationTime();}/*** 放入緩存*/public static void put(String orderNumber,Future future) {Date newDate = DateUtil.offset(DateUtil.date(), DateField.MINUTE, EXPIRE);String orderExpirationTime = DateUtil.formatDateTime(newDate);map.put(orderNumber, new Entity(orderExpirationTime, future));}/*** 清除緩存并取消定時任務* @param orderNumber 訂單號* @param mayInterruptIfRunning 是否中斷正在執行的任務*/public static void remove(String orderNumber, boolean mayInterruptIfRunning) {Entity entity = map.remove(orderNumber);if (entity == null) {return;}Future future = entity.getFuture();if (future != null) {// 注意:傳入true會中斷正在執行的任務future.cancel(mayInterruptIfRunning);}}/*** 獲取訂單剩余取消時間** @param orderNumber 訂單號*/public static String getOrderExpirationTime(String orderNumber) {long remainMinute = 0;long remainSecond = 0;String orderExpirationTime = get(orderNumber);if (StringUtils.isNotBlank(orderExpirationTime)) {long s = DateUtil.parse(orderExpirationTime).getTime();long between = (s - System.currentTimeMillis()) / 1000;remainMinute = between / 60 % 60;remainSecond = between % 60;}return "訂單過期還剩:" + remainMinute + "分鐘" + remainSecond + "秒";}} import cn.hutool.core.lang.Console; import cn.hutool.core.thread.ConcurrencyTester; import cn.hutool.core.thread.ThreadUtil; import cn.hutool.core.util.IdUtil; import com.zm.demo.util.CancelOrderTimer; import com.zm.demo.util.RedisUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.validation.annotation.Validated; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController;import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit;@RestController @Validated public class DelayTestController {public static Logger log = LoggerFactory.getLogger(CancelOrderTimer.class);@Autowiredprivate RedisUtil RedisUtil;@GetMapping("/put_timer")public String hello1(@RequestParam String orderNumber){ConcurrencyTester tester = ThreadUtil.concurrencyTest(100, () -> {// 測試的邏輯內容putTimer(IdUtil.simpleUUID());});// 獲取總的執行時間,單位毫秒Console.log(tester.getInterval());return "ok";}@GetMapping("/expiration_time")public String hello2(@RequestParam String orderNumber){return CancelOrderTimer.getOrderExpirationTime(orderNumber);}public void putTimer(String orderNumber) {CancelOrderTimer.remove(orderNumber,false);ScheduledExecutorService executor = CancelOrderTimer.getExecutor();Future future = executor.schedule(() -> {try {synchronized (CancelOrderTimer.class) {// 這里用 redis計錄 任務執行成功的次數RedisUtil.incrBy("one",1);log.info(orderNumber + "訂單號成功執行取消訂單");}} catch (Exception e) {e.printStackTrace();log.error(orderNumber + "訂單號執行取消訂單失敗");}}, CancelOrderTimer.EXPIRE, TimeUnit.MINUTES);CancelOrderTimer.put(orderNumber,future);log.info( orderNumber + "訂單號啟動取消訂單定時器");} }

redis

鏈接: 超簡單使用redisson延遲隊列做定時任務.

<dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-boot-starter</artifactId><version>3.15.3</version> </dependency> spring:redis:host: xxxxxxxport: 6379timeout: 5000password: xxxxxx import java.io.Serializable;public class TaskBodyDTO implements Serializable {private String orderNumber;public String getOrderNumber() {return orderNumber;}public void setOrderNumber(String orderNumber) {this.orderNumber = orderNumber;} } /*** 隊列事件監聽接口,需要實現這個方法** @param <T>*/ public interface RedisDelayedQueueListener<T> {/*** 執行方法** @param t*/void invoke(T t); } import com.zm.demo.util.RedisUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;@Component public class TestListener1 implements RedisDelayedQueueListener<TaskBodyDTO>{public static Logger log = LoggerFactory.getLogger(TestListener1.class);@Autowiredprivate RedisUtil RedisUtil;@Overridepublic void invoke(TaskBodyDTO taskBodyDTO) {RedisUtil.incrBy("one",1);log.info( taskBodyDTO.getOrderNumber() + "訂單號成功執行取消訂單");}} import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.redisson.api.RBlockingQueue; import org.redisson.api.RedissonClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.BeansException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.stereotype.Component;import java.util.Map; import java.util.concurrent.*;/*** 初始化隊列監聽*/ @Component public class RedisDelayedQueueInit implements ApplicationContextAware {private static Logger logger = LoggerFactory.getLogger(RedisDelayedQueueInit.class);@AutowiredRedissonClient redissonClient;/*** corePoolSize:核心線程數** maximumPoolSize:最大線程數** keepAliveTime + unit:線程回收時間** workQueue:任務較多時暫存到隊列** threadFactory:執行程序創建新線程時使用的工廠** handler:超出線程池容量以及隊列長度后拒絕任務的策略*/private final static ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("judge-pool-%d").setUncaughtExceptionHandler((thread, throwable)->logger.error("ThreadPool {} got exception", thread,throwable)).build();// 創建線程池,使?有界阻塞隊列防?內存溢出private final static ExecutorService statsThreadPool = new ThreadPoolExecutor(5, 10,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(100), namedThreadFactory);/*** 獲取應用上下文并獲取相應的接口實現類并啟動對應的監聽線程* @param applicationContext* @throws BeansException*/@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {Map<String, RedisDelayedQueueListener> map = applicationContext.getBeansOfType(RedisDelayedQueueListener.class);for (Map.Entry<String, RedisDelayedQueueListener> taskEventListenerEntry : map.entrySet()) {String listenerName = taskEventListenerEntry.getValue().getClass().getName();startThread(listenerName, taskEventListenerEntry.getValue());}}/*** 啟動線程獲取隊列*** @param queueName queueName* @param redisDelayedQueueListener 任務回調監聽* @param <T> 泛型* @return*/private <T> void startThread(String queueName, RedisDelayedQueueListener redisDelayedQueueListener) {RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(queueName);//由于此線程需要常駐,可以新建線程,不用交給線程池管理Thread thread = new Thread(() -> {logger.info("啟動監聽隊列線程" + queueName);while (true) {try {T t = blockingFairQueue.take();statsThreadPool.execute(() -> {redisDelayedQueueListener.invoke(t);});} catch (Exception e) {logger.info("監聽隊列線程錯誤,", e);try {Thread.sleep(10000);} catch (InterruptedException ex) {}}}});thread.setName(queueName);thread.start();}} import org.redisson.api.RBlockingQueue; import org.redisson.api.RDelayedQueue; import org.redisson.api.RedissonClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;import java.util.concurrent.TimeUnit;@Component public class RedisDelayedQueue {@AutowiredRedissonClient redissonClient;private static Logger logger = LoggerFactory.getLogger(RedisDelayedQueue.class);/*** 添加隊列** @param t DTO傳輸類* @param delay 時間數量* @param timeUnit 時間單位* @param <T> 泛型*/public <T> void addQueue(T t, long delay, TimeUnit timeUnit, String queueName) {RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(queueName);RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue);delayedQueue.offer(t, delay, timeUnit);}} @AutowiredRedisDelayedQueue redisDelayedQueue;@GetMapping("/redis/put_timer")public String hello(){ConcurrencyTester tester = ThreadUtil.concurrencyTest(100, () -> {TaskBodyDTO taskBody = new TaskBodyDTO();taskBody.setOrderNumber(IdUtil.simpleUUID());redisDelayedQueue.addQueue(taskBody, CancelOrderTimer.EXPIRE, TimeUnit.MINUTES, TestListener1.class.getName());});Console.log("總的執行時間:"+tester.getInterval());return "ok";}

消息中間件

RabbitMQ 實現延遲隊列

導入依賴

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId> </dependency>

綁定交換機和隊列的關系

import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;import java.util.HashMap; import java.util.Map;@Configuration public class RabbitConfig {public static final String exchange_name = "fanout_order_exchange";public static final String dead_exchange_name = "dead_order_exchange";public static final String dead_rout_key = "dead_order";/*** 配置交換機*/@Beanpublic FanoutExchange fanoutOrderExchange() {return new FanoutExchange(exchange_name, true, false);}/*** 配置ttl隊列 存放訂單 設置1分鐘投入 死信隊列*/@Beanpublic Queue ttlQueue() {// durable:是否持久化,默認是false,持久化隊列:會被存儲在磁盤上,當消息代理重啟時仍然存在,暫存隊列:當前連接有效// exclusive:默認也是false,只能被當前創建的連接使用,而且當連接關閉后隊列即被刪除。此參考優先級高于durable// autoDelete:是否自動刪除,當沒有生產者或者消費者使用此隊列,該隊列會自動刪除。//一般設置一下隊列的持久化就好,其余兩個就是默認falseMap<String,Object> args = new HashMap<>();// 1000/秒args.put("x-message-ttl",60000);args.put("x-dead-letter-exchange",dead_exchange_name);args.put("x-dead-letter-routing-key",dead_rout_key);return new Queue("cancel.fanout.queue", true, false ,false, args);}/*** 將隊列和交換機綁定*/@Beanpublic Binding bindingFanout() {return BindingBuilder.bind(ttlQueue()).to(fanoutOrderExchange());}/*** 配置死信交換機*/@Beanpublic DirectExchange deadExchange() {return new DirectExchange(dead_exchange_name, true, false);}/*** 配置死信隊列*/@Beanpublic Queue cancelQueue() {// durable:是否持久化,默認是false,持久化隊列:會被存儲在磁盤上,當消息代理重啟時仍然存在,暫存隊列:當前連接有效// exclusive:默認也是false,只能被當前創建的連接使用,而且當連接關閉后隊列即被刪除。此參考優先級高于durable// autoDelete:是否自動刪除,當沒有生產者或者消費者使用此隊列,該隊列會自動刪除。//一般設置一下隊列的持久化就好,其余兩個就是默認falsereturn new Queue("cancel.direct.queue", true);}/*** 將隊列和交換機綁定* @return*/@Beanpublic Binding bindingDirect() {return BindingBuilder.bind(cancelQueue()).to(deadExchange()).with(dead_rout_key);}} import cn.hutool.core.util.IdUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;@Component public class OrderService {public static Logger log = LoggerFactory.getLogger(OrderService .class);@Autowiredprivate RabbitTemplate rabbitTemplate;public void makeOrder() {String orderNumber = IdUtil.simpleUUID();// convertSendAndReceive(…):可以同步消費者。使用此方法,當確認了所有的消費者都接收成功之后,才觸發另一個convertSendAndReceive(…),也就是才會接收下一條消息。RPC調用方式。// convertAndSend(…):使用此方法,交換機會馬上把所有的信息都交給所有的消費者,消費者再自行處理,不會因為消費者處理慢而阻塞線程。rabbitTemplate.convertAndSend(RabbitConfig.exchange_name, "", orderNumber);log.info(orderNumber + "訂單號啟動取消訂單定時器");}} import com.zm.demo.util.RedisUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;@Component public class CancelOrderService {public static Logger log = LoggerFactory.getLogger(CancelOrderService .class);@Autowiredprivate RedisUtil RedisUtil;/*** @RabbitListener 監聽隊列* @RabbitHandler 代表此方法是一個消息接收的方法;該不要有返回值*/@RabbitListener(queues = "cancel.direct.queue")@RabbitHandlerpublic void invoke(String message){RedisUtil.incrBy("one",1);log.info(message + "訂單號成功執行取消訂單");}} @AutowiredOrderService OrderService;@GetMapping("/mq/put_timer")public String mq(){ConcurrencyTester tester = ThreadUtil.concurrencyTest(1000, () -> {OrderService.makeOrder();});Console.log("總的執行時間:"+tester.getInterval());return "ok";}

使用死信隊列實現延時消息的缺點:

1) 如果統一用隊列來設置消息的TTL,當梯度非常多的情況下,比如1分鐘,2分鐘,5分鐘,10分鐘,20分鐘,30分鐘……需要創建很多交換機和隊列來路由消息。

2) 如果單獨設置消息的TTL,則可能會造成隊列中的消息阻塞——前一條消息沒有出隊(沒有被消費),后面的消息無法投遞。

3) 可能存在一定的時間誤差。

并發測試對比:服務器性能 1核2G

定時器線程池 100并發
添加任務:87ms
執行任務:8秒

redis 100并發
添加任務:698ms
執行任務:8秒

RabbitMQ 100并發
添加任務:6秒
執行任務:8秒

定時器線程池 1000并發
添加任務:202ms
執行任務:1分鐘30秒

redis 1000并發
添加任務:3秒
執行任務:1分鐘30秒

RabbitMQ 1000并發
添加任務:1分鐘多
執行任務:2分鐘多

RabbitMQ 為什么這么慢 代碼有問題還是什么原因 有待研究~

具體選擇看具體需求及場景

定時器線程池

優點:
使用簡單
支持停止任務
執行時間較為準時

缺點:
任務數量大時,占用大量內存
一旦宕機或者執行任務失敗,無法重新執行任務,需要寫補償機制
存在較小時間誤差

redis 延遲隊列

優點:
redis 執行效率快
支持分布式
消息持久化

缺點:
編碼實現稍微復雜
沒有確認消息可靠消費機制,需要寫補償機制
無法取消任務
存在較小時間誤差

RabbitMQ 延遲隊列

優點:
解耦
通過生產者可靠消息投遞和消費者可靠消息確認機制能確保任務穩定執行
消息持久化
支持分布式
能接收大量的消息

缺點:
編碼實現復雜
無法取消任務
存在時間誤差

總結

以上是生活随笔為你收集整理的java业务场景-实现订单超时关闭等延时队列操作的几种方式的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。