rabbitmq 查询版本_基于rabbitmq解决分布式事务
分布式事務要解決的問題是保證二個數據庫數據的一致性,本地事務ACID屬于剛性事務,基于CAP理論,分布式事務的核心要點柔性事務,最終一致性。
基于rabbitmq解決分布式事務要點如下
- 生產者采用發送方確認機制加上mandatory參數或者備份交換機,保證消息被正確地到達隊列中。
- 隊列、交換機、消息都需要持久化(可以考慮鏡像隊列機制,如果業務不是那么重要,比如短信郵件通知)。
- 消費端采用手動應答的方式,確認消息已經被正確消費。
rabbitmq配置
spring:application:name: rabbitmq-servicerabbitmq:addresses: 192.168.137.128:5672,192.168.137.129:5672username: rootpassword: rootvirtual-host: transactionpublisher-confirms: true #開啟發送端確認機制publisher-returns: true # 開啟returnstemplate:mandatory: true # 交換機沒有匹配的隊列時,會將消息返回給生產者,避免消息丟失listener:simple:acknowledge-mode: manualretry:enabled: truemax-attempts: 5initial-interval: 3000server:port: 8400eureka:client:serviceUrl:defaultZone: http://localhost:8100/eureka/instance:prefer-ip-address: trueinstance-id: ${spring.cloud.client.ip-address}:${server.port}mq:common:exchange: common.exchangequeue: common.queueroutingkey: common.routing
代碼案例是下單減庫存
生產者代碼
package com.liwen.mqservice.producer;import com.liwen.entity.Order;
import com.liwen.feign.IOrderServiceFeign;
import lombok.extern.slf4j.Slf4j;
import net.sf.json.JSONObject;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class MQOrderProducer implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {@Value("${mq.common.exchange}")private String exchangeName;@Value("${mq.common.routingkey}")private String routingKey;@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate IOrderServiceFeign orderServiceFeign;public void send(JSONObject json){rabbitTemplate.setConfirmCallback(this);//發送到確認機制rabbitTemplate.setReturnCallback(this);//消息Return回調Order order= (Order) JSONObject.toBean(json.getJSONObject("order"), Order.class);orderServiceFeign.saveOrder(order);String orderId = order.getOrderId();String goodId = order.getGoodId();json.put("orderId", orderId);json.put("goodId", goodId);CorrelationData correlationData = new CorrelationData(orderId);rabbitTemplate.convertAndSend(exchangeName,routingKey, json.toString(), new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {MessageProperties messageProperties = message.getMessageProperties();messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);//設置消息持久化return message;}}, correlationData);}@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if(!ack){/** 處理消息沒有到達交換機,數據丟失的情況* 根據訂單號查詢到訂單數據,并將數據保存到異常消息表中,定時補發,并報警人工處理* */String orderId = correlationData.getId();}else{//查詢訂單號是否在異常消息表,在的話要刪除log.info(">>>下單消息發送成功{}<<<",correlationData);}}@Overridepublic void returnedMessage(Message message, int i, String s, String s1, String s2) {//消息到達交換機,沒有路由到隊列,根據訂單號查詢到訂單數據,并將數據保存到異常消息表中,定時補發,并報警人工處理/** 1 交換機沒有綁定隊列* 2 交換機根據路由鍵沒有匹配到隊列* 3 隊列消息已滿* */byte[] body = message.getBody();JSONObject json = JSONObject.fromObject(new String(body));System.out.println("return============================");System.out.println(message);}
}對應生產者發送消息到rabbitmq有以下幾種情況:
1 沒有發送到交換機的數據,會回調public void confirm(CorrelationData correlationData, boolean ack, String cause) 方法(ack為false),我們可以在發送消息時把業務參數比如訂單號設置到correlationData參數中,回調時把相關消息保存到異常消息表,采用定時任務兜底,并報警通知相關人員。
2 對于發送方確認機制,只能保證消息到達rabbitmq的交換機(ack為true),如果此交換機沒有匹配的隊 列,那么消息會丟失,所以需要結合mandatory(設置為true),當交換機沒有匹配的隊列時,會回調 public void returnedMessage(Message message, int i, String s, String s1, String s2)將消息返回給生產者,我們保存消息body部分,采用定時任務兜底,并報警通知相關人員。
2 交換機、隊列、消息(重要的數據比如支付數據)需要持久化,避免消息在rabbitmq重啟、宕機等異常情況下造成消息丟失。
3 對應消費者,需要手動應答,明確告訴rabbitmq服務器,已經正確消費了消息,然后rabbitmq服務器會刪除已經被正確消費的消息。如果rabbitmq沒有收到應答消息(比如消費者處理超時或者網絡不好),rabbitmq會間隔的講消息重新發給消費者消費,這是消費者需要根據消息id或者全局唯一業務字段做好冪等處理。如果是消費者代碼有問題需要重新發布版本解決。兜底方案依然是保存消息到異常消息表,定時處理并報警通知相關人員處理。
總結:基于rabbitmq解決分布式事務,核心是最終一致性,比如電商下單減庫存、外賣下單派單等場景,核心是最終一致性。經過一定的時間(具體能容忍多久看業務場景)二個數據庫中的數據最終達到一致。
完結,不正確之處,望大佬指正!求點贊鼓勵~
總結
以上是生活随笔為你收集整理的rabbitmq 查询版本_基于rabbitmq解决分布式事务的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 电影下映后还会不会再次播出?
- 下一篇: java 定时器获得外部参数_JMete