电商系统如何实现订单超时自动取消?
一、背景
系統中用戶下單,對于系統下單一般是分布式事務的操作,想要實現訂單超時自動取消,我們可以基于MQ的延遲隊列和死信隊列實現。整體的實現思路分三種情況要考慮,第一種是訂單的創建和投遞到MQ,第二種是正常訂單消息的消費,另外則是超時后消息的消費。
二、實現思路
對于訂單的創建,只要生產者將消息成功投遞到MQ,則認為訂單創建成功。MQ返回ack表明消息投遞成功,此時向延遲隊列發送一條消息,而延遲隊列掛載死信隊列。這樣做目的是:如果延遲隊列中的消息達到閾值還沒消費,則會進入死信隊列,此時死信隊列的監聽器則會獲取到過期的訂單信息,可以做取消操作,反之,則走正常訂單消費的流程。
整體實現思路大體如下:
三、具體代碼
本文基于RabbitMQ實現,借助于RabbitMQ的延遲隊列TTL和死信隊列。
配置文件:
增加RabbitMQ的配置類,創建對應的隊列、轉換器、監聽器以及隊列信息綁定,備注很詳細,這里就不太贅述。
*** RabbitMQ配置類*/ @Configuration public class RabbitMqConfig {/*** 使用DirectMessageListenerContainer,您需要確保ConnectionFactory配置了一個任務執行器,* 該執行器在使用該ConnectionFactory的所有偵聽器容器中具有足夠的線程來支持所需的并發性。* 默認連接池大小僅為5。** 并發性基于配置的隊列和consumersPerQueue。每個隊列的每個使用者使用一個單獨的通道,* 并發性由rabbit客戶端庫控制;默認情況下,它使用5個線程池;* 可以配置taskExecutor來提供所需的最大并發性。** @param connectionFactory* @return*/@Bean(name = "rabbitMessageListenerContainer")public DirectMessageListenerContainer listenerContainer(ConnectionFactory connectionFactory){// 寫的時候,默認使用DEFAULT_NUM_THREADS = Runtime.getRuntime().availableProcessors() * 2個線程DirectMessageListenerContainer container = new DirectMessageListenerContainer(connectionFactory);// 設置確認消息的模式container.setAcknowledgeMode(AcknowledgeMode.MANUAL);container.setPrefetchCount(5);container.setConsumersPerQueue(5);container.setMessagesPerAck(1);ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();taskExecutor.setCorePoolSize(10);taskExecutor.setMaxPoolSize(20);//設置該屬性,靈活設置并發 ,多線程運行。container.setTaskExecutor(taskExecutor);return container;}/*** 設置消息轉換器,用于將對象轉換成JSON數據* 可以通過converterAndSend將對象發送消息隊列* 監聽器也可以通過該工具將接受對象反序列化成java對象** @return Jackson轉換器*/@Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}@Beanpublic RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {return new RabbitAdmin(connectionFactory);}/*** 訂單消息隊列* @return*/@Beanpublic Queue orderQueue(){return QueueBuilder.durable("q.order").build();}/*** 延遲消息隊列* @return*/@Beanpublic Queue ttlQueue(){Map<String,Object> args = new HashMap<>();// 該隊列的消息10s到期args.put("x-message-ttl", 10000);// 設置死信隊列交換器,(當隊列消息TTL到期后依然沒有消費,則加入死信隊列)args.put("x-dead-letter-exchange","x.dlx");// 設置私信隊列路由鍵,設置該隊列所關聯的死信交換器的routingKey,如果沒有特殊指定,使用原隊列的routingKeyargs.put("x-dead-letter-routing-key","k.dlx");Queue queue = new Queue("q.ttl",true,false,false, args);return queue;}/*** 死信隊列,用于取消用戶訂單* 當10s還沒有付款的訂單則進入死信隊列,消費死信隊列,取消用戶訂單** @return*/@Beanpublic Queue dlxQueue(){Map<String,Object> args = new HashMap<>();Queue dlq = new Queue("q.dlx",true,false,false, args);return dlq;}/*** 訂單交換器* @return*/@Beanpublic Exchange orderExchange(){Map<String, Object> args = new HashMap<>();DirectExchange exchange = new DirectExchange("x.order", true, false, args);return exchange;}/*** 延遲隊列交換器* @return*/@Beanpublic Exchange ttlExchange(){Map<String, Object> args = new HashMap<>();return new DirectExchange("x.ttl", true, false, args);}/*** 死信隊列交換器* @return*/@Beanpublic Exchange dlxExchange(){Map<String, Object> args = new HashMap<>();DirectExchange exchange = new DirectExchange("x.dlx", true, false, args);return exchange;}/*** 用于發送下單,做分布式事務的MQ* @return*/@Beanpublic Binding orderBinding(){return BindingBuilder.bind(orderQueue()).to(orderExchange()).with("k.order").noargs();}/*** 用于等待用戶支付的延遲隊列綁定* @return*/@Beanpublic Binding ttlBinding(){return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("k.ttl").noargs();}/*** 用于支付超時取消用戶訂單的死信隊列綁定* @return*/@Beanpublic Binding dlxBinding(){return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("k.dlx").noargs();}}創建訂單監聽器,用于監聽訂單正常的支付提交和超時取消。
/*** 訂單正常支付流程監聽*/ @Component public class OrderNormalListener {@RabbitListener(queues = "q.order",ackMode = "MANUAL")public void onMessage(Order order , Channel channel , Message message) throws IOException {System.out.println("寫入數據庫");System.out.println(order);for (OrderDetail detail : order.getDetails()){System.out.println(detail);}channel.basicAck(message.getMessageProperties().getDeliveryTag() , false);}}創建訂單超時自動取消監聽器,監聽的是死信隊列。
/*** 訂單超時自動取消監聽*/ @Component public class OrderCancelListener implements ChannelAwareMessageListener {@Override@RabbitListener(queues = "q.dlx" , ackMode = "MANUAL")public void onMessage(Message message, Channel channel) throws Exception {String orderId = new String(message.getBody());System.out.println("取消訂單:" + orderId);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} }對于訂單提交,正常提交后同時投遞多一份到延遲隊列里面去,用作延遲取消。
// 構建訂單信息 Order order = new Order(); order.setUserId(IdUtils.generateUserId()); order.setOrderId(IdUtils.generateOrderId()); // 設置狀態為待支付 order.setStatus(OrderStatus.TO_BE_PAYED.toString()); order.setDetails(details);// 投遞消息 CorrelationData correlationData = new CorrelationData(); rabbitTemplate.convertAndSend("x.order","k.order", order, correlationData); // 同步等待,可以設置為異步回調 CorrelationData.Confirm confirm = correlationData.getFuture().get(); // 判斷發送的消息是否得到broker的確認 boolean confirmAck = confirm.isAck(); if (confirmAck){// 發送延遲等待消息rabbitTemplate.convertAndSend("x.ttl","k.ttl" , order.getOrderId()); }四、總結
到這里,基本就實現了整個訂單延遲自動取消的思路,但事實上還有問題。
投遞訂單消息到MQ后要投遞多一份到延遲隊列,可能存在第一次投遞成功但投遞到延遲隊列失敗的情況,這里則需要依賴分布式鎖或者增加補償機制;還有編碼上的問題,MQ隊列名稱這些最好抽離出來,當然這里只是demo,就沒有那么規范,如果是產品開發,這些都需要最好規定,方便后期維護。
總結
以上是生活随笔為你收集整理的电商系统如何实现订单超时自动取消?的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Siemens M65...
- 下一篇: ICESat2学习笔记4 :Window