日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 人文社科 > 生活经验 >内容正文

生活经验

rabbitmq 查询版本_基于rabbitmq解决分布式事务

發布時間:2023/11/27 生活经验 41 豆豆
生活随笔 收集整理的這篇文章主要介紹了 rabbitmq 查询版本_基于rabbitmq解决分布式事务 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

分布式事務要解決的問題是保證二個數據庫數據的一致性,本地事務ACID屬于剛性事務,基于CAP理論,分布式事務的核心要點柔性事務,最終一致性。

基于rabbitmq解決分布式事務要點如下

  • 生產者采用發送方確認機制加上mandatory參數或者備份交換機,保證消息被正確地到達隊列中。
  • 隊列、交換機、消息都需要持久化(可以考慮鏡像隊列機制,如果業務不是那么重要,比如短信郵件通知)。
  • 消費端采用手動應答的方式,確認消息已經被正確消費。

rabbitmq配置

spring:application:name: rabbitmq-servicerabbitmq:addresses: 192.168.137.128:5672,192.168.137.129:5672username: rootpassword: rootvirtual-host: transactionpublisher-confirms: true #開啟發送端確認機制publisher-returns: true # 開啟returnstemplate:mandatory: true # 交換機沒有匹配的隊列時,會將消息返回給生產者,避免消息丟失listener:simple:acknowledge-mode: manualretry:enabled: truemax-attempts: 5initial-interval: 3000server:port: 8400eureka:client:serviceUrl:defaultZone: http://localhost:8100/eureka/instance:prefer-ip-address: trueinstance-id: ${spring.cloud.client.ip-address}:${server.port}mq:common:exchange: common.exchangequeue: common.queueroutingkey: common.routing

代碼案例是下單減庫存

生產者代碼

package com.liwen.mqservice.producer;import com.liwen.entity.Order;
import com.liwen.feign.IOrderServiceFeign;
import lombok.extern.slf4j.Slf4j;
import net.sf.json.JSONObject;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class MQOrderProducer implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {@Value("${mq.common.exchange}")private String exchangeName;@Value("${mq.common.routingkey}")private String routingKey;@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate IOrderServiceFeign orderServiceFeign;public void send(JSONObject json){rabbitTemplate.setConfirmCallback(this);//發送到確認機制rabbitTemplate.setReturnCallback(this);//消息Return回調Order order= (Order) JSONObject.toBean(json.getJSONObject("order"), Order.class);orderServiceFeign.saveOrder(order);String orderId = order.getOrderId();String goodId = order.getGoodId();json.put("orderId", orderId);json.put("goodId", goodId);CorrelationData correlationData = new CorrelationData(orderId);rabbitTemplate.convertAndSend(exchangeName,routingKey, json.toString(), new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {MessageProperties messageProperties = message.getMessageProperties();messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);//設置消息持久化return message;}}, correlationData);}@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if(!ack){/**   處理消息沒有到達交換機,數據丟失的情況*   根據訂單號查詢到訂單數據,并將數據保存到異常消息表中,定時補發,并報警人工處理* */String orderId = correlationData.getId();}else{//查詢訂單號是否在異常消息表,在的話要刪除log.info(">>>下單消息發送成功{}<<<",correlationData);}}@Overridepublic void returnedMessage(Message message, int i, String s, String s1, String s2) {//消息到達交換機,沒有路由到隊列,根據訂單號查詢到訂單數據,并將數據保存到異常消息表中,定時補發,并報警人工處理/**  1 交換機沒有綁定隊列*  2 交換機根據路由鍵沒有匹配到隊列*  3 隊列消息已滿* */byte[] body = message.getBody();JSONObject json = JSONObject.fromObject(new String(body));System.out.println("return============================");System.out.println(message);}
}

對應生產者發送消息到rabbitmq有以下幾種情況:

1 沒有發送到交換機的數據,會回調public void confirm(CorrelationData correlationData, boolean ack, String cause) 方法(ack為false),我們可以在發送消息時把業務參數比如訂單號設置到correlationData參數中,回調時把相關消息保存到異常消息表,采用定時任務兜底,并報警通知相關人員。

2 對于發送方確認機制,只能保證消息到達rabbitmq的交換機(ack為true),如果此交換機沒有匹配的隊 列,那么消息會丟失,所以需要結合mandatory(設置為true),當交換機沒有匹配的隊列時,會回調 public void returnedMessage(Message message, int i, String s, String s1, String s2)將消息返回給生產者,我們保存消息body部分,采用定時任務兜底,并報警通知相關人員。

2 交換機、隊列、消息(重要的數據比如支付數據)需要持久化,避免消息在rabbitmq重啟、宕機等異常情況下造成消息丟失。

3 對應消費者,需要手動應答,明確告訴rabbitmq服務器,已經正確消費了消息,然后rabbitmq服務器會刪除已經被正確消費的消息。如果rabbitmq沒有收到應答消息(比如消費者處理超時或者網絡不好),rabbitmq會間隔的講消息重新發給消費者消費,這是消費者需要根據消息id或者全局唯一業務字段做好冪等處理。如果是消費者代碼有問題需要重新發布版本解決。兜底方案依然是保存消息到異常消息表,定時處理并報警通知相關人員處理。

總結:基于rabbitmq解決分布式事務,核心是最終一致性,比如電商下單減庫存、外賣下單派單等場景,核心是最終一致性。經過一定的時間(具體能容忍多久看業務場景)二個數據庫中的數據最終達到一致。

完結,不正確之處,望大佬指正!求點贊鼓勵~

總結

以上是生活随笔為你收集整理的rabbitmq 查询版本_基于rabbitmq解决分布式事务的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。