日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 >

springCloud 初探

發(fā)布時間:2024/3/24 52 豆豆
生活随笔 收集整理的這篇文章主要介紹了 springCloud 初探 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

分布式系統(tǒng)理論

分布式系統(tǒng)是若干個獨立計算機的集合,這些計算機的集合,這些計算機對于用戶來說就像單個相關(guān)系統(tǒng)。分布式系統(tǒng)是由一組通過網(wǎng)絡(luò)進行通信,為了完成共同的任務(wù)而協(xié)調(diào)工作的計算機節(jié)點組成的系統(tǒng)。分布式系統(tǒng)的出現(xiàn)是為了用廉價的,普通的機器完成單個計算機無法完成的計算,存儲任務(wù)。其目的是:利用更多的機器,處理更多的數(shù)據(jù)。

RPC

定義: RPC是指遠程過程調(diào)用,是一種進程間的通信方式,它是一種技術(shù)的思想,而不是規(guī)范。它允許程序調(diào)用另一個地址空間的過程或函數(shù),而不是程序顯式編碼這個遠程調(diào)用的細節(jié)。即程序員無論是調(diào)用本地還是遠程的函數(shù),本質(zhì)上編寫的調(diào)用代碼基本相同。

RPC原理:

?

RPC的核心:通訊, 序列化(方便數(shù)據(jù)傳輸)。

序列化:數(shù)據(jù)傳輸需要轉(zhuǎn)換。

解決這些核心問題我們可以使用Doubb

springCloud技術(shù)概況

?springCloud技術(shù)分布圖:

springCloud升級:

springBoot和springCloud版本兼容查詢

https://spring.io/projects/spring-cloud#learn

springCloud 對應(yīng)依賴:

<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-dependencies</artifactId><version>2021.0.3</version><type>pom</type><scope>import</scope> </dependency>

Eureka

?作用: Eureka能夠自動注冊并發(fā)現(xiàn)微服務(wù),然后對服務(wù)的狀態(tài),信息進行集中管理,這樣我們需要獲取其他服務(wù)的信息時,我們只需要向Eureka進行查詢就可以了。

這樣服務(wù)之間的強廣聯(lián)性就會被進一步減弱。

?對應(yīng)依賴:

<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-server</artifactId><version>3.1.3</version> </dependency>

配置application.yml文件

eureka:client:#由于我們是作為服務(wù)端角色,所以不需要獲取服務(wù)端,改為false, 默認為truefetch-registry: false#暫時不需要將自己注冊到eurekaregister-with-eureka: falseservice-url:defaultZone: http://localhost:8888/eureka

且要在啟動類中添加注解:?@EnableEurekaServer?

效果圖:

?接下來將各個服務(wù)作為客戶端。

作為客戶端的對應(yīng)依賴:

<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-client</artifactId><version>3.1.3</version> </dependency>

配置各個服務(wù)的application.yml,讓將服務(wù)地址指向eureka服務(wù)的地址,這樣才能實現(xiàn)注冊。?

客戶端application.yml配置:

eureka:client:service-url:defaultZone: http://localhost:8888/eurekaspring:application:name: ”對應(yīng)名字“

效果圖:

當(dāng)我們的服務(wù)啟動之后,每隔一段時間eureka會發(fā)送一次心跳包,這樣eureka就能檢測到我們的服務(wù)是否還在正常運行。

通過eureka來調(diào)用服務(wù)?

例子一:

舊代碼

public UserBook queryByUid(Integer uid) {List<DbBorrow> dbBorrows = dbBorrowDao.queryByUid(uid);RestTemplate template = new RestTemplate();DbUser user = template.getForObject("http://localhost:8083/dbUser/" + uid, DbUser.class);UserBook userBook = new UserBook();userBook.setUser(user);LinkedList<DbBook> books = new LinkedList<>();for(int i = 0; i < dbBorrows.size(); i++){DbBook book = template.getForObject("http://localhost:8081/dbBook/" + dbBorrows.get(i).getBid(), DbBook.class);books.add(i, book);}userBook.setBook(books);return userBook;}

這里其實就是通過詢問eureka對應(yīng)的服務(wù)名來獲取對應(yīng)的ip地址。?

?負載均衡的實現(xiàn)

? ? 同一個服務(wù)器實際上可以注冊很多個的, 但它們的端口是不同的,比如我們創(chuàng)建多個用戶查詢服務(wù),將原有的端口進行修改,由idea中設(shè)置啟動參數(shù)來決定,這樣就可以創(chuàng)建幾個同端口的相同服務(wù)了。?

效果圖中說明在用戶服務(wù)處有多個相同的服務(wù)。?

?當(dāng)我們要使用用戶服務(wù)時,如果有第一個用戶服務(wù)down掉的話,就會有另一個用戶服務(wù)來執(zhí)行對應(yīng)的操作,防止整哥微服務(wù)不可用,大大提高了安全性。

當(dāng)存在多個相同服務(wù)的時候就會通過對應(yīng)的負載均衡的策略使每個服務(wù)都被調(diào)用起來,從而實現(xiàn)負載均衡。

新的實現(xiàn)代碼

1.在condig包中創(chuàng)建一個BeanConfiguration.java。

@Configuration public class BeanConfiguration {@Bean//負載均衡@LoadBalancedpublic RestTemplate restTemplate(){return new RestTemplate();} }

2.在對應(yīng)的service層中的使用@Autowired進行自動注入使用,將原來的ip地址改為在eureka中對應(yīng)的服務(wù)名字。

@Resource private RestTemplate template;public UserBook queryByUid(Integer uid) {List<DbBorrow> dbBorrows = dbBorrowDao.queryByUid(uid); //將原來的ip地址改為在eureka中對應(yīng)的服務(wù)名字DbUser user = template.getForObject("http://user-service/dbUser/" + uid, DbUser.class);UserBook userBook = new UserBook();userBook.setUser(user);LinkedList<DbBook> books = new LinkedList<>();for(int i = 0; i < dbBorrows.size(); i++){DbBook book = template.getForObject("http://book-service/dbBook/" + dbBorrows.get(i).getBid(), DbBook.class);books.add(i, book);}userBook.setBook(books);return userBook;}

注冊中心高可用

?為了防止eureka down掉,我們可以搭建eureka集群。

效果圖:

?搭建eureka集群步驟:

1.修改兩個eureka服務(wù)端的配置文件。

applicationn01.yml

server:port: 9999 eureka:instance:#由于不支持多個localhost的eureka的服務(wù)器,但是又只能在本地測試,所有就只能自定義主機名稱了hostname: eureka01client:#不需要獲取服務(wù)端fetch-registry: false#去掉register-with-eureka選項,讓eureka服務(wù)器自己注冊到其他的eureka服務(wù)器,這樣才能相互啟用service-url:#注意這里要填寫其他的eureka服務(wù)器地址,不用寫自己的defaultZone: http://eureka02:9999/ereka

application02.yml

server:port: 9999 eureka:instance:#由于不支持多個localhost的eureka的服務(wù)器,但是又只能在本地測試,所有就只能自定義主機名稱了hostname: eureka02client:#不需要獲取服務(wù)端fetch-registry: false#去掉register-with-eureka選項,讓eureka服務(wù)器自己注冊到其他的eureka服務(wù)器,這樣才能相互啟用service-url:#注意這里要填寫其他的eureka服務(wù)器地址,不用寫自己的defaultZone: http://eureka01:8888/ereka

2.啟動eureka集群

在因為是本地測試所以我們要修改本地的hosts。

eureka01:?

eureka02:?

3.把所有的微服務(wù)在eureka集群中都注冊一次

service-url:defaultZone: http://localhost:8888/eureka, http://localhost:9999/eureka

?在一個eureka down掉的時候,另一個eureka還會繼續(xù)工作,這時我們就可以對應(yīng)down 掉的eureka進行維修,這樣就實現(xiàn)了高可用。

OpenFeign

? ? OpenFeign和RestTemplate一樣,也是一種HTTP客戶端請求工具,但它使用起來更加便捷。

對應(yīng)依賴:

<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-openfeign</artifactId><version>3.1.3</version> </dependency>

在對應(yīng)的啟動類上加上 @EnableFeignClients?

配對應(yīng)的FeignClient服務(wù)接口?。(可以單獨創(chuàng)建一個包來存放這些服務(wù)接口)

服務(wù)接口格式:

@FeignClient("在eureka中配置的服務(wù)名字") public interface BookClient { //該接口里面的方法為你要調(diào)用的Controller中的方法,這些方法的路徑要寫全@GetMapping("/dbBook/{id}")DbBook queryById(@PathVariable("id") Integer id); }

?要使用此接口時可以通過@Resource進行注入。(類似Dao層的mybatis,其通過@FeignClient將接口注入到spring中)

?調(diào)用此接口的Service層中的代碼就要修改為下:

public UserBook queryByUid(Integer uid) {List<DbBorrow> dbBorrows = dbBorrowDao.queryByUid(uid); // DbUser user = template.getForObject("http://user-service/dbUser/" + uid, DbUser.class);DbUser user = userClient.queryById(uid);UserBook userBook = new UserBook();userBook.setUser(user);LinkedList<DbBook> books = new LinkedList<>();for(int i = 0; i < dbBorrows.size(); i++){ // DbBook book = template.getForObject("http://book-service/dbBook/" + dbBorrows.get(i).getBid(), DbBook.class);DbBook book = bookClient.queryById(dbBorrows.get(i).getBid());books.add(i, book);}userBook.setBook(books);return userBook;}

OpenFeign服務(wù)降級

為對應(yīng)的client接口創(chuàng)建實現(xiàn)類。

@Component //注入到spring中,使得BookClient能調(diào)用到此實現(xiàn)類 public class BookFallBackClient implements BookClient{@Overridepublic DbBook queryById(Integer id) {return new DbBook();} } @Component 注入到spring中,使得UserClient能調(diào)用到此實現(xiàn)類 public class UserFallBackClient implements UserClient{@Overridepublic DbUser queryById(Integer id) {return new DbUser();} }

?將此實現(xiàn)類添加到client的備選方案中。

?在application.xml配置熔斷支持

feign:circuitbreaker:enabled: true

?效果圖:

Hystrix??

??Hystrix服務(wù)熔斷

? ? 微服務(wù)之間是可以相互調(diào)用的。?

由于位于最底端的服務(wù)提供者發(fā)生了故障,那么此時會直接導(dǎo)致ABCD全線崩潰,就像雪崩一樣。

這種情況實際上是不可避免的,由于太多的因素,比如網(wǎng)絡(luò)故障,系統(tǒng)故障,硬件問題,會導(dǎo)致這種極端的情況發(fā)生,因此我們需要找到對應(yīng)的方法來解決次問題。

為了解決分布式系統(tǒng)的雪崩問題,springCloud提供了Hystrix熔斷組件,它就像我們家中的保險絲一樣,當(dāng)電流過載的時候直接熔斷掉,防止危險的進一步發(fā)生,從而保障家庭用電的安全,可以想象一下,如果整條鏈路上的服務(wù)已經(jīng)全線崩潰,這時還在不斷地有大量的請求到達,想要各個服務(wù)進行處理,肯定是會使得情況越來越糟。

熔斷機制是應(yīng)對雪崩效應(yīng)的一種微服務(wù)鏈路保護機制,當(dāng)檢測到鏈路的某個微服務(wù)不可用或者響應(yīng)的時間太長時,會進行服務(wù)降級,進而熔斷該節(jié)點微服務(wù)的調(diào)用,快速返回"錯誤"的響應(yīng)信息,當(dāng)檢測到該節(jié)點微服務(wù)響應(yīng)正常后恢復(fù)調(diào)用鏈路。

實際上,熔斷就是在降級的基礎(chǔ)上進一步形成的,也就是說,在一段時間內(nèi)多次調(diào)用失敗,那么就直接升級為熔斷。

?當(dāng)需要的服務(wù)正常啟動后,熔斷機制就會關(guān)閉了。

Hystrix服務(wù)降級

? ? 服務(wù)降級并不會直接返回錯誤信息,而是可以提供一個補救的措施,正常響應(yīng)給請求者,這樣相當(dāng)于服務(wù)依然可用,但是服務(wù)能力肯定是下降的。?

對應(yīng)依賴:

<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-hystrix</artifactId><version>2.2.10.RELEASE</version> </dependency>

在啟動類上添加注解: @EnableHystrix。

?

?在對應(yīng)的Controller層中編寫編寫備選方案。

//備選方案,返回一個空的對象public ResponseEntity<UserBook> onError(Integer uid) {UserBook userBook = new UserBook();userBook.setUser(null);userBook.setBook(Collections.emptyList());return ResponseEntity.ok(userBook);}

在對的方法上添加注解:@HystrixCommand(fallbackMethod = "備選方案名")

?效果圖:?

Gateway路由網(wǎng)關(guān)?

可能并不是所有的微服務(wù)都需要直接暴露給外部調(diào)用,這時我們就可以使用路由機制,添加一層防護,讓所有的請求全部通過路由來轉(zhuǎn)發(fā)到各個微服務(wù),并且轉(zhuǎn)發(fā)給多個相同微服務(wù)實例也可以實現(xiàn)負載均衡。

?部署網(wǎng)關(guān)

對應(yīng)依賴為下:

<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-gateway</artifactId><version>3.1.2</version></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-client</artifactId><version>3.1.3</version></dependency>

第一個是網(wǎng)關(guān)的依賴,第二個是跟其他微服務(wù)一樣,需要注冊到eureka中才能生效,不要添加web依賴,使用的是WebFlux框架。

編寫配置文件

eureka:client:service-url:defaultZone: http://localhost:8888/eureka, http://localhost:9999/eureka spring:application:name: gateway server:port: 8500

?繼續(xù)在配置文件中配置路由功能。

spring:cloud:gateway:routes:- id: borrowService #路由的名字uri: lb://borrow-service #路由的地址,lb表示使用負載均衡到微服務(wù),也可以使用Http正常轉(zhuǎn)發(fā)predicates: #路由規(guī)則 斷言什么請求會被路由- Path=/dbBorrow/queryUserBook/** #只要訪問這個路徑,一律都被路由到上面指定的服務(wù)

?在輸入路徑后,如果路徑符合斷言,就會將uri和Path進行拼接。

路由過濾器

路由過濾器支持以某中方式修改傳入的HTTP請求或傳出的HTTP響應(yīng),路由過濾器的范圍是某個過濾器,跟之前的斷言一樣

修改對應(yīng)的配置文件。

spring:application:name: gatewaycloud:gateway:routes:- id: borrowService #路由的名字uri: lb://borrow-service #路由的地址,lb表示使用負載均衡到微服務(wù),也可以使用Http正常轉(zhuǎn)發(fā)predicates: #路由規(guī)則 斷言什么請求會被路由- Path=/dbBorrow/queryUserBook/** #只要訪問這個路徑,一律都被路由到上面指定的服務(wù)- id: bookServiceuri: lb://book-servicepredicates:- Path=/dbBook/**filters: #添加過濾器- AddRequestHeader=Test, HELLO WORLD!

?在對應(yīng)的Controller層中測試是否過濾成功。

效果圖:
?

設(shè)置全局過濾器

?例子:攔截沒有攜帶指定參數(shù)的請求。

在gateway項目中創(chuàng)建一個實現(xiàn)類。

@Component public class TestFilter implements GlobalFilter {@Overridepublic Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { // exchange表示請求過來的信息//先獲取ServerHttpRequest對象,注意不是HttpServletRequestServerHttpRequest request = exchange.getRequest();//打印請求攜帶的所有參數(shù)System.out.println(request.getQueryParams());//判斷是否包含test參數(shù),且參數(shù)值是否為1List<String> test = request.getQueryParams().get("test");if(test != null && test.contains("1")){//將ServerHttpExchange向過濾鏈的下一級傳遞,類似javaWeb中的過濾器return chain.filter(exchange);} else {//直接在這里不再向下傳遞,然后返回響應(yīng)return exchange.getResponse().setComplete();}} }

次過濾器會作用于整個網(wǎng)關(guān)。

效果圖:

只要路徑中沒有攜帶test=1就會被攔截。

當(dāng)然過濾器會存在很多個的,所以我們手動指定過濾器之間的順序。可以通過實現(xiàn)Ordered接口,重寫getOredr方法來設(shè)置執(zhí)行的順序。

@Component public class TestFilter implements GlobalFilter, Ordered {@Overridepublic Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { // exchange表示請求過來的信息//先獲取ServerHttpRequest對象,注意不是HttpServletRequestServerHttpRequest request = exchange.getRequest();//打印請求攜帶的所有參數(shù)System.out.println(request.getQueryParams());//判斷是否包含test參數(shù),且參數(shù)值是否為1List<String> test = request.getQueryParams().get("test");if(test != null && test.contains("1")){//將ServerHttpExchange向過濾鏈的下一級傳遞,類似javaWeb中的過濾器return chain.filter(exchange);} else {//直接在這里不再向下傳遞,然后返回響應(yīng)return exchange.getResponse().setComplete();}}@Overridepublic int getOrder() {//返回的數(shù)字表示執(zhí)行的順序 return 0;} }

注意Oreder的值越小執(zhí)行的優(yōu)先級就越高,并且無論是配置文件里編寫的單個路由過濾器還是全局過濾器,都會受到Order的影響(單個路由過濾器的Order值按從上到下的順序從1開始遞增),最終是按照Order值決定哪個路由過濾器先執(zhí)行,當(dāng)Order值相同時,全局路由過濾器會優(yōu)先于單個路由過濾器執(zhí)行。?

微服務(wù)CAP原則

在一個分布式系統(tǒng)中存在 Consistency(一致性),Availabiity(可用性),Partition Tolerance(區(qū)分容錯性)三者是不可同時保證的,最多同時保證其中的兩個。

?一致性:在分布式系統(tǒng)中的所有數(shù)據(jù)備份,在同一時刻都是相同的值。(所有的節(jié)點無論何時訪問都能拿到最新的值)

可用性:系統(tǒng)中非故障節(jié)點收到的每個都必須得到響應(yīng)。(不如我們之前使用的服務(wù)降級和熔斷,其實就是一種維持可用性的措施,雖然服務(wù)器返回的是無意義的數(shù)據(jù),但不至于用戶的請求會被服務(wù)器忽略)

區(qū)分容錯性:一個分布式系統(tǒng)里面,節(jié)點之間組成的網(wǎng)絡(luò)本應(yīng)該是相互連通的,然而可能因為一些故障(比如網(wǎng)絡(luò)丟包等,這是很難避免的),使得一些節(jié)點之間不能連通,整個網(wǎng)絡(luò)分成了幾塊區(qū)域,數(shù)據(jù)就散落在這些不相連通的區(qū)域里面。(這樣就可能出現(xiàn)某些被分區(qū)節(jié)點存放的數(shù)據(jù)訪問失敗,我們需要來容忍這些不可靠的情況)

總的來說,數(shù)據(jù)存放的節(jié)點越多,分區(qū)容忍性就越高,都是要復(fù)制更新的次數(shù)就會越來越多,同時為了保證一致性,更新所有節(jié)點數(shù)據(jù)所需要的時間就越長,那么可用性就會降低。

所以存在三種方案:

??

springCloud Alibaba的使用??

? ? springCloud的缺點:

1.springCloud部分組件停止維護和更新了,給開發(fā)帶來了不便。

2.springCloud部分開發(fā)環(huán)境復(fù)雜,沒有完善的可視化界面,我們需要大量的二次開發(fā)和定制。

3.springCloud配置復(fù)雜,難上手,部分配置差別難以群分和合理應(yīng)用。

? ? springCloud Alibaba的優(yōu)點:阿里使用過的組件經(jīng)歷了考驗,性能強悍,設(shè)計合理,現(xiàn)在開源出來給大家使用成套的產(chǎn)品搭配完善的可視化界面給開發(fā)帶來運維帶來了極大的便利,搭建簡單,學(xué)習(xí)曲線低。

spring-cloud-alibaba對應(yīng)依賴:

<dependencyManagement><dependencies><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-alibaba-dependencies</artifactId><version>2.2.7.RELEASE</version><type>pom</type><scope>import</scope></dependency></dependencies> </dependencyManagement>

?Nacos

Nacos: 一個易于使用的動態(tài)服務(wù)發(fā)現(xiàn)、配置和服務(wù)管理平臺,用于構(gòu)建云原生應(yīng)用程序。

作用: 將其作為服務(wù)注冊中心。

?Nacos作為服務(wù)注冊中心對應(yīng)依賴:

<dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId></dependency>

在github中下載nacos并將其復(fù)制到項目中。

在startup.sh中設(shè)置nacos讓其在前臺運行。?

配置服務(wù)。?

直接使用nacos。

后臺啟動nacos:?bash nacos/bin/startup.sh。(以集群的方式)?

關(guān)閉nacos: bash nacos/bin/shutdown.sh。

后臺啟動naocs: bash nacos/bin/startup.sh -m standalone。(以單例的方式啟動)

nacos 的地址為: localhost:8084/nacos。

?導(dǎo)入對應(yīng)依賴。

<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-dependencies</artifactId><version>2021.0.1</version><type>pom</type><scope>import</scope></dependency><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-alibaba-dependencies</artifactId><version>2021.0.1.0</version><type>pom</type><scope>import</scope></dependency>

在子項目中添加依賴。

<dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId><version>2021.1</version></dependency>

設(shè)置配置文件。

spring:cloud: #配置nacos注冊中心地址nacos:discovery:server-addr: lcoalhost:8848

效果圖:

?使用openFeign調(diào)用服務(wù)。

對應(yīng)依賴為下:

<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-openfeign</artifactId><version>3.1.1</version></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-loadbalancer</artifactId><version>3.1.1</version></dependency>

和原生的調(diào)用方式一樣。

臨時實例和非臨時實例的區(qū)別:
臨時實例:和eureka一樣,采用心跳機制向nacos發(fā)送請求保持在線的狀態(tài),一但心跳停止,就代表實例下線,不保留實例信息。

非臨時實例:由nacos主動進行聯(lián)系,如果連接失敗,那么不會刪除實例信息,而是將健康狀態(tài)設(shè)置為false,相當(dāng)于對某個實例狀態(tài)持續(xù)進行監(jiān)控。

?設(shè)置非臨時實例

設(shè)置配置文件。

cloud:nacos:discovery: # 修改為false 表示其是個非臨時文件ephemeral: false

?對應(yīng)的實例下線時會顏色會變?yōu)榧t色。

?集群分區(qū)

對應(yīng)依賴為下:

spring:cloud:nacos:discovery:#對應(yīng)的集群名cluster-name: name

效果圖

在默認情況下,集群間的調(diào)用方式采用的是輪番調(diào)用,使用為了實現(xiàn)就近原則,我們要對配置文件進行相應(yīng)配置。

spring:cloud:#將loadbalancer的nacos支持開啟,集成nacos負載均衡loadbalancer:nacos:enabled: true

?在同個集群中如果存在多個相同的服務(wù)時,就會根據(jù)權(quán)重來執(zhí)行對應(yīng)的服務(wù),我們可以在nacos頁面中進行設(shè)置。

也可以通過配置文件進行修改。

spring:cloud:nacos:discovery:#設(shè)置權(quán)重,默認為1weight: 2

?配置中心

我們可以通過配置來加載遠程配置,這樣我們可以遠端集中管理配置文件。

我們可以通過在nacos中,點擊新建配置項,進行配置。

點擊發(fā)布。

在項目中添加依賴。

<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-bootstrap</artifactId><version>3.1.2</version></dependency><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId><version>2021.1</version></dependency>

編寫bootstrap.yml文件。

spring:application:#去遠程倉庫中調(diào)用對應(yīng)名字的配置name: user-serviceprofiles:active: devcloud:nacos:config:file-extension: ymlserver-addr: localhost:8848

此時兩個yml文件都會被使用。

?在默認情況下,在nacos中修改yml文件,對應(yīng)的服務(wù)中的yml的信息并不會發(fā)生對應(yīng)的改變,雖然對其做了監(jiān)聽。

為了能夠保證在nacos中修改的yml文件立刻生效,我們需要添加注解@RefreshScope。

?效果圖:

原配置文件?

?修改后的配置文件

?結(jié)果

命名空間?

?新建命名空間。

在配置文件中配置所屬的命名空間。

spring:cloud:nacos:discovery:namespace: 對應(yīng)命名空間的id

命名空間不相同的服務(wù)是不能相互調(diào)用的。

指定分組

?修改配置文件。

spring:cloud:nacos:discovery:group: 對應(yīng)的組名

默認為?DEFAULT_GROUP。?

高可用

?在本地數(shù)據(jù)庫導(dǎo)入nacos中的sql文件。

?配置conf/application.properties文件。

?將cluster.conf.example重命名為cluster.conf,并對其內(nèi)容進行修改,輸入多個的nanos地址,這里使用內(nèi)網(wǎng)映射。

?因為要創(chuàng)建nacos集群,所以我們要創(chuàng)建多個nacos,通過復(fù)制修改好的nacos進行操作,這時我們只要再對每個nacos修改端口號就可以了。(要以集群的方式啟動,可能會啟動失敗,多啟動幾次)

效果圖:?

我們需要一個負載均衡器來管理這些nacos,這里我們使用Nginx。

在Mac上安裝Nginx。

brew install nginx

編輯Nginx。

nano /usr/local/etc/nginx/nginx.conf

編輯內(nèi)容:

# 添加我們剛創(chuàng)建好的nacos服務(wù)器 upstream nacos-server {#被代理的服務(wù)群,nacos-server服務(wù)群的名字server localhost:8801;server localhost:8802; }server {#監(jiān)聽的端口listen 80;#服務(wù)名server_name localhost;#類似過濾器,當(dāng)訪問到符合此要求的路徑時就會去訪問對應(yīng)的服務(wù)群location /nacos {proxy_pass http://nacos-server;} }

重啟Nginx。

brew services restart nginx

將各個微服務(wù)的nacos的注冊地址改為localhost:80/nacos。(其會實現(xiàn)負載均衡)

在云服務(wù)器上做反向代理的例子。

1.配置nacos的端口號和對應(yīng)的數(shù)據(jù)庫。

2.啟動倆個nacos。

3.在Nginx中配置反向代理。

集群效果:

在每個微服務(wù)中將nacos的地址改為云服務(wù)器上Nginx反向代理的地址。

??

最終效果圖:

?Sentinel流量防衛(wèi)兵

?Sentinel 可以做到熔斷和降級,可以取代Hystrix。

Sentinel具有以下功能:

  • 豐富的適用場景:哨兵在阿里巴巴被廣泛使用,在過去10年中,幾乎涵蓋了Double-11(11.11)購物節(jié)的所有核心場景,例如需要限制突發(fā)流量以滿足系統(tǒng)容量的“第二次殺戮”,消息峰值剪切和山谷填充,不可靠的下游服務(wù)的斷路,集群流量控制等。
  • 實時監(jiān)控:哨兵還提供實時監(jiān)控功能。您可以實時查看一臺機器的運行時信息,以及少于500個節(jié)點的集群的聚合運行時信息。
  • 廣泛的開源生態(tài)系統(tǒng):Sentinel提供與Spring Cloud、Dubbo和gRPC等常用框架和庫的開箱即用集成。您只需將適配器依賴項添加到您的服務(wù)中,即可輕松使用Sentinel。
  • Polyglot支持:Sentinel為Java、Go和C++提供了原生支持。
  • 各種SPI擴展:Sentinel提供易于使用的SPI擴展接口,允許您快速自定義邏輯,例如自定義規(guī)則管理、調(diào)整數(shù)據(jù)源等。

下載對應(yīng)的jar包。

sentinel V1.8.3

?將jar導(dǎo)入到項目,為其創(chuàng)建一個服務(wù)。

啟動項目。?

?添加對應(yīng)的依賴。

<dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-sentinel</artifactId><version>2021.0.1.0</version></dependency>

對微服務(wù)中application.yml進行配置。(實際上Sentinel是本地在管理,但我們可以連接到監(jiān)控頁面,這樣可以圖形化操作了)

spring:cloud:sentinel: # 添加監(jiān)控頁面地址transport:dashboard: localhost:8858

為了提高可讀性和節(jié)省空間,Sentinel只監(jiān)視被調(diào)用過的微服務(wù)。所以為了讓sentinel監(jiān)視我們的微服務(wù),我們需要手動的調(diào)用一次微服務(wù)。

效果圖:

流量控制

我們的機器不可能無限的接受和處理客戶端的請求,如果不加以限制,當(dāng)發(fā)生高并發(fā)時,就會使得系統(tǒng)的資源很快的被耗盡。為了避免這種情況,我們可以添加流量控制,當(dāng)一段時間內(nèi)的流量達到一定的閥值的時候,新的請求將不會再進行處理,這樣不僅可以合理地應(yīng)對高并發(fā)的情況,同時也可以在一定的程度上保護服務(wù)器不受到外界的攻擊。

解決方案

?針對判斷是否超過流量的閥值的四種算法

1.漏桶算法

?2.令牌桶算法(有點像游戲里的能量條機制)

?3.固定時間窗口算法

4.滑動時間窗口算法

通過Sentinel進行設(shè)置

?閥值類型:QPS就是每秒中的請求數(shù)量,并發(fā)線程數(shù)是按服務(wù)當(dāng)時使用的線程數(shù)據(jù)進行統(tǒng)計的。

流控模式:當(dāng)達到閥值時,流控的對象,這里暫時只用直接。

流控效果:就是我們上面所說的三種方案。

流控模式的區(qū)別:

1.直接:只針對當(dāng)前接口。

2.關(guān)聯(lián):當(dāng)其他接口超過閥值時,會導(dǎo)致當(dāng)前接口被限流。(別的接口出現(xiàn)問題由當(dāng)前接口承擔(dān)責(zé)任)

3.鏈路:更細粒度的限流,能精確到具體的方法。

鏈路模式能夠更加精準(zhǔn)的進行流量控制,鏈路流控模式指的是,當(dāng)從指定接口過來的資源請求達到限流條件時,開啟限流,這里得先講解一下@SentinelResource的使用。

我們可以對某個方法進行限流控制,無論是誰在何處調(diào)用了它,這里需要使用到@SentinelResource,一但方法被標(biāo)注,那么就會進行監(jiān)控。

在調(diào)用的方法上添加注解@SentinelResource。

@SentinelResource("detail")@Overridepublic UserBook queryByUid(Integer uid) {List<DbBorrow> dbBorrows = dbBorrowDao.queryByUid(uid);RestTemplate template = new RestTemplate(); // DbUser user = template.getForObject("http://localhost:8083/dbUser/" + uid, DbUser.class);DbUser user = userClient.queryById(uid);UserBook userBook = new UserBook();userBook.setUser(user);LinkedList<DbBook> books = new LinkedList<>();for(int i = 0; i < dbBorrows.size(); i++){ // DbBook book = template.getForObject("http://localhost:8081/dbBook/" + dbBorrows.get(i).getBid(), DbBook.class);DbBook book = bookClient.queryById(dbBorrows.get(i).getBid());books.add(i, book);}userBook.setBook(books);return userBook;}

修改配置文件。

spring:cloud:sentinel:#關(guān)閉Context收斂,這樣被監(jiān)控方法可以進行不同鏈路的單獨監(jiān)控web-context-unify: false

運行查看效果。

?對精確限流設(shè)置限流策略。

?設(shè)置閥值為1,然后進行連續(xù)訪問測試,發(fā)現(xiàn)精確限流的位置拋出了異常。

?那么這個鏈路選項實際上就是決定只限流從哪個方向來的調(diào)用,比如我們只對borrow2這個接口對queryByUid方法的調(diào)用進行限流,那么我們就可以為其制定鏈路。

?入口資源的設(shè)置表示:進行精確限流的路徑,如果設(shè)置了入口資源,那么其他路徑調(diào)用被精確限流的方法時則不會被限流。

系統(tǒng)保護規(guī)則

?檢測對應(yīng)的設(shè)備來進行限流。

限流和異常處理?

我們看到被限流之后返回的Sentinel默認的數(shù)據(jù),其實我們可以返回我們自己定義的數(shù)據(jù)。?

?這里我們先創(chuàng)建好被限流狀態(tài)下需要返回的內(nèi)容,自定義一個請求映射。

?在要返回自定義的微服務(wù)的Controller中添加自定義的錯誤頁面。

@RequestMapping("/blocked")public JSONObject blocked() {JSONObject json = new JSONObject();json.put("code", 403);json.put("success", false);json.put("message", "訪問頻率過快, 請稍后訪問!");return json;}

在配置文件中配置限流頁面返回信息。

spring:cloud:sentinel:block-page: /dbUser/blocked

效果圖:?

?對于方法級別的限流,當(dāng)某個方法被限流時,會直接在后臺拋出異常,那么這種情況可以通過Sentinel添加一個替代方案,這樣當(dāng)我們發(fā)現(xiàn)異常時會直接執(zhí)行我們的代替方案并返回。

在service層中添加代替方案。

@SentinelResource(value = "detail", blockHandler = "blocked")//限流之后代替返回的其他方案,這樣就不會使用默認的拋出異常的方式了@Overridepublic UserBook queryByUid(Integer uid) {List<DbBorrow> dbBorrows = dbBorrowDao.queryByUid(uid);RestTemplate template = new RestTemplate(); // DbUser user = template.getForObject("http://localhost:8083/dbUser/" + uid, DbUser.class);DbUser user = userClient.queryById(uid);UserBook userBook = new UserBook();userBook.setUser(user);LinkedList<DbBook> books = new LinkedList<>();for(int i = 0; i < dbBorrows.size(); i++){ // DbBook book = template.getForObject("http://localhost:8081/dbBook/" + dbBorrows.get(i).getBid(), DbBook.class);DbBook book = bookClient.queryById(dbBorrows.get(i).getBid());books.add(i, book);}userBook.setBook(books);return userBook;}//代替方案public UserBook blocked(Integer uid, BlockException blockException) {UserBook userBook = new UserBook();userBook.setBook(Collections.emptyList());userBook.setUser(new DbUser());return userBook;}

blockHandler只會處理限流的異常,而不會處理方法體內(nèi)的其他代碼異常。

如果要處理限流以外的其他異常,我們可以通過其他參數(shù)進行處理。

@RequestMapping("/test")@SentinelResource(value = "test",fallback = "except", //fallback指定出現(xiàn)異常是的替代方案exceptionsToIgnore = IOException.class) //忽略注定的異常,也就是出現(xiàn)這些指定的異常時不回調(diào)用的替代方案public String test() {throw new RuntimeException("拋出異常");}public String except(Throwable t) {return t.getMessage();}

效果圖:?

?當(dāng)在@SentinelResource中同時存在 fallback和blockHandler時,在拋出限流異常范圍內(nèi)的異常的時候就先調(diào)用blockHandler中的替代方案,其他的時候就會調(diào)用fallback。(注意在兩個都打存在的時候,因為限流會在方法執(zhí)行前調(diào)用,所以在限流代替方案執(zhí)行完以后還會在執(zhí)行出現(xiàn)其異常時的代替方案)

熱點參數(shù)限流

我們可以對某一熱點進行精準(zhǔn)限流,比如在某一時刻,不同參數(shù)被攜帶訪問的頻率是不一樣的:

http://localhost:8082/test?a=10 訪問100次

http://localhost:8082/test?b=10 訪問0次

http://localhost:8082/test?c=10 訪問3次?

由于攜帶的參數(shù)a的請求比較多,我們就可以只對攜帶參數(shù)a的請求進行限流。?

創(chuàng)建一個新的測試請求映射。

@RequestMapping("/test")@SentinelResource("test") //注意這里需要添加@SentinelResource才可以,用戶資源名稱就使用這里定義的資源名稱public String findBorrow(@RequestParam(value = "a", required = false) String a,@RequestParam(value = "b", required = false) String b,@RequestParam(value = "c", required = false) String c) {return "請求成功!" + "a = " + a + " b = " + b + " c = " + c;}

在Senntinel中設(shè)置熱點配置。(我們對a進行了限流)

?效果圖:

?我們也可以對某個參數(shù)的特定值進行特定限流。(我們對a進行特定值限流)?

效果圖:

Sentinel的服務(wù)熔斷和降級

為了防止鏈路故障,我們能進行隔離,這里我們有兩種隔離方案。

1.線程池隔離

線程池隔離實際上就是對每個服務(wù)的遠程調(diào)用單獨開放線程池,比如服務(wù)A要調(diào)用服務(wù)B,那么只基于固定數(shù)量的線程池,這樣即使在短時間內(nèi)出現(xiàn)大量請求,由于沒有線程可以分配,所以就不會導(dǎo)致資源耗盡。?

?2.信號量隔離

信號量隔離是使用Semaphore類實現(xiàn)的,思想基本跟上面的相同,也是限定指定的線程數(shù)量能夠同時進行服務(wù)調(diào)用,但它相對于線程池開銷會更小一些,使用效果同樣優(yōu)秀,也支持超時等,Sentinel就是采用這個方案進行隔離的。?

說回我們的熔斷與降級,當(dāng)下游的服務(wù)因為某些原因變得不可用或響應(yīng)過慢時,上游為了保證自己整體的高可用性,不再繼續(xù)調(diào)用目標(biāo)服務(wù)而是快速返回或執(zhí)行自己的代替方案。?

?整個過程分為三個狀態(tài):

1.關(guān)閉:熔斷器不工作,所有的請求全部該干嘛就干嘛。

2.開啟:熔斷器工作,所有的請求一律降級。

3.半開:嘗試進行一下正常的流程,要是還是不行就繼續(xù)保持開啟的狀態(tài),否則關(guān)閉。

?在Sentinel設(shè)置熔斷規(guī)則

熔斷策略

1.慢調(diào)用比例:如果出現(xiàn)那種半天都處理不完的調(diào)用,有可能就是服務(wù)出現(xiàn)故障,這個選項是按照最大效應(yīng)時間(RT)進行判斷的,如果一次請求的處理時間超過了指定的RT,那么就會判斷為慢調(diào)用,在一個統(tǒng)計時長內(nèi),如果請求數(shù)目大于最小請求數(shù)目,并且被判斷定為慢調(diào)用的請求比例已經(jīng)超過閥值,將觸發(fā)熔斷,經(jīng)過熔斷時長之后,將會進入到半開狀態(tài)進行試探。(這里和Hystrix一致)

?測試代碼

@RequestMapping("/borrow2/{uid}")public String test2(@PathVariable("uid") Integer uid) throws InterruptedException {Thread.sleep(1000);return "hello World!";}

在在對應(yīng)的微服務(wù)中設(shè)置熔斷設(shè)置。

?效果圖:

?2.異常比例:與慢調(diào)用比例相似,不過這里判斷的是出現(xiàn)異常的次數(shù)。

測試代碼

@RequestMapping("/borrow3/{uid}")public String test3(@PathVariable("uid") Integer uid) throws Exception {throw new RuntimeException();}

在Sentinel中配置熔斷配置。

效果圖:

3.異常數(shù):很異常比例好像,但有明確指出異常數(shù)量。

降級策略

我們需要在@SentinelResource中配置blockHandler參數(shù)。(這里跟之前方法限流的配置是一樣的,因為如果添加了@SentinelResource注解,那么這里就會進行方法級別細粒度的限制,和之前方法級別限流一樣,會在降級之后直接拋出異常,如果不添加則返回默認的限流頁面,blockHandler的目的就是處理這種Sentinel機制的異常,所以這里其實和之前的限流配置是一個道理,因此下面熔斷配置也應(yīng)該對value自定義名稱的資源進行配置,才能作用到此方法上)

測試代碼:

//降級測試@RequestMapping("/borrow4/{uid}")@SentinelResource(value = "findBorrow", blockHandler = "test")public ResponseEntity<UserBook> findBorrow(@PathVariable("uid") Integer uid) throws Exception {throw new RuntimeException();}//代替方案public ResponseEntity<UserBook> test(Integer uid, BlockException e) {System.out.println(e.getClass());UserBook userBook = new UserBook();userBook.setUser(new DbUser());userBook.setBook(Collections.emptyList());return ResponseEntity.ok(userBook);}

在Sentinel中設(shè)置熔斷規(guī)則。

?效果圖:

?拋出降級異常。

openFeign支持Sentinel

?前面我們使用Hystrix的時候,就可以直接對openFeign的每個接口調(diào)用單獨進行服務(wù)降級,而使用Sentinel,也可以的。

在配置文件中開啟支持。

feign:sentinel:enabled: true

?和之前的openfign整合eureka的服務(wù)降級配置相似。

BookClient接口:

@FeignClient(value = "book-service",fallback = BookClientImpl.class) public interface BookClient {@GetMapping("/dbBook/{id}")DbBook queryById(@PathVariable("id") Integer id); }

BookClient替代方案:

@Component public class BookClientImpl implements BookClient{@Overridepublic DbBook queryById(Integer id) {return new DbBook();} }

啟用代替方案效果圖:?

?Sentinel整合RestTemplate使用服務(wù)降級

在config進行配置。

@Configuration public class RestTemplateConfig {@Bean@LoadBalanced@SentinelRestTemplate(blockHandler = "handleException", blockHandlerClass = ExceptionUtil.class,fallback = "對應(yīng)的降級方案", fallbackClass = ExceptionUtil.class)public RestTemplate restTemplate() {return new RestTemplate();} }

?Seata與分布式事務(wù)

Seata 是一款開源的分布式事務(wù)解決方案,致力于提供高性能和簡單易用的分布式事務(wù)服務(wù)。Seata 將為用戶提供了 AT、TCC、SAGA 和 XA 事務(wù)模式,為用戶打造一站式的分布式解決方案。

事務(wù)特性

?分布式講解方案

1.XA分布式事務(wù)協(xié)議 -2PC (兩階段提交實現(xiàn))?

這里的PC實際上指的是Prepare和Commit,也就是說它分為兩個階段,一個是準(zhǔn)備一個是提交,整個過程的參與者一共有兩個角色,一個是事務(wù)的執(zhí)行者,一個是事務(wù)的協(xié)調(diào)者,實際上整個事務(wù)的運作需要毅力啊協(xié)調(diào)者來維持。

?在準(zhǔn)備和提交階段,會進行:

準(zhǔn)備階段:

? ? 一個分布式事務(wù)是由協(xié)調(diào)者來開啟的,首先協(xié)調(diào)者會向所有的事務(wù)執(zhí)行者發(fā)送事務(wù)內(nèi)容,等待所有的事務(wù)執(zhí)行者答復(fù)。

各個事務(wù)執(zhí)行者開始執(zhí)行事務(wù)操作,但不會進行提交,并將undo和redo信息記錄到事務(wù)日志中。

如果事務(wù)執(zhí)行者執(zhí)行事務(wù)成功,那么就告訴協(xié)調(diào)者成功Yes,否則告訴協(xié)調(diào)者失敗No,不能提交事務(wù)。

提交階段:

? ? 當(dāng)前有的執(zhí)行者都反饋完成之后,進入第二階段。

協(xié)調(diào)者會檢測各個執(zhí)行者的反饋內(nèi)容,如果所有的返回都是成功,那么就告訴所有的執(zhí)行者可以提交事務(wù)了,最后再釋放鎖的資源。

如果有至少一個執(zhí)行者返回失敗或超時面,那么就讓所有的執(zhí)行者都會回滾,分布式事務(wù)執(zhí)行失敗。

雖然這種方式看起來比簡單,但是存在以下幾個問題:

1.事務(wù)協(xié)調(diào)者是非常核心的角色,一旦出現(xiàn)問題,將導(dǎo)致整個分布式不能正常運行。

2.如果提交階段發(fā)生網(wǎng)絡(luò)問題,導(dǎo)致某事務(wù)執(zhí)行者沒有收到協(xié)調(diào)者發(fā)來的提交命令,將導(dǎo)致某些執(zhí)行者沒提交,這樣肯定是不行的。

2.XA分布式事務(wù)協(xié)議 -3PC(三階段提交實現(xiàn))

三階段提交是在二階段提交的基礎(chǔ)上的改進播版本,主要是加了超時記機制,同時在協(xié)調(diào)者和執(zhí)行者都引入了超時機制。

三個階段分別進行:

CanCommit階段:

? ? 協(xié)調(diào)者向執(zhí)行者發(fā)送CanCommit請求,詢問是否可以執(zhí)行事務(wù)提交操作,然后開始等待執(zhí)行者的響應(yīng)。

ProeCommit階段:

? ? 協(xié)調(diào)者根據(jù)執(zhí)行者的反應(yīng)情況來決定是否可以進入第二階段事務(wù)的PreCommit操作。

如果所有的執(zhí)行者都返回Yes,則協(xié)調(diào)者向所有的執(zhí)行者發(fā)送PreCommit請求,并進入Prepared階段,執(zhí)行者接收到請求后,會執(zhí)行事務(wù)操作,并將undo和redo信息記錄到事務(wù)日志中,如果成功執(zhí)行,則返回成功的響應(yīng)。

如果所有的執(zhí)行者至少有一個返回No,則協(xié)調(diào)者會向所有的執(zhí)行者發(fā)送abort請求,所有的執(zhí)行者在收到請求或超時一段時間沒有收到任何請求時,會直接中斷事務(wù)。

DoCommit階段:

? ? 該階段進行真正的事務(wù)提交。

? ? 協(xié)調(diào)者接收到所有執(zhí)行者發(fā)送的成功響應(yīng),那么它就從PreCommit狀態(tài)進入DOCommit狀態(tài),并向所有的執(zhí)行者發(fā)送doCommit請求,執(zhí)行者接收到doCommit請求之后,開始執(zhí)行事務(wù)提交,并在完成事務(wù)提交之后釋放所有的事務(wù)資源,并最后向協(xié)調(diào)者發(fā)送確認響應(yīng),協(xié)調(diào)者接收到所有執(zhí)行者的確認響應(yīng)之后,完成事務(wù)(如果因為網(wǎng)絡(luò)問題導(dǎo)致執(zhí)行者沒有接收到doCommit請求,執(zhí)行者會在超時之后直接提交事務(wù),雖然執(zhí)行者只是猜測協(xié)調(diào)者返回的是doCommit請求,但是因為前面的兩個流程都正常執(zhí)行,所以能夠在一定程度上認為本次事務(wù)是成功的,因此會直接提交)

? ? 協(xié)調(diào)者沒有接收到至少一個執(zhí)行者發(fā)送的成功響應(yīng)(可能是響應(yīng)超時),那么就會執(zhí)行中斷事務(wù),協(xié)調(diào)者會向所有的執(zhí)行者發(fā)送abort請求,執(zhí)行者接收到abort請求之后,利用其在PreCommit階段記錄的undo信息來執(zhí)行事務(wù)的回滾操作,并在完成回滾操作之后釋放所有的事務(wù)資源,執(zhí)行者完成事務(wù)回滾之后,向協(xié)調(diào)者發(fā)送確認信息,協(xié)調(diào)者接收到參與者反饋的確認信息之后,執(zhí)行事務(wù)的中斷。

第三階段的特點:

1.3PC在2PC的第一階段和第二階段中插入一個準(zhǔn)備階段,保證在最后提交階段之前各參與節(jié)點的狀態(tài)是一致的。

2.一旦參與者無法及時收到來自協(xié)調(diào)者的信息之后,會默認執(zhí)行Commit,這樣就不會因為協(xié)調(diào)者單方面故障導(dǎo)致全局出現(xiàn)問題。

3.但是我們知道,實際上超時之后的Commit決策本質(zhì)上就是一個賭注罷了,如果此時協(xié)調(diào)者發(fā)送的是abort請求但是超時未接收,那么就會直接導(dǎo)致數(shù)據(jù)一致性的問題。?

3.TCC (補償事務(wù))

補償事務(wù)TCC就是Try,Comfirm,Cancel,它對業(yè)務(wù)有入侵性,一共分為三個階段。

Try階段:

? ? 比如我們需要借書時,將書籍的庫存-1,并將用戶的借閱量-1,但是這個操作,除了直接對庫存和借閱量進行修改之外,還需要將減去的值,單獨存放到凍結(jié)表中,但是此時不會創(chuàng)建借閱信息,也就是說只是預(yù)先把關(guān)鍵的東西給處理了,預(yù)留業(yè)務(wù)資源出來。

Confirm階段:

? ? 如果Try執(zhí)行成功無誤,那么就進入Confirm階段,接著之前,我們就該創(chuàng)建借閱信息了,只能使用Try階段預(yù)留的業(yè)務(wù)資源,如果創(chuàng)建成功,那么就對Try階段凍結(jié)的值進行解凍,整個流程就完成了,如果失敗了,就會進入Cancel階段。

Cancel階段:

? ? 將凍結(jié)的東西還給人家,進行回滾。

TCC特點:

跟XA協(xié)議相比,TCC就沒有協(xié)調(diào)者這一角色的參與了,而是自主通過上一階段的執(zhí)行情況來確保正常,充分利用了集群的優(yōu)勢,性能也是有很大的提升,但是缺點也很明顯,它與業(yè)務(wù)具有一定的關(guān)聯(lián)性,需要開發(fā)者去編寫更多的補償代碼,同時并不一定所有的業(yè)務(wù)流程都適用于這種形式。?

?Seata機制簡介

?seata支持四種事務(wù):

1.AT:標(biāo)志上就是2PC的升級版,在AT模式下,用戶只需要關(guān)心自己的"業(yè)務(wù)SQL"。

? ? ?一階段:seata會攔截"業(yè)務(wù)SQL",首先解析SQL語義,找到"業(yè)務(wù)SQL"要更新的業(yè)務(wù)數(shù)據(jù),在業(yè)務(wù)數(shù)據(jù)更新之前,將其保存成"before image",然后進行"業(yè)務(wù)SQL"更新業(yè)務(wù)數(shù)據(jù),在業(yè)務(wù)數(shù)據(jù)更新后,再將其保存到"after image",最后生成行鎖。以上操作只在一個數(shù)據(jù)庫事務(wù)內(nèi)完成,這樣保證了第一階段操作的原子性。

? ? 二階段如果確定提交的話,因為"業(yè)務(wù)SQL"在一階段已經(jīng)提交到數(shù)據(jù)庫,所以Seata框架只需要將第一階段保存的快照數(shù)據(jù)和行鎖刪掉,完成數(shù)據(jù)清除即可,當(dāng)然如果需要回滾,那么就用"before image"還原業(yè)務(wù)數(shù)據(jù),但在還原前首先要校驗臟讀,對比"數(shù)據(jù)庫當(dāng)前業(yè)務(wù)數(shù)據(jù)"和"after image",如果兩份數(shù)據(jù)完全一致就說明沒有臟讀,可以還原業(yè)務(wù)數(shù)據(jù),如果不一致就說明有臟讀,出現(xiàn)臟讀就需要轉(zhuǎn)人工處理。??

2.TCC:和我們上面講解的思路是一樣的。

3.XA:同上,但是要求數(shù)據(jù)庫本身支持這種模式才可以。

4.saga:用用處理長事務(wù),每個執(zhí)行者需要實現(xiàn)事務(wù)的正向操作和補償操作。

?那么,以AT模式為例,我們的程序是如何才能做到不對業(yè)務(wù)進行侵入的情況下實現(xiàn)分布式事務(wù)能?實際上,Seata客戶端,是通過對數(shù)據(jù)源進行代理實現(xiàn)的,使用的是DataSourceProxy類,所以在程序這邊,我們只需要將對應(yīng)的代理類注冊到Bean即可。(0.9版本之后支持自動代理,并不需要我們手動導(dǎo)入)

?使用file進行部署(以AT為例)

下載seata-server。

seats-server

?在idea中配置seata服務(wù)。

?seata支持本地部署也支持服務(wù)注冊與發(fā)現(xiàn)中心部署。(比如eureka,nacos)

seata存在著事務(wù)分組機制

1.事務(wù)分組:seata資源邏輯,可以按微服務(wù)的需要,在應(yīng)用程序(客戶端)對自定義事務(wù)進行分組,每個組取一個名字。

2.集群:seata-server服務(wù)端一個或多個節(jié)點組成的集群cluster。應(yīng)用程序(客戶端)使用時需要指定事務(wù)邏輯分組與seata服務(wù)器集群(默認為default)的映射關(guān)系。

為啥要設(shè)計成通過事務(wù)分組再直接映射到集群?為什么不直接將事務(wù)指定到集群呢?

獲取事務(wù)分組到映射集群的配置。這樣設(shè)計后,事務(wù)分組可以作為資源的邏輯隔離單位,出現(xiàn)某集群故障時可以快速failover(故障切換),只切換對應(yīng)的分組,可以把故障縮減到服務(wù)級別,但提前也是你有足夠server集群。

將各個服務(wù)作為seata的客戶端導(dǎo)入對于依賴:

<dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-seata</artifactId><version>2021.0.1.0</version></dependency>

修改配置文件。

seata:service:vgroup-mapping:#這里需要對事務(wù)組進行映射,默認組名為應(yīng)用名稱-seata-service-group,將其映射到default集群#這個很關(guān)鍵,一定要配置,不然會找不到服務(wù)borrow-service-seata-service-group: defaultgrouplist: localhost:8868

也可以設(shè)置自定義的服務(wù)分組。

seata:service:vgroup-mapping:#這里需要對事務(wù)組進行映射,默認組名為應(yīng)用名稱-seata-service-group,將其映射到default集群#這個很關(guān)鍵,一定要配置,不然會找不到服務(wù)xx服務(wù)名xx-seata-service-group: xxxgrouplist: localhost:8868tx-service-group: xxx

現(xiàn)在我們接著來配置開啟分布式事務(wù),首先在啟動類上添加注解,此注解會添加一個后置處理器將數(shù)據(jù)源封裝為支持分布式事務(wù)的代理數(shù)據(jù)源(1.4.2版本還是要手動添加此注解才能生效)?

?接著我們需要在開啟分布式事務(wù)的方法上添加注解@GlobalTransactional。

?因為Seata會分析修改數(shù)據(jù)的sql,同時生成對應(yīng)的反向回滾sql,這個回滾記錄會存放在undo_log表中,所以要求每個Client都有一個對應(yīng)的undo_log表(也就是說服務(wù)連接的數(shù)據(jù)庫都需要創(chuàng)建一個這樣的表,因為我們的例子就一個數(shù)據(jù)庫,所有只要創(chuàng)建一個表)

創(chuàng)建undo_log表的sql語句。

CREATE TABLE IF NOT EXISTS `undo_log` (`id` BIGINT(20) NOT NULL AUTO_INCREMENT COMMENT 'increment id',`branch_id` BIGINT(20) NOT NULL COMMENT 'branch transaction id',`xid` VARCHAR(100) NOT NULL COMMENT 'global transaction id',`context` VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',`rollback_info` LONGBLOB NOT NULL COMMENT 'rollback info',`log_status` INT(11) NOT NULL COMMENT '0:normal status,1:defense status',`log_created` DATETIME NOT NULL COMMENT 'create datetime',`log_modified` DATETIME NOT NULL COMMENT 'modify datetime',`ext` VARCHAR(100) DEFAULT NULL,PRIMARY KEY (`id`),UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`) ) ENGINE = InnoDBAUTO_INCREMENT = 1DEFAULT CHARSET = utf8;

?啟動服務(wù),進行測試。

第一次借閱成功。?

第二次借閱失敗。

?查看數(shù)據(jù)庫是否進行回滾。(這里進行了回滾)?

我們可以打印XID,查看其對應(yīng)的XID,在service層添加語句。

System.out.println(RootContext.getXID());

也可以在日志中查看。

使用nacos模式部署

我們先為Seata在nacos中配置一個命名空間。

修改seata的/conf/registry.conf文件,修改其注冊和配置中的“type”,“namespace”,“password”,“username”。

?注冊信息配置完成之后,接著我們需要將配置文件也放到Nacos中,讓Nacos管理配置,這樣我們就可以對配置進行熱更新了,一旦環(huán)境需要改變,只需要直接到Nacos中修改即可。

??

?我們需要對配置導(dǎo)入到Nacos中,我們打開seata源碼的 /script/config-center/nacos目錄,這是官方提供上傳腳本,我們直接運行即可。(去github下載seata源碼)

?在nacos中查看seata的配置。

?把所有微服務(wù)的事務(wù)分組信息的配置放在nacos中,我們還需要將對應(yīng)的事務(wù)組映射配置也添加上,DataId格式為service.vgroupMapping.'事務(wù)的名稱'。??

?接下來我們要對服務(wù)端的配置進行修改,我們刪除原本的seata配置,添加新的seata配置。

seata:#注冊registry:type: nacosnacos:#使用seata命名空間,這樣才能找到seata服務(wù),由于組名我們設(shè)置的是SEATA_GROUP就是默認的名字,所以就不用配了namespace: 550e71d6-4604-4952-a24b-b0d3781d8223username: nacospassword: nacos#配置config:type: nacosnacos:namespace: 550e71d6-4604-4952-a24b-b0d3781d8223username: nacospassword: nacos

? 啟動seata服務(wù),在nacos中對應(yīng)的命名空間觀察seata服務(wù)是否正常啟動。

啟動各個微服務(wù),各個服務(wù)使用nacos配置成功。

測試事務(wù)效果圖:

??

?我們還可以配置一下事務(wù)會話信息的存儲方式,默認是file類型的,那么就會在運行目錄下創(chuàng)建file_store目錄,我們可以將其放到數(shù)據(jù)庫中存儲,只需要修改一下數(shù)據(jù)即可。

?默認情況的存儲方式:

將會話信息存放到數(shù)據(jù)庫中?

修改nacos中的seata配置store.mode,store.session.mode,將存儲方式改為數(shù)據(jù)庫方式。

?

?將數(shù)據(jù)庫的配置信息進行修改。

1.數(shù)據(jù)庫啟動。

?2.數(shù)據(jù)庫的URL。

3.數(shù)據(jù)庫用戶名和密碼。

?

創(chuàng)建seata數(shù)據(jù)庫。

-- -------------------------------- The script used when storeMode is 'db' -------------------------------- -- the table to store GlobalSession data CREATE TABLE IF NOT EXISTS `global_table` (`xid` VARCHAR(128) NOT NULL,`transaction_id` BIGINT,`status` TINYINT NOT NULL,`application_id` VARCHAR(32),`transaction_service_group` VARCHAR(128),`transaction_name` VARCHAR(128),`timeout` INT,`begin_time` BIGINT,`application_data` VARCHAR(2000),`gmt_create` DATETIME,`gmt_modified` DATETIME,PRIMARY KEY (`xid`),KEY `idx_status_gmt_modified` (`status` , `gmt_modified`),KEY `idx_transaction_id` (`transaction_id`) ) ENGINE = InnoDBDEFAULT CHARSET = utf8mb4;-- the table to store BranchSession data CREATE TABLE IF NOT EXISTS `branch_table` (`branch_id` BIGINT NOT NULL,`xid` VARCHAR(128) NOT NULL,`transaction_id` BIGINT,`resource_group_id` VARCHAR(32),`resource_id` VARCHAR(256),`branch_type` VARCHAR(8),`status` TINYINT,`client_id` VARCHAR(64),`application_data` VARCHAR(2000),`gmt_create` DATETIME(6),`gmt_modified` DATETIME(6),PRIMARY KEY (`branch_id`),KEY `idx_xid` (`xid`) ) ENGINE = InnoDBDEFAULT CHARSET = utf8mb4;-- the table to store lock data CREATE TABLE IF NOT EXISTS `lock_table` (`row_key` VARCHAR(128) NOT NULL,`xid` VARCHAR(128),`transaction_id` BIGINT,`branch_id` BIGINT NOT NULL,`resource_id` VARCHAR(256),`table_name` VARCHAR(32),`pk` VARCHAR(36),`status` TINYINT NOT NULL DEFAULT '0' COMMENT '0:locked ,1:rollbacking',`gmt_create` DATETIME,`gmt_modified` DATETIME,PRIMARY KEY (`row_key`),KEY `idx_status` (`status`),KEY `idx_branch_id` (`branch_id`) ) ENGINE = InnoDBDEFAULT CHARSET = utf8mb4;CREATE TABLE IF NOT EXISTS `distributed_lock` (`lock_key` CHAR(20) NOT NULL,`lock_value` VARCHAR(20) NOT NULL,`expire` BIGINT,primary key (`lock_key`) ) ENGINE = InnoDBDEFAULT CHARSET = utf8mb4;INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('HandleAllSession', ' ', 0);

數(shù)據(jù)庫效果圖:

運行事務(wù)效果圖:?

分布式權(quán)限校驗??

因為是分布式服務(wù),每個微服務(wù)存儲的sessionb是各不相同的,而我們需要的是保證所有的微服務(wù)都能同步這些session信息,這樣我們才能實現(xiàn)某一個微服務(wù)登錄時,其他微服務(wù)都能知道。

實現(xiàn)上述要求的方案

方案一:我們可以在每臺服務(wù)器上都復(fù)制一份Session,但這樣顯然是很浪費時間的,并且用戶驗證數(shù)據(jù)占用的內(nèi)存會成倍的增加。

方案二:將Session移出服務(wù)器,用統(tǒng)一訪問Redis或是Mysql即可,這樣就能保證服務(wù)都可以同步Seesion了。

?明顯方案二是可行的。

每個微服務(wù)需要添加驗證機制,導(dǎo)入對應(yīng)依賴。

<!-- springSession Redis支持--><dependency><groupId>org.springframework.session</groupId><artifactId>spring-session-data-redis</artifactId><version>2.7.0</version></dependency> <!-- 添加Redis的Starter--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId><version>2.7.0</version></dependency>

導(dǎo)入springSecurity框架的依賴。

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-security</artifactId><version>2.7.0</version></dependency>

對每個微服務(wù)的配置文件進行修改。

spring:session:store-type: redisredis:host: lcoalhost:6379

進行測試。

因為spring-security的安全機制,所以我們需要攜帶對應(yīng)的cookies才能訪問對應(yīng)的微服務(wù)。

其登錄頁面的用戶名為:user,密碼在idea的運行日志中。

?然后服務(wù)端會將session存放到數(shù)據(jù)庫中。

但是我們服務(wù)對應(yīng)的服務(wù)還是會報500錯誤,只是因為我們使用了RestTemplate,RestTemplate類似一個瀏覽器,由于該微服務(wù)調(diào)用了其他的微服務(wù),又因為RestTemplate在訪問微服務(wù)時沒有攜帶對應(yīng)的cookies,所以報出500錯誤。

OAuth2.0實現(xiàn)單點操作

?前面我們雖然使用了統(tǒng)一存儲來解決Session共享的問題,但是我們發(fā)現(xiàn)就算實現(xiàn)了Session共享,依舊存在一些問題,由于我們每個微服務(wù)都有自己的驗證模板,實際上整個系統(tǒng)上存在冗余功能的,同時還有我們上面出現(xiàn)的問題,那么能否實現(xiàn)只在一個服務(wù)進行等錄,就可以訪問其他的服務(wù)能?

?實際上之前的登錄模式稱為多點登錄,而我們希望的是實現(xiàn)單點登錄。

這里我們需要了解一種全新的登錄方式:OAuth2.0,我們經(jīng)常看到一些網(wǎng)站支持第三方登錄,就是使用OAuth2.0?來實現(xiàn)第三方授權(quán),基于第三方應(yīng)用訪問用戶信息的權(quán)限。(本質(zhì)上就是給別人調(diào)用自己服務(wù)接口的權(quán)限)

?四種授權(quán)模式

1.客戶端模式(Client Credentials)

這是最簡單的一種模式,我們可以直接向驗證服務(wù)器 請求可以Token,服務(wù)器拿到Token之后,才能去訪問服務(wù)資源,這樣資源服務(wù)器才能知道我們是誰以及是否成功登錄。(不需要密碼驗證)

雖然這種模式比較簡便,但是已經(jīng)失去了用戶驗證的意義,壓根就不是給用戶校驗準(zhǔn)備的,而是更適合內(nèi)部調(diào)用的場景。?

2.密碼模式(Resource Owner Password Credentials)

密碼模式相比客戶端模式,就多了用戶名和密碼的信息,用戶需要提供對應(yīng)的賬號的用戶名和密碼,才能獲取Token。

?雖然這樣看起來比較合理,但是會直接將賬號和密碼泄露給客戶端,需要后臺完全信任客戶端不會拿賬號和密碼去干其他壞事,所以也不是我們常見的。(可能前端或第三方會拿著你的賬號,登錄你的服務(wù)干壞事,很不安全)

3.隱式授權(quán)模式(Implicit Grant)

?首先用戶訪問頁面時,會重定向到認證服務(wù)器上,接著認證服務(wù)器給用戶一個認證頁面,等待用戶授權(quán),用戶填寫信息完成授權(quán)后,認證服務(wù)器返回Token。?

?它適用于沒有服務(wù)端的第三方應(yīng)用頁面,并且相比前面一種形式,驗證都是在驗證服務(wù)器進行的,敏感信息不會輕易泄露,但是Token依然存在泄漏的風(fēng)險。

?4.授權(quán)碼模式(Authrization Code)

這種模式是最安全的一種模式,也是推薦使用的一種,比如我們手機上的很多APP都是使用的這種方式。

相比隱式授權(quán)模式,它并不會直接返回Token,而是返回授權(quán)碼,真正的Token是通過應(yīng)用服務(wù)器訪問驗證服務(wù)器獲得的。在一開始的時候,應(yīng)用服務(wù)器(客戶端通過訪問自己的應(yīng)用服務(wù)器來進行訪問其他的服務(wù))和驗證服務(wù)器之間會共享一個“secret”(沒有登錄的時候是沒有的“secret”),這個東西沒有其他人知道,而驗證服務(wù)器在用戶驗證之后,會返回一個授權(quán)碼,應(yīng)用服務(wù)器最后將授權(quán)碼和“secret”一起交給驗證服務(wù)器進行驗證,并且Token也是在服務(wù)器之間傳遞,不會直接給客戶端。?

?這樣就算有人中途竊取了授權(quán)碼,也毫無意義,因為Token的獲取必須同時攜帶授權(quán)碼和“secret”,但是“secret”第三方是無法得知的,并且Token不會直接給客戶端,大大減少了泄漏的風(fēng)險。

OAth2.0不應(yīng)該是那種第三方應(yīng)用為了請求我們的服務(wù)而使用的嗎,而我們這里需要的只是實現(xiàn)同一個應(yīng)用內(nèi)部服務(wù)之間的認證,其實我們也可以利用OAuth2.0來實現(xiàn)單點登錄,只是少了資源服務(wù)器這個角色,客戶端就是我們的整個系統(tǒng),接下來就讓我們來實現(xiàn)一下。

?搭建驗證服務(wù)器

第一步就是最重要的,我們需要搭建一個驗證服務(wù)器,它是我們進行權(quán)限校驗的核心,驗證服務(wù)器有很多的第三方實現(xiàn),也有Spring官方通過的實現(xiàn),這里我們使用Spring官方通過的驗證服務(wù)器。?

?導(dǎo)入對應(yīng)依賴:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-security</artifactId></dependency><!-- oauth依賴不再內(nèi)置到spring-cloudy依賴中,需要指定對應(yīng)的版本,且其已經(jīng)整合到spring-security中了--><dependency><groupId>org.springframework.security.oauth</groupId><artifactId>spring-security-oauth2</artifactId><version>2.5.2.RELEASE</version></dependency>

修改配置文件。

server:port: 8500servlet:#為了防止一會在服務(wù)間跳轉(zhuǎn)導(dǎo)致Cookies打架。(因為所有的服務(wù)地址但是localhost,都會存JSESSIONID)#這里修改一下context-path,這樣保存的Cookie會使用指定的路徑,就不會和其他服務(wù)打架了#但注意之后的所有請求都得在最前面添加這個路徑context-path: /sso

?編寫spingSecurity配置類和OAuth2的配置類。

springSecurity配置類:

@Configuration public class SecurityConfiguration extends WebSecurityConfigurerAdapter {@Overrideprotected void configure(AuthenticationManagerBuilder auth) throws Exception {BCryptPasswordEncoder bCryptPasswordEncoder = new BCryptPasswordEncoder();//對密碼進行加密的類auth.inMemoryAuthentication()//直接創(chuàng)建一個用戶.passwordEncoder(bCryptPasswordEncoder).withUser("test").password(bCryptPasswordEncoder.encode("123456")).roles();}@Overrideprotected void configure(HttpSecurity http) throws Exception {http.authorizeRequests().anyRequest().authenticated().and().formLogin().permitAll();//使用表單登錄}@Bean@Override //這里我們需要將AuthenticationManager注冊為Bean,因為我們要在OAuth2中使用它public AuthenticationManager authenticationManagerBean() throws Exception {return super.authenticationManagerBean();} }

OAth2配置類:

@EnableAuthorizationServer //開啟驗證服務(wù)器 @Configuration public class OAuth2Configuration extends AuthorizationServerConfigurerAdapter {@Resourceprivate AuthenticationManager manager;private final BCryptPasswordEncoder encoder = new BCryptPasswordEncoder();//對密碼進行加密的類/*** 這個方法是對客戶端進行配置,一個驗證服務(wù)器可以預(yù)設(shè)很多個客戶端,* 之后這些指定的客戶端就可以按照下面指定的方式進行驗證* @param clients 客戶端配置工具*/@Overridepublic void configure(ClientDetailsServiceConfigurer clients) throws Exception {clients.inMemory() //這里我們直接硬編碼創(chuàng)建,當(dāng)然也可以像Security那樣自定義或是使用JDBC從數(shù)據(jù)庫讀取.withClient("web") //客戶端名稱,隨便起就行.secret(encoder.encode("654321")) //只與客戶端分享的secret,隨便寫,但是注意要加密.autoApprove(false) //自動審批,這里關(guān)閉,要的就是一會體驗?zāi)欠N感覺.scopes("book", "user", "borrow") //授權(quán)范圍,這里我們使用全部all.authorizedGrantTypes("client_credentials", "password", "implicit", "authorization_code", "refresh_token");//授權(quán)模式,一共支持5種,除了之前我們介紹的四種之外,還有一個刷新Token的模式//這里我們直接把五種都寫上,方便一會實驗,當(dāng)然各位也可以單獨只寫一種一個一個進行測試//現(xiàn)在我們指定的客戶端就支持這五種類型的授權(quán)方式了}@Overridepublic void configure(AuthorizationServerSecurityConfigurer security) {security.passwordEncoder(encoder) //編碼器設(shè)定為BCryptPasswordEncoder.allowFormAuthenticationForClients() //允許客戶端使用表單驗證,一會我們POST請求中會攜帶表單信息.checkTokenAccess("permitAll()"); //允許所有的Token查詢請求}@Overridepublic void configure(AuthorizationServerEndpointsConfigurer endpoints) {endpoints.authenticationManager(manager);//由于SpringSecurity新版本的一些底層改動,這里需要配置一下authenticationManager,才能正常使用password模式} }

? ? ? ? 然后我們使用測試工具進行測試。?

1.首先我們從最簡單的客戶端模式進行測試,客戶端模式只需要提供id和secret即可直接拿到Token,注意需要添加一個grant_type來表明我們的授權(quán)方式,默認請求路徑為:http://localhost:8500/sso/oauth/token。

測試結(jié)果圖:

?我們可以通訪問http://localhost:8500/sso/oauth/check_token來驗證我們的Token是否有效。?

?

2.我們進行密碼模式的測試,這里的請求參數(shù)為username和password,授權(quán)模式改為passwod。

?在請求頭中添加Basic驗證信息。

測試結(jié)果圖:

?Token驗證(返回用戶名):

?3.隱式授權(quán)模式,這種模式我們需要在驗證服務(wù)器上進行驗證,而不是直接請求Token,驗證登錄請求地址:http://localhost:8500/sso/oauth/authorize?client_id=web&response_type=token。

注意response_type一定要是Token類型,這樣才會返回Token,瀏覽器發(fā)起請求后,可以看到SpringSecurity的登錄頁面。

登錄之后會有個錯誤信息,這是因為登錄成功后,驗證服務(wù)器需要將結(jié)果給回客戶端,所以需要提供供客戶端的回調(diào)地址,這樣瀏覽器就會被重定向到指定的回調(diào)地址并且請求中回攜帶Token信息,這里我們隨便配置一個回調(diào)信息。

@Overridepublic void configure(ClientDetailsServiceConfigurer clients) throws Exception {clients.inMemory() //這里我們直接硬編碼創(chuàng)建,當(dāng)然也可以像Security那樣自定義或是使用JDBC從數(shù)據(jù)庫讀取.withClient("web") //客戶端名稱,隨便起就行.secret(encoder.encode("654321")) //只與客戶端分享的secret,隨便寫,但是注意要加密.autoApprove(false) //自動審批,這里關(guān)閉,要的就是一會體驗?zāi)欠N感覺.scopes("book", "user", "borrow") //授權(quán)范圍,這里我們使用全部all.redirectUris("localhost:8202/login").authorizedGrantTypes("client_credentials", "password", "implicit", "authorization_code", "refresh_token");//授權(quán)模式,一共支持5種,除了之前我們介紹的四種之外,還有一個刷新Token的模式//這里我們直接把五種都寫上,方便一會實驗,當(dāng)然各位也可以單獨只寫一種一個一個進行測試//現(xiàn)在我們指定的客戶端就支持這五種類型的授權(quán)方式了}

進行授權(quán)。

最終會將Toekn返回到指定的頁面。

4.最安全的授權(quán)碼模式,這種模式其實流程和隱式授權(quán)模式是一樣的,當(dāng)是請求的是Code類型:http:localhost:8500/sso/oauth/authorize?/client_id=web&response_type=code。

在訪問此地址依舊會進入回調(diào)地址,但是這時給的就是授權(quán)碼了,而不是直接返回Token。

然后我們可以使用這個授權(quán)碼和secret來獲取Token。

5.最后還有一個是刷新Token用的模式,當(dāng)我們的Token過期時,我們就可以使用refresh_token來申請一個新的Token,當(dāng)我們使用授權(quán)碼模式時,在成功驗證以后驗證服務(wù)器會返回一個refresh_token,如果我們需要刷新一下Token,就執(zhí)行下面操作。

?在SecurityConfiguration配置類中將UserDetailsService注入spring容器中。

@Bean@Overridepublic UserDetailsService userDetailsServiceBean() throws Exception {return super.userDetailsServiceBean();}

在OAuth2Configuration類中使用UserDetailsService。

@ResourceUserDetailsService service;@Overridepublic void configure(AuthorizationServerEndpointsConfigurer endpoints) {endpoints.userDetailsService(service).authenticationManager(manager);//由于SpringSecurity新版本的一些底層改動,這里需要配置一下authenticationManager,才能正常使用password模式}

進行測試。

redis與分布式(docker模擬集群搭建)

拉取redis文件。

docker pull redis

在云服務(wù)器上創(chuàng)建等會需要掛載的目錄和文件

  • 創(chuàng)建data目錄(存放數(shù)據(jù)文件,包括用于持久化的dump.rdb)
mkdir -p /mydata/redis/data
  • 創(chuàng)建配置文件
mkdir -p /mydata/redis/conf touch /mydata/redis/conf/redis.conf

?創(chuàng)建redis容器,運行redis,并進行數(shù)據(jù)和配置文件的掛載。

docker run -p 3346:6379 --name redis -v /mydata/redis/data:/data \ -v /mydata/redis/conf/redis.conf:/etc/redis/redis.conf \ -d redis redis-server /etc/redis/redis.conf

:左邊的表示云服務(wù)器的真實端口,右邊表示redis容器中的端口。

--name 表示為容器設(shè)置的名稱,-d 后面的名稱表示鏡像的名稱。

進入對應(yīng)的容器

docker exec -it 容器對應(yīng)id或是容器名稱 bash #進入到docker中 redis-cli #加入容器中的redis中#也可以直接進入 docker exec -it 容器對應(yīng)的id或是容器的名稱 redis-cli

效果圖:

docker 搭建redis集群(使用三個redis搭建集群)

?在對應(yīng)的目錄下創(chuàng)建用來掛在數(shù)據(jù)的目錄。

mkdir -p /mydata/redis/cluster/node1/data #集群一 mkdir -p /mydata/redis/cluster/node2/data #集群二 mkdir -p /mydata/redis/cluster/node3/data #集群三

為了方便我們這里就不去手動修改redis.conf,我們這里使用命令我們設(shè)置操作。

1.為先創(chuàng)建對應(yīng)的redis容器。

docker create --name redis-node1 -v /mydata/redis/cluster/node1/data:/data \ -p 3346:6379 redis --cluster-enabled yes \ --cluster-config-file redis-node1.conf

首先間將存儲數(shù)據(jù)目錄進行掛載,配置端口,設(shè)置集群模式為true,設(shè)置集群的配置文件,做集群的時候會將一些配置添加到該配置文件中。(該文件會自動生成)

2.啟動該容器。

docker start redis-node1

另一個redis的配置跟redis-node1是一樣的,只要將容器名稱和掛載路徑進行修改即可。

?現(xiàn)在我們只是單純的搭建了兩個單獨的redis,我們還需要將兩者聯(lián)系起來。

因為我們設(shè)置docker中部署,所以docker 會給每一個容器分配一個ip地址,我們需要查看對應(yīng)的ip才能進行聯(lián)系。

查看方式。

docker inspect 對應(yīng)的容器名

ip為IpAddress的參數(shù)。?

?

執(zhí)行命令創(chuàng)建集群。

redis-cli --cluster create 172.17.0.3:6379 172.17.0.4:6379 172.17.0.4:6379 --cluster-replicas 0

這里cluster-replicas表示主從比例,我們設(shè)置為0,就說明全部為master,如果我們需要做主從關(guān)系的話,也就是不將cluster-replicas的比例設(shè)置為0,那我們需要保證有三個以上的master(主節(jié)點),不然是無法搭建集群的。(并且搭建集群的最少節(jié)點數(shù)為3個,就是需要保證有三個master)

錯誤信息:

按其默認的配置,我們輸入"yes"?。

測試集群是否生效

?加入對應(yīng)的容器。

docker exec -it redis-node1 Redis-cli -c #-c必須要加,表示以集群啟動,這樣在做數(shù)據(jù)庫操作時不會因為插槽的限制而不完成不了操作

因為redis-node1表示負責(zé)管理5798插槽的,所以集群就中找到負責(zé)管理該插槽的節(jié)點進行set操作。?

?接下來我們?nèi)テ渌?jié)點查看是否將數(shù)據(jù)插入成功。

我們隨便在一個節(jié)點中查詢對應(yīng)的值,即使該節(jié)點沒有需要的值,該集群會自動在節(jié)點中查找需要的值。

?查詢集群節(jié)點信息。

cluster nodes

?如果需要刪除集群的話,我們只需要刪除集群中使用節(jié)點的集群配置信息redis-node*.conf。

如果需要在集群中添加從節(jié)點,我們可以執(zhí)行對應(yīng)命令:

redis-cli --cluster add-node 172.17.0.8:6379 --cluster-slave --cluster-master 對應(yīng)的主節(jié)點的id

如果需要提高java代碼獲取信息的話,我們就需要將各個節(jié)點的配置進行掛載,然后修改配置信息,將保護模式關(guān)閉,并將綁定Ip注釋掉。

?集群模式是自帶哨兵模式的。(當(dāng)主節(jié)點down掉時,會將其從節(jié)點作為新的主節(jié)點)

哨兵模式的選舉規(guī)則

1.首先會根據(jù)優(yōu)先級進行選擇,可以在配置文件中進行配置,添加"relica-priority"配置項(默認是100),越小表示的優(yōu)先級就越高。

2.如果優(yōu)先級一樣,那就選擇偏移量最大的。

3.要是還是選不出來,就選擇runid(啟動時隨機生成的)最小的。

實現(xiàn)分布式鎖

?為了解決電商超買的問題,我們可以通過redis設(shè)置分布式鎖。

#只有當(dāng)key值不存在的時候才能進行設(shè)置,其實際上是set if no exists的縮寫 setnx key value

利用這種特性,我們就可以在不同的服務(wù)中實現(xiàn)分布式鎖,如果某個服務(wù)器加了鎖但是卡頓了,或是直接崩潰了,那么這把鎖豈不是就永遠無法釋放了,因此我們可以考慮添加一個過期時間。

set key value EX num NX

?這里使用set命令,最后加一個NX表示使用setx的模式,和上面一樣,但是可以通過EX設(shè)置過期時間,這里設(shè)置為num秒,也就是說如果num秒還沒釋放,那就自動刪除。

上鎖出現(xiàn)的問題

1.?

?2.?

?3.

?要解決這個問題,我們可以借助一下Redisson框架,它是Redis官方推薦的java版Redis客戶端,它提供的功能非常強大,也非常大,Redisson內(nèi)部提供一個監(jiān)控鎖的看門狗。它的作用是在Redisson實例被關(guān)閉前,不斷的延長鎖的有效期,它為我們提供了很多中分布式的實現(xiàn),這里我們嘗試使用它的分布式鎖的功能。

導(dǎo)入依賴。

<dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>4.2.1</version></dependency><dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.17.0</version></dependency><dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.75.Final</version></dependency>

?我們先測試沒有加鎖的情況。

測試代碼:

import redis.clients.jedis.Jedis;public class Main {public static void main(String[] args) {for (int i = 0; i < 10; i++) {new Thread(() -> {try(Jedis jedis = new Jedis("127.0.0.1", 6379)){for (int j = 0; j < 100; j++) { //每個客戶端獲取a然后增加a的值再寫回去,如果不加鎖那么肯定會出問題,在每次插入的過程中,其實內(nèi)部值已經(jīng)發(fā)生改變了,比如說同一時間,其獲取到a的值是相同的,也就是說這么多個插入最終只是+1。int a = Integer.parseInt(jedis.get("a")) + 1;jedis.set("a", a+"");}}}).start();}} }

結(jié)果圖:?

沒有到1000,說明出現(xiàn)錯誤了,?加上鎖試試。

測試代碼:

import org.redisson.Redisson; import org.redisson.api.RLock; import org.redisson.api.RedissonClient; import org.redisson.config.Config; import redis.clients.jedis.Jedis;public class Main {public static void main(String[] args) {Config config = new Config();config.useSingleServer().setAddress("redis://127.0.0.1:6379"); //配置連接的Redis服務(wù)器,也可以指定集群RedissonClient client = Redisson.create(config); //創(chuàng)建RedissonClient客戶端for (int i = 0; i < 10; i++) {new Thread(() -> {try(Jedis jedis = new Jedis("127.0.0.1", 6379)){RLock lock = client.getLock("testLock"); //指定鎖的名稱,拿到鎖對象for (int j = 0; j < 100; j++) {lock.lock(); //加鎖int a = Integer.parseInt(jedis.get("a")) + 1;jedis.set("a", a+"");lock.unlock(); //解鎖}}System.out.println("結(jié)束!");}).start();}} }

效果圖:

符合最終的結(jié)果。?

Mysql與分布式??

?主從復(fù)制

當(dāng)我們使用Mysql的時候,也可以采用主從復(fù)制的策略,它的實現(xiàn)基本和Redis相似,也可以采用增量復(fù)制的方式,Mysql會在運行的過程中,會記錄二進制日志,所有的DML和DDL操作都會被記錄進日志中,主數(shù)據(jù)庫只需要將記錄的操作復(fù)制給從庫,讓從庫也運行一次,那么就可以實現(xiàn)主從復(fù)制,但是注意它不會在一開始進行全量復(fù)制,所以最好在開始主從復(fù)制之前將數(shù)據(jù)庫的內(nèi)容保持一致。

和Redis一樣,一但我們實現(xiàn)了主從復(fù)制,那么就算主庫出現(xiàn)故障,從庫也能正常提供服務(wù),并且可以實現(xiàn)讀寫分離等操作,這里我們使用一主一從方式來搭建。?

通過docker 拉取鏡像,通過設(shè)置兩個不同的端口,啟動兩個mysql。

docker run -p 3346:3306 --name main_mysql -e MYSQL_ROOT_PASSWORD=123456 -d mysql #-p 設(shè)置端口進行端口映射,-e編輯mysql root用戶的密碼,-d表示后太啟動 docker run -p 3347:3306 --name slave_mysql -e MYSQL_ROOT_PASSWORD=123456 -d mysql

?5.進入容器。

docker exec -it 容器Id或name /bin/bash #再切目錄: cd /etc/mysql。 如果vim指令不存在,說明沒有安裝,我們需要進行安裝。 apt-get update apt-get install vim

修改主從mysql的配置 my.cnf。

主表中進行配置:

#如果是在同一個服務(wù)器上那就要保證server-id不同,不然會發(fā)生沖突 server-id=100 #開啟二進制日志功能,因為是mater所以是必要的(名字自己定) log-bin=mysql-bin

?從表中進行配置:

#設(shè)置server_id,注意要唯一 server-id=101 #開啟二進制日志功能,以備Slave作為其它Slave的Master時使用(子節(jié)點可以作為別人的主節(jié)點) log-bin=mysql-slave-bin #relay_log配置中繼日志 relay_log=edu-mysql-relay-bin

??

重啟docker 容器使其配置生效。

docker restart main_mysql

進入mysql。

mysql mysql -uroot -p123456

創(chuàng)建用戶并授權(quán)允許從庫服務(wù)連接主庫的服務(wù)。

#創(chuàng)建一個用戶 賬號為:slave,密碼為:123456 CREATE USER test IDENTIFIED WITH mysql_native_password BY '123456';#給這個slave用戶授權(quán),授權(quán)主從復(fù)制權(quán)限和主從復(fù)制的連接GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO test;

如果報以下錯誤,說明已經(jīng)存在主從關(guān)系了,我們需要先關(guān)閉主從關(guān)系。

stop slave; reset master;

刷新一下mysql權(quán)限。

flush privileges

查詢主庫File和Position。

show master status

?查看docker 給對應(yīng)的容器分配的ip。

docker inspect 對應(yīng)的容器名

?進入從數(shù)據(jù)庫,進行對指定對應(yīng)的主庫。

#指定ip,端口,和用戶,將對應(yīng)的mysql數(shù)據(jù)庫設(shè)置為該mysql數(shù)據(jù)庫的主節(jié)點,我們還需要指定主節(jié)點的二進制日志文件和主節(jié)點的偏移量和主節(jié)點的重連次數(shù) change master to master_host='172.17.0.4', master_user='test', master_password='123456', master_port=3306, master_log_file='mysql-bin.000004', master_log_pos= 861, master_connect_retry=30;#例子二: change master to master_host='172.17.0.2', master_user='test', master_password='123456', master_port=3306, master_log_file='mysql-bin.000003', master_log_pos= 157, master_connect_retry=30;

開啟主從復(fù)制功能。

start replica

使用對應(yīng)命令?查看從庫的狀態(tài)。

show slave status \G

效果圖:

分庫分表?

?在大型的互聯(lián)網(wǎng)系統(tǒng)中,可能單臺的Mysql已經(jīng)無法滿足業(yè)務(wù)的需求,這時候就需要進行擴容了。

單臺主機的硬件資源是存在瓶頸的,不可能無限制地擴展,這時候我們就得通過多臺實例來進行容量的橫向擴容,我們可以將數(shù)據(jù)分散儲存,讓多臺主機共同來保存數(shù)據(jù)。

擴容方法分為有兩種。

1.垂直拆分:我們的表和數(shù)據(jù)庫都可以進行垂直拆分的,就是將數(shù)據(jù)庫中所有的表按照業(yè)務(wù)功能進行拆分到各個數(shù)據(jù)庫中(有點類似微服務(wù)),而對于一張表也可以通過外鍵之類的機制將其拆分為多個表。

????????

?2.水平拆分:水平拆分針對的不是表,而是數(shù)據(jù),我們可以讓很多個具有相同表的數(shù)據(jù)庫存放一部分?jǐn)?shù)據(jù),相當(dāng)于是將數(shù)據(jù)分散存儲在各個節(jié)點上。

那么要實現(xiàn)這樣的拆分操作,我們自行去編寫代碼的工作量是比較大的,因此目前實際上已經(jīng)有一些解決方案了,比如我們可以使用MyCat(也就是數(shù)據(jù)庫中間插件,相當(dāng)于掛了一層代理,再通過MyCat進行分庫分表操作數(shù)據(jù)庫,只需要連接就可以使用,類似的還有ShardingSphere-Proxy)或是Sharding JDBC(應(yīng)用程序中直接對SQL語句進行分析,然后轉(zhuǎn)換成分庫分表操作,需要我們自己編寫一些邏輯代碼)?

?Sharding JDBC?

shardingJDBC官網(wǎng)

?定位為輕量級Java框架,在java的JDBC層提供的額外服務(wù),它使用客戶端直連數(shù)據(jù)庫,以Jar包形式提供服務(wù),無需額外部署和依賴,可以理解為增強版的JDBC驅(qū)動,完全兼容JDBC和各種ORM框架。

1.適用于如何基于JDBC的ORM框架,如:JPA,Mybatis,Spring JDBC Template或直接使用JDBC。

2.支持任何第三方的數(shù)據(jù)庫連接池,如 DBCP,C3P0,BoneCP,HikariCP。

3.支持任意實現(xiàn)JDBC規(guī)范的數(shù)據(jù)庫,目前支持Mysql,Oracle,SQLServer以及任何可以使用JDBC的數(shù)據(jù)庫。

水平拆分實現(xiàn)

導(dǎo)入對應(yīng)依賴。

<!-- shardingJDBC的依賴--><dependency><groupId>org.apache.shardingsphere</groupId><artifactId>shardingsphere-jdbc-core-spring-boot-starter</artifactId><version>5.1.1</version></dependency><dependency><groupId>org.mybatis.spring.boot</groupId><artifactId>mybatis-spring-boot-starter</artifactId><version>2.2.2</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId></dependency>

?在配置文件中配置數(shù)據(jù)源。

spring:shardingsphere:datasource:#幾個數(shù)據(jù)就配幾個,這里是名稱,格式就是名稱+數(shù)字names: db0,db1#為每個數(shù)據(jù)源單獨配置db0:#數(shù)據(jù)源實現(xiàn)類,這里默認使用HikariDataSourcetype: com.zaxxer.hikari.HikariDataSourcedriver-class-name: com.mysql.cj.jdbc.Driverjdbc-url: jdbc:mysql://127.0.0.1:3344/springcloudusername: rootpassword: 123456db1:#數(shù)據(jù)源實現(xiàn)類,這里默認使用HikariDataSourcetype: com.zaxxer.hikari.HikariDataSourcedriver-class-name: com.mysql.cj.jdbc.Driverjdbc-url: jdbc:mysql://127.0.0.1:3345/springcloudusername: rootpassword: 123456

到目前配置為止,實際上這些配置都是常規(guī)的操作,在編寫代碼時關(guān)注點依然放在業(yè)務(wù)上,現(xiàn)在我們就來編寫配置文件,我們需要告訴ShardingJDBC要如何進行分片,首先明確:現(xiàn)在就是兩個數(shù)據(jù)庫都有Test表存放用戶數(shù)據(jù),我們的目標(biāo)是將用戶信息分別存放到這兩個數(shù)據(jù)庫表中。

?繼續(xù)修改配置文件,設(shè)置其切片模式。

spring:shardingsphere:rules:sharding:tables:#這里填寫表的名稱,程序中對這張表的所有操作都會采用下面的路由方案#比如我們上面Mybatis就是對test表進行操作,所以會走下面的路由方案test:#這里填寫實際的路由節(jié)點,比如我們要分兩個庫,那么就可以把兩個數(shù)據(jù)庫都寫上去,以及對應(yīng)的表#也可以使用表達式,比如下面的可以簡寫為db$->{0,1}.testactual-data-nodes: db0.test,db1.test#這里是分庫的策略配置database-strategy:#這里選擇標(biāo)準(zhǔn)策略,也可以配置復(fù)雜策略,基于多個鍵進行分片standard:#參與分片運算的字段,下面的算法會根據(jù)這里提供的字段進行運算sharding-column: id#這里填寫我們下面自定義的算法名稱sharding-algorithm-name: my-algsharding-algorithms:#自定義用戶新的算法,名稱自定義my-alg:#算法類型,官方內(nèi)置了很多種、type: MODprops:sharding-count: 2props:sql-show: true

進行測試。

我們在mapper中編簡單的插入語句,對應(yīng)其進行測試。

在測試類中編寫代碼。

@SpringBootTest class ShardingtestApplicationTests { @AutowiredUserMapper userMapper;@Testvoid contextLoads() {for(int i = 0; i < 10; i++) {userMapper.addUser(new User(i,"xxx", "cccc"));}}}

?測試結(jié)果圖:

以為我們自定義的算法是通過取模的結(jié)果來存放到不同的數(shù)據(jù)庫中,在圖中我們可以發(fā)現(xiàn)取模為0時將數(shù)據(jù)存放到db0中,而取模為1時就將數(shù)據(jù)存放到db1中。

可以看到所有的SQL語句都有一個Logic SQL(這個就是我們在Mybatis里面寫的,是什么就是什么)緊接著下面就是Actual SQL,也就是說每個邏輯SQL最終會根據(jù)我們的策略轉(zhuǎn)換為實際SQL,它的id是0,那么實際轉(zhuǎn)換出來的SQL會在db0這個數(shù)據(jù)源進行插入。

我們查看數(shù)據(jù)庫中的信息。

分表查詢和查詢操作

現(xiàn)在我們在我們的數(shù)據(jù)庫中有test_0,test_1兩張表,表結(jié)構(gòu)一樣,但是我們也可以希望能夠根據(jù)id取模運算的結(jié)果分別放到這兩個不同的表中,其實和分庫差不多。

1.邏輯表:相同結(jié)構(gòu)的水平拆分?jǐn)?shù)據(jù)庫(表)的邏輯名稱,是SQL中表的邏輯標(biāo)識。如:訂單數(shù)據(jù)根據(jù)主鍵尾數(shù)拆分為10張表,分別是t_order_0到t_order_9?,它們的邏輯表名為t_order。

2.真實表:在水平分的數(shù)據(jù)庫中真實存在的物理表,即上個例子中的t_order_0到t_order_9。

我們創(chuàng)建兩個跟test表結(jié)構(gòu)相同的test_0,test_1。

修改配置文件改為分表操作。

spring:shardingsphere:rules:sharding:tables:test:actual-data-nodes: db0.test_$->{0..1}#現(xiàn)在我們來配置一下分表策略,注意這里是table-strategy上面是database-strategytable-strategy:#基本都跟之前是一樣的standard:sharding-column: idsharding-algorithm-name: my-algsharding-algorithms:my-alg:#這里我們演示一下INLINE方式,我們可以自行編寫表達式來決定,自己編寫表達式type: INLINEprops:#比如我們還是希望進行模2計算得到數(shù)據(jù)該去的表#只需要給一個最終的表名稱就行了test_,后面的數(shù)字是表達式取模算出的#實際上這樣寫和MOD模式一模一樣algorithm-expression: test_$->{id % 2}#沒錯,查詢也會根據(jù)分片策略來進行,但是如果我們使用的是范圍查詢,那么依然會進行全量查詢#這個我們后面緊接著會講,這里先寫上吧allow-range-query-with-inline-sharding: falseprops:sql-show: true

插入數(shù)據(jù)效果圖:?

?在id去摸為0時就插入test_0,取模為1時就插入test_1。

接下來我們進行范圍查詢操作,在Mapper層中編寫范圍查詢。

@Mapper public interface UserMapper {@Select("select * from test where id between #{startId} and #{endId}")List<User> getUserById(int startId, int endId); }

在test層做測試。?

@Testvoid contextLoads() {System.out.println(userMapper.getUserById(3,5));}

運行測試,發(fā)生報錯。

???????

這是因為默認情況下?allow-range-query-with-inline-sharding配置就是為false,就是不支持范圍查詢,所以我們要將其設(shè)置為true。

重新運行的效果圖:

分布式序列算法?

在復(fù)雜分布式系統(tǒng)中,特別是微服務(wù)架構(gòu)中,往往需要大量的數(shù)據(jù)和信息進行唯一的標(biāo)識,隨著系統(tǒng)的復(fù)雜,數(shù)據(jù)的增多,分庫分表成為了常見的方案,對數(shù)據(jù)分庫分表后需要有一個唯一的Id來標(biāo)識一條數(shù)據(jù)后消息(如訂單號,交易流水,事件編號等),此時一個能夠生成全局唯一Id的系統(tǒng)是非常重要的。

我們之前創(chuàng)建過學(xué)生信息表,圖書借閱表,圖書管理表,所有的信息都會有一個Id作為主鍵,并且這個Id有以下要求:

1.為了區(qū)別其他的數(shù)據(jù),這個Id必須是全局唯一的。

2.主鍵應(yīng)該盡可能的保持有序,這樣會大大提高索引的查詢效率。

?在我們的分布式系統(tǒng)下有兩種方案來解決此問題

1.使用UUID:UUID是由一組32位數(shù)的16進制數(shù)字隨機生成的,我們可以直接使用JDK為我們提供的UUID類來實現(xiàn)。

@Testvoid test01() {System.out.println(UUID.randomUUID().toString());}

效果圖:

UUID的生成速度非常快,可以看到確實是能夠保證唯一性,因為每都不一樣,而且這樣長一串那重復(fù)的幾率會非常小。但是它并不滿足我們上面的第二個要求,也就是說我們需要盡可能的保證有序,而這里我們得到的都是一些無序的Id。

2.雪花算法(Snowflake)

?它會生成一個64bit大小的整型的Id,int肯定是裝不下的。

可以看到它主要是三個部分組成的,時間+工作機器Id+序列號,時間以毫秒為單位,41個bit位能表示約為70年的時間,時間紀(jì)元從2016年11月1日零點開始,可以使用到2086年,工作機器ID其實就是節(jié)點Id,每個節(jié)點的Id都不同,那么就可以區(qū)分出來,10個bit位可以表示最多1024個節(jié)點,最后12位就是每個節(jié)點的序列號,因此每臺機器每毫秒?就可以有4096個序列號。

它兼具了上面所說的唯一性和有序性了,但是依然是有缺點的,第一是時間問題,如果機器時間出現(xiàn)倒退,那么就會導(dǎo)致生成重復(fù)的Id,并且節(jié)點容量只有1024個,如果是超大規(guī)模集群,也是存在隱患的。

?修改數(shù)據(jù)庫的id類型,因為是要裝下64的數(shù)據(jù),所以我們要為其配置bigint類型。

設(shè)置mybatis的插入操作。

@Update("insert into test(name, password) values(#{name},#{password} )")int addUser2(User user); }

?修改配置文件。

spring:shardingsphere:datasource:#幾個數(shù)據(jù)就配幾個,這里是名稱,格式就是名稱+數(shù)字names: db0,db1#為每個數(shù)據(jù)源單獨配置db0:#數(shù)據(jù)源實現(xiàn)類,這里默認使用HikariDataSourcetype: com.zaxxer.hikari.HikariDataSourcedriver-class-name: com.mysql.cj.jdbc.Driverjdbc-url: jdbc:mysql://127.0.0.1:3344/springcloudusername: rootpassword: 123456db1:#數(shù)據(jù)源實現(xiàn)類,這里默認使用HikariDataSourcetype: com.zaxxer.hikari.HikariDataSourcedriver-class-name: com.mysql.cj.jdbc.Driverjdbc-url: jdbc:mysql://127.0.0.1:3345/springcloudusername: rootpassword: 123456rules:sharding:tables:test:actual-data-nodes: db0.test,db1.test#這里還是使用分庫策略database-strategy:standard:sharding-column: idsharding-algorithm-name: my-alg#這里使用自定義的主鍵生成策略key-generate-strategy:column: idkey-generator-name: my-genkey-generators:#這里寫我們自定義的主鍵生成算法my-gen:#使用雪花算法type: SNOWFLAKEprops:#工作機器ID,保證唯一就行worker-id: 666sharding-algorithms:my-alg:type: MODprops:sharding-count: 2props:sql-show: true

測試類代碼。

@Testvoid contextLoads() {for(int i = 0; i < 10; i++) {userMapper.addUser2(new User("aaa", "cccc"));}}

效果圖:

數(shù)據(jù)庫信息:
?

?如果我們要使用UUID的話,只要在配置文件中將自定義生成主鍵算法的type改為UUID即可。

讀寫分離

?在從表中的配置文件中設(shè)置開啟只讀模式 read-only=1。(如果你是root的話還是可以入數(shù)據(jù)的,而普通用戶就只能讀取了)

?配置好主從關(guān)系。(前講過了)

然后修改配置文件。

spring:shardingsphere:datasource:#幾個數(shù)據(jù)就配幾個,這里是名稱,格式就是名稱+數(shù)字names: db0,db1#為每個數(shù)據(jù)源單獨配置db0:#數(shù)據(jù)源實現(xiàn)類,這里默認使用HikariDataSourcetype: com.zaxxer.hikari.HikariDataSourcedriver-class-name: com.mysql.cj.jdbc.Driverjdbc-url: jdbc:mysql://127.0.0.1:3344/springcloudusername: rootpassword: 123456db1:#數(shù)據(jù)源實現(xiàn)類,這里默認使用HikariDataSourcetype: com.zaxxer.hikari.HikariDataSourcedriver-class-name: com.mysql.cj.jdbc.Driverjdbc-url: jdbc:mysql://127.0.0.1:3345/springcloudusername: rootpassword: 123456rules:#配置讀寫分離readwrite-splitting:data-sources:#名稱隨便寫user-db:#使用靜態(tài)類型,動態(tài)Dynamic類型可以自動發(fā)現(xiàn)auto-aware-data-source-nametype: Staticprops:#配置寫庫(只能一個)write-data-source-name: db0#配置從庫(多個,逗號隔開)read-data-source-names: db1#負載均衡策略,可以自定義load-balancer-name: my-loadload-balancers:#自定義的負載均衡策略my-load:type: ROUND_ROBINprops:sql-show: true

測試代碼:

@Testvoid contextLoads() {for(int i = 0; i < 10; i++) {userMapper.addUser(new User(i,"aaa", "cccc"));}System.out.println(userMapper.getUserById(3, 5));}

?測試效果圖:

?插入語句全部在主節(jié)點數(shù)據(jù)庫中執(zhí)行,而查詢操作都在從節(jié)點數(shù)據(jù)庫中操作。

RabbitMQ(消息隊列)

我們之前如果需要進行遠程調(diào)用,那么一般可以通過發(fā)送HTTP請求完成,而現(xiàn)在,我們可以使用第二種方式,就是消息隊列,它能夠?qū)l(fā)送的消息放到隊列中,當(dāng)新的消息入列時,會通知接收方進行處理,一般消息發(fā)送稱為生產(chǎn)者,接收方稱為消費者。

這樣我們所有的請求都可以直接丟到消息隊列中,再由消費者取出,不再是直接連接消費者的形式了,而是加了一個中間商,這也是一種很好的解決方案,并且在高并發(fā)的情況下,消息隊列也能起到綜合的作用,堆積一部分請求,再由消費者來慢慢處理,而不會像直接調(diào)用那樣請求蜂擁而至。

消息隊列的具體實現(xiàn):

在云服務(wù)器上安裝和部署(在docker進行)

在docker 中拉去Ribbitmq鏡像。

在docker 中運行ribbitmq。

docker run -d -p 5672:5672 -p 15672:15672 -p 25672:25672 --name rabbitmq rabbitmq

?查看rabbitmq的狀態(tài)。

rabbitmqctl status

接著我們還可以將Rabbitmq的管理面板開啟,這樣就可以在瀏覽器上進行實時訪問和監(jiān)控了。?

rabbitmq-plugins enable rabbitmq_management

?開啟面板。

?賬號和密碼都為:guest。

給Rabbitmq設(shè)置新的用戶。

rabbitmqctl add_user 用戶名 密碼

給予新的用戶管理員權(quán)限。

rabbitmqctl set_user_tags 用戶名 administrator

消息隊列的基本原理:

????????

生產(chǎn)者(Publisher)和消費者(Consumer):不用多說了吧。

Channel:我們的客戶端連接都會使用一個Channel,再通過Channel去訪問到RabbitMQ服務(wù)器,注意通信協(xié)議不是http,而是amqp協(xié)議。

Exchange:類似于交換機一樣的存在,會根據(jù)我們的請求,轉(zhuǎn)發(fā)給相應(yīng)的消息隊列,每個隊列都可以綁定到Exchange上,這樣Exchange就可以將數(shù)據(jù)轉(zhuǎn)發(fā)給隊列了,可以存在很多個,不同的Exchange類型可以用于實現(xiàn)不同消息的模式。

Queue:消息隊列本體,生產(chǎn)者所有的消息都存放在消息隊列中,等待消費者取出。

Virtual Host:有點類似于環(huán)境隔離,不同環(huán)境都可以單獨配置一個Virtual Host,每個Virtual Host可以包含很多個Exchange和Queue,每個Virtual Host相互之間不影響。

?如果出現(xiàn)以下錯誤,需要在rabbitmq的配置文件進行更改

?修改方式為下:

因為是使用docker 容器安裝的,所有需要進入容器 docker exec -it rabbitmq /bin/bash進入目錄 cd /etc/rabbitmq/conf.d/執(zhí)行命令 echo management_agent.disable_metrics_collector = false > management_agent.disable_metrics_collector.conf退出容器 exit重啟rabbitmq docker restart rabbitmq

rabbitmq的使用

?1.最簡單模式:

?(一個生產(chǎn)者->消息隊列->一個消費者)

生產(chǎn)者只需要將數(shù)據(jù)丟入消息隊列,二消費者只需要將數(shù)據(jù)從消息隊列中取出,這樣就實現(xiàn)了生產(chǎn)者和消費者的消息交互。

創(chuàng)建測試環(huán)境。

當(dāng)前的用戶就添加了剛剛我們新建的測試環(huán)境。

現(xiàn)在我們來看看交換機。

?交換機列表中自動為我們新增了剛剛創(chuàng)建好的預(yù)設(shè)交換機,一共7個。
?

?第一個交換機是所有虛擬主機都會自帶的一個默認交換機,并且此交換機不可能刪除,此交換機默認綁定所有的消息隊列,如果是通過默認交換機發(fā)送消息,那么就會根據(jù)消息的"rountKey"(類似IP地址)決定發(fā)送給哪個同名的消息隊列(是消息隊列的名稱不是它的routingKey),同時也不能顯示地將消息隊列綁定或解綁到此交換機。

?我們可以看到詳細信息中,當(dāng)前交換機特性是持久化的,也就是說就算機器重啟,那么此交換機也會被保留,如果不是持久化,那么一旦重啟就會消失,實際上我們在列表中看到D字樣就是表示此交換機是持久化的,包括消息隊列也是這樣的,所有自動生成的交換機都是持久化的。

?第二個交換機是個直連的交換機。

?這個交換機和我們剛剛介紹的默認交換機類型是一致的,并且也是持久化的,但是我們可以看到它是具有綁定關(guān)系的,如果沒有指定的消息隊列綁定到此交換機上,那么這個交換機會無法正常將信息存放到指定的消息隊列中,也是根據(jù)對應(yīng)的routingKey尋找消息隊列(但是可以自定義)

?創(chuàng)建隊列。

在我創(chuàng)建隊列的選項中的auto delete的作用是: 需要至少有一個消費者連接到這個隊列,之后,一旦所有與這個隊列連接的消費斷開時,就會自動刪除此隊列。

?通過默認交換機綁定我們創(chuàng)建的隊列并將數(shù)據(jù)傳入消息隊列中,因為我們默認的交換機是自動綁定的,我們直接傳入數(shù)據(jù)。

?現(xiàn)在在queueTest中就存在一條數(shù)據(jù)了。

?獲取數(shù)據(jù)。

?在獲取數(shù)據(jù)位置有四種消息的處理方式。

1.Nack message requeue true:拒絕消息,也就是說不會將消息從消息隊列取出,并且重新排隊,一次可以拒絕多個消息。

2. Ack message requeue false:確認應(yīng)答,確認后消息會從消息隊列中移除,一次可以確認多個消息。

3.Reject message requeue true/false:也是拒絕此消息,但是可以指定是否重新排隊。

?而我們的通過綁定直接交換機也可以達到將數(shù)據(jù)傳入消息隊列中并取出的效果,我們在直接交換機中綁定隊列并發(fā)送消息到隊列中。

在對應(yīng)的隊列中獲取消息。

?刪除隊列中的所有消息。

?刪除此隊列。

?使用java操作消息隊列

導(dǎo)入對應(yīng)依賴。

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.14.2</version></dependency>

我們來思想一下生產(chǎn)者和消費者,首先是生產(chǎn)者,生產(chǎn)者負責(zé)將信息發(fā)送給消息隊列。

@Testvoid contextLoads() {ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(5672);factory.setUsername("test");factory.setPassword("123456");factory.setVirtualHost("/test");try {factory.newConnection();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}}

這里我們可以直接在程序中定義并創(chuàng)建消息隊列(實際上是和我們在管理界面創(chuàng)建一樣的效果)客戶端需要通過連接創(chuàng)建一個新的通道,同一個連接下可以有很多個通道,這樣就不用創(chuàng)建很多個連接也能支持分開發(fā)送。?

@Testvoid contextLoads() {ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(5672);factory.setUsername("test");factory.setPassword("123456");factory.setVirtualHost("/test");//創(chuàng)建連接try(Connection connection = factory.newConnection();Channel channel = connection.createChannel()){ //通過Connection創(chuàng)建新的Channel//聲明隊列,如果此隊列不存在,會自動創(chuàng)建channel.queueDeclare("queueTest", false, false, false, null);//將隊列綁定到交換機channel.queueBind("queueTest", "amq.direct", "queuekey");//發(fā)布新的消息,注意消息需要轉(zhuǎn)換為byte[]channel.basicPublish("amq.direct", "queuekey", null, "Hello World!".getBytes());}catch (Exception e){e.printStackTrace();}}

其中queueDeclare方法的參數(shù)如下:

queue:隊列的名稱(默認創(chuàng)建后routingKey和隊列名稱一致)

durable:是否持久化。

exclusive:是否排他,如果一個隊列被聲明為排他隊列,該隊列僅對首次聲明它的連接可見,并在連接斷開時自動刪除。排他隊列是基于Connection可見,同一個Connection的不同Channel是可以同時訪問同一個連接創(chuàng)建的排他隊列,并且,如果一個Connection已經(jīng)聲明了一個排他隊列,其他的Connection是不允許建立同名的排他隊列的,即使該隊列是持久化的,一旦Connection關(guān)閉或者客戶端退出,該排他隊列都會自動被刪除。

autoDelete:是否自動刪除。

arguments:設(shè)置隊列的其他一些參數(shù),這里我們暫時不需要什么其他參數(shù)。

其中queueBind方法參數(shù)如下:

queue:需要綁定的隊列名稱。

exchange:需要綁定的交換機名稱。

routingKey:不用多說了吧。

其中basicPublish方法的參數(shù)如下:

exchange: 對應(yīng)的Exchange名稱,我們這里就使用第二個直連交換機。

routingKey:這里我們填寫綁定時指定的routingKey,其實和之前在管理頁面操作一樣。

props:其他的配置。

body:消息本體。

接著我們運行測試代碼,并在控制面板中測試。

消費者測試。????????

@Testvoid contextLoads() throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(5672);factory.setUsername("test");factory.setPassword("123456");factory.setVirtualHost("/test");//這里不使用try-with-resource,因為消費者是一直等待新的消息到來,然后按照//我們設(shè)定的邏輯進行處理,所以這里不能在定義完成之后就關(guān)閉連接Connection connection = factory.newConnection();Channel channel = connection.createChannel();//創(chuàng)建一個基本的消費者channel.basicConsume("queueTest", false, (s, delivery) -> {//delivery里面是消息的一些內(nèi)容System.out.println(new String(delivery.getBody()));//basicAck是確認應(yīng)答,第一個參數(shù)是當(dāng)前的消息標(biāo)簽,第二個的參數(shù)表示//是否批量處理消息隊列中所有的消息,如果為false表示只處理當(dāng)前消息channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);//basicNack是拒絕應(yīng)答,最后一個參數(shù)表示是否將當(dāng)前消息放回隊列,如果//為false,那么消息就會被丟棄//channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);//跟上面一樣,最后一個參數(shù)為false,只不過這里省了//channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);}, s -> {});

其中basicConsume方法參數(shù)如下:

queue - 消息隊列名稱,直接指定。

autoAck - 自動應(yīng)答,消費者從消息隊列取出數(shù)據(jù)后,需要跟服務(wù)器進行確認應(yīng)答,當(dāng)服務(wù)器收到確認后,會自動將消息刪除,如果開啟自動應(yīng)答,那么消息發(fā)出后會直接刪除。

deliver - 消息接收后的函數(shù)回調(diào),我們可以在回調(diào)中對消息進行處理,處理完成后,需要給服務(wù)器確認應(yīng)答。

cancel - 當(dāng)消費者取消訂閱時進行的函數(shù)回調(diào),這里暫時用不到。

在springBoot整合消息隊列?

導(dǎo)入對應(yīng)依賴。

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

?在配置文件中修改配置。

spring:rabbitmq:addresses: 127.0.0.1username: testpassword: 123456virtual-host: /test

我們創(chuàng)建一個配置類。

import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;@Configuration public class RabbitConfiguration {@Bean("directExchange") //定義交換機Bean,可以很多個public Exchange exchange(){return ExchangeBuilder.directExchange("amq.direct").build();}@Bean("queueTest") //定義消息隊列public Queue queue(){return QueueBuilder.nonDurable("queueTest") //非持久化類型.build();}@Bean("binding")public Binding binding(@Qualifier("directExchange") Exchange exchange,@Qualifier("queueTest") Queue queue){//將我們剛剛定義的交換機和隊列進行綁定return BindingBuilder.bind(queue) //綁定隊列.to(exchange) //到交換機.with("queuekey") //使用自定義的routingKey.noargs();} }

?接下來我們來創(chuàng)建一個生產(chǎn)者,這里我們直接編寫在測試用例中:

//RabbitTemplate為我們封裝了大量的RabbitMQ操作,已經(jīng)由Starter提供,因此直接注入使用即可@ResourceRabbitTemplate template;@Testvoid publisher() {//使用convertAndSend方法一步到位,參數(shù)基本和之前是一樣的//最后一個消息本體可以是Object類型,真是大大的方便template.convertAndSend("amq.direct", "queueTest", "Hello World!");}

創(chuàng)建消費者,創(chuàng)建監(jiān)聽器。

import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;@Component //注冊為Bean public class TestListener {@RabbitListener(queues = "queueTest") //定義此方法為隊列queueTest的監(jiān)聽器,一旦監(jiān)聽到新的消息,就會接受并處理public void test(Message message){ //如果我們Message的類型改為String類型也是可以的,它會自動將我們的數(shù)據(jù)轉(zhuǎn)換為String類型System.out.println(new String(message.getBody()));} }

效果圖:

我們可以往消息隊列中提交數(shù)據(jù),然后等待消費者返回信息。

@Testvoid publisher() {//使用convertAndSend方法一步到位,參數(shù)基本和之前是一樣的//最后一個消息本體可以是Object類型,真是大大的方便Object queuekey = template.convertSendAndReceive("amq.direct", "queuekey", "Hello World!");System.out.println("收到消費者的響應(yīng)");}

消費者的響應(yīng)方式,就是通過配置監(jiān)聽器的監(jiān)聽方法的返回值來實現(xiàn)。

@Component //注冊為Bean public class TestListener {@RabbitListener(queues = "queueTest") //定義此方法為隊列queueTest的監(jiān)聽器,一旦監(jiān)聽到新的消息,就會接受并處理public String test(String message){System.out.println(message);return "消費者已經(jīng)做出響應(yīng)";} }

進行測試。

消息隊列處理json數(shù)據(jù)?

如果需要傳入的數(shù)據(jù)為json類型時我們該怎么做呢?

在rabbitmq中配置類中將json數(shù)據(jù)轉(zhuǎn)換器注入spring中。

@Configuration public class RabbitConfiguration {@Bean("jacksonConverter") //直接創(chuàng)建一個用于JSON轉(zhuǎn)換的Beanpublic Jackson2JsonMessageConverter converter(){return new Jackson2JsonMessageConverter();}}

在消費者類中指定消息轉(zhuǎn)換器。

@Component //注冊為Bean public class TestListener {@RabbitListener(queues = "queueTest", messageConverter = "jacksonConverter")public void receiver(User user){System.out.println(user);} }

進行測試。

因為我們在spring注入json轉(zhuǎn)換器,所以我們可以在測試類中直接給傳入數(shù)據(jù)實現(xiàn)類,其會自動將我們的實現(xiàn)類轉(zhuǎn)換為json類型。

實現(xiàn)類:

@Data public class User {int id;String name; }

發(fā)送消息到消息隊列中的測試類。

@Testvoid publisher() {template.convertAndSend("amq.direct", "queuekey",new User());}

測試結(jié)果圖:

?死信隊列

?消息隊列中的數(shù)據(jù),如果遲遲沒有消費者來處理,那么就會一直占用消息隊列的空間,比如我們模擬一下?lián)屲嚻钡膱鼍?#xff0c;用戶下單高鐵之后,會進行搶座,然后再進行付款,但是如果用戶下單之后并沒有及時的付款,這張票不可能一直讓該用戶占用著,因為你不買別人還要買呢,所以會在一段時間后超時,讓這張票可以繼續(xù)被其他人購買。

這時,我們就可以使用死信隊列,將那些用戶超時未付款的或是用戶主動取消的訂單,進行進一步的處理,以下類型的消息都會被判定為死信。

1.消息被拒絕(basic.reject/basic.nack),并且requeue = false。

2.消息TTL過期。

3.隊列達到最大值。

?那么如何構(gòu)建這樣的一種使用模式呢?實際上本質(zhì)上就是一個死信交換機+綁定的死信隊列,當(dāng)正常隊列中的消息被判定為死信時,會被發(fā)送到對應(yīng)的死信交換機中,死信隊列也有對應(yīng)的消費者去處理消息。

這里我們直接在消息隊列配置類中創(chuàng)建一個新的死信隊列,并對其進行綁定。

@Bean("directDlExchange")public Exchange dlExchange(){//創(chuàng)建一個新的死信交換機//在這里做配置的話,即使在交換機中沒有該交換機,其也會自動被創(chuàng)建return ExchangeBuilder.directExchange("dlx.direct").build();}@Bean("testDlQueue") //創(chuàng)建一個新的死信隊列public Queue dlQueue(){return QueueBuilder.nonDurable("testDl").build();}@Bean("dlBinding") //死信交換機和死信隊列進綁定public Binding dlBinding(@Qualifier("directDlExchange") Exchange exchange,@Qualifier("testDlQueue") Queue queue){return BindingBuilder.bind(queue).to(exchange).with("queueDlKey").noargs();}

?再將該隊列綁定為其他隊列的死信隊列。

@Bean("queueTest") //定義消息隊列public Queue queue(){return QueueBuilder.nonDurable("queueTest") //非持久化類型.deadLetterExchange("dlx.direct").deadLetterRoutingKey("queueDlKey").build();}

修改消費者類的配置信息,將今天的隊列改為死信隊列,當(dāng)死信隊列有消息時就進行消費。

@Component //注冊為Bean public class TestListener {@RabbitListener(queues = "testDl", messageConverter = "jacksonConverter")public void receiver(User user){System.out.println(user);} }

測試情況一

當(dāng)前消息隊列中存在一個消息,我們對獲取其消息并將其在放到消息隊列中,讓死信隊列處理。

可以發(fā)現(xiàn)死信隊列處理了這個消息。?

?測試情況二

在rabbitmq的配置類中進行配置,配置隊列中消息的生命周期。

@Bean("queueTest") //定義消息隊列public Queue queue(){return QueueBuilder.nonDurable("queueTest") //非持久化類型.deadLetterExchange("dlx.direct").deadLetterRoutingKey("queueDlKey").ttl(500).build();}

可以發(fā)現(xiàn)對消息隊列發(fā)送的消息在0.5秒后就被死信隊列消費了。?

?測試第三中情況

在rabbitmq的配置類中對消息隊列的長度進行配置。?

@Bean("queueTest") //定義消息隊列public Queue queue(){return QueueBuilder.nonDurable("queueTest") //非持久化類型.deadLetterExchange("dlx.direct").deadLetterRoutingKey("queueDlKey").maxLength(3)//設(shè)置消息隊列的長度.build();}

往消息隊列中添加四組數(shù)據(jù),我們可以發(fā)現(xiàn)第一組數(shù)據(jù)被死信隊列消費了。

?工作隊列模式

?實際上這種模式就非常適合多個工人等待新的任務(wù)到來的場景,我們的任務(wù)有很多個,一個一個丟到消息隊列中,而此時個人有很多個,那么我們就可以將這些任務(wù)分配給各個工人,讓他們各自負責(zé)一些任務(wù),并且做的快的工人還可以多完成一些。(能者多勞)

我們只需要創(chuàng)建多個監(jiān)聽器即可。(這里我們就先創(chuàng)建兩個監(jiān)聽器)

@Component //注冊為Bean public class TestListener {@RabbitListener(queues = "queueTest", messageConverter = "jacksonConverter")public void receiver1(User user){System.out.println(user);}@RabbitListener(queues = "queueTest", messageConverter = "jacksonConverter")public void receiver2(User user){System.out.println(user);} }

在隊列中添加多個消息,觀察消費者的處理情況。

可以發(fā)現(xiàn)消費者采用的是輪番的策略,進行消息消費的。

默認情況下,一個消費者可以同時處理250個消息,我們也可以對其進行修改。

?在rabbitmq的配置類中條件監(jiān)聽器創(chuàng)建者工廠。

@Resourceprivate CachingConnectionFactory connectionFactory;@Bean(name = "listenerContainer")public SimpleRabbitListenerContainerFactory listenerContainer(){SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setPrefetchCount(1); //將PrefetchCount設(shè)定為1表示一次只能取一個return factory;}

?在消費者類中設(shè)置對應(yīng)的監(jiān)聽器生產(chǎn)者。

@Component //注冊為Bean public class TestListener {@RabbitListener(queues = "queueTest", messageConverter = "jacksonConverter", containerFactory = "listenerContainer")public void receiver1(User user){System.out.println("消費者1處理消息" + user);}@RabbitListener(queues = "queueTest", messageConverter = "jacksonConverter", containerFactory = "listenerContainer")public void receiver2(User user){System.out.println("消費者2處理消息" + user);} }

進行測試。

當(dāng)我們想同時創(chuàng)建多個功能相同的消費者時,我們只要進行下列配置即可。

@Component //注冊為Bean public class TestListener {@RabbitListener(queues = "queueTest", messageConverter = "jacksonConverter", concurrency = "count")public void receiver1(User user){System.out.println("消費者1處理消息" + user);}}

測試效果圖:

發(fā)布訂閱模式

比如我們在購買了云服務(wù)器,但是最近快到期了,那么就會給你的手機和郵箱發(fā)送消息,告訴你需要續(xù)費了,但是手機短信和郵箱發(fā)送?并不是同一個業(yè)務(wù)提供的,但是現(xiàn)在我們又希望能夠都去執(zhí)行,所以就可以用到發(fā)布訂閱模式,簡而言之就是,發(fā)布一次,消費多個。

因為我們之前使用的是直連交換機,是一對一的關(guān)系,肯定是不行的,我們這里需要用到另一種類型的交換機,叫做fanout(扇出)類型,這是一種廣播類型,消息會被廣播到所有與此交換機綁定的消息隊列中。

?在rabbitmq配置類中進行配置,創(chuàng)建多個隊列,并將這些對應(yīng)的隊列綁定到扇出交換機上。

@Configuration public class RabbitConfiguration {@Resourceprivate CachingConnectionFactory connectionFactory;@Bean("fanoutExchange")public Exchange exchange(){//注意這里是fanoutExchangereturn ExchangeBuilder.fanoutExchange("amq.fanout").build();}@Bean("queueTest1") //定義消息隊列public Queue queue1(){return QueueBuilder.nonDurable("queueTest1") //非持久化類型.build();}@Bean("queueTest2") //定義消息隊列public Queue queue2(){return QueueBuilder.nonDurable("queueTest2") //非持久化類型.build();}@Bean("binding1")public Binding binding1(@Qualifier("fanoutExchange") Exchange exchange,@Qualifier("queueTest1") Queue queue){//將我們剛剛定義的交換機和隊列進行綁定return BindingBuilder.bind(queue) //綁定隊列.to(exchange) //到交換機.with("queuekey1") //使用自定義的routingKey.noargs();}@Bean("binding2")public Binding binding2(@Qualifier("fanoutExchange") Exchange exchange,@Qualifier("queueTest2") Queue queue){//將我們剛剛定義的交換機和隊列進行綁定return BindingBuilder.bind(queue) //綁定隊列.to(exchange) //到交換機.with("queuekey2") //使用自定義的routingKey.noargs();}@Bean("jacksonConverter") //直接創(chuàng)建一個用于JSON轉(zhuǎn)換的Beanpublic Jackson2JsonMessageConverter converter(){return new Jackson2JsonMessageConverter();}}

修改監(jiān)聽器。

@Component //注冊為Bean public class TestListener {@RabbitListener(queues = "queueTest1", messageConverter = "jacksonConverter")public void receiver1(User user){System.out.println("隊列一接收到消息" + user);}@RabbitListener(queues = "queueTest2", messageConverter = "jacksonConverter")public void receiver2(User user){System.out.println("隊列二接收到消息" + user);} }

?測試結(jié)果圖:

在對應(yīng)的交換機中沒有指定routingKey時發(fā)送數(shù)據(jù),兩個隊列都會收到消息。

?路由模式

?我們可以在綁定時指定想要的routingKey只有生產(chǎn)者發(fā)送時指定了對應(yīng)的routingKey才能到達對應(yīng)的隊列。

?當(dāng)然除了我們之前的一次綁定之外,同一個消息隊列可以多次綁定到交換機,并且使用不同的routingKey,這樣只要滿足其中一個都可以被發(fā)送到此消息隊列中。

在rabbitmq的配置類中進行配置。

@Configuration public class RabbitConfiguration {@Bean("directExchange")public Exchange exchange(){return ExchangeBuilder.directExchange("amq.direct").build();}@Bean("queueTest")public Queue queue(){return QueueBuilder.nonDurable("queueTest").build();}@Bean("binding") //使用yyds1綁定public Binding binding(@Qualifier("directExchange") Exchange exchange,@Qualifier("queueTest") Queue queue){return BindingBuilder.bind(queue).to(exchange).with("key1").noargs();}@Bean("binding2") //使用yyds2綁定public Binding binding2(@Qualifier("directExchange") Exchange exchange,@Qualifier("queueTest") Queue queue){return BindingBuilder.bind(queue).to(exchange).with("key2").noargs();} }

?修改監(jiān)聽器。

@Component //注冊為Bean public class TestListener {@RabbitListener(queues = "queueTest")public void receiver1(String message) {System.out.println("隊列一接收到消息" + message);} }

我們在交換機中添加兩條消息,分別通過不同的routingKey。

進行測試,通過不同的routingkey進入了同一個消息隊列中?。

主題模式

?實際上這種模式就是一種模糊匹配模式,我們可以將routingKey以模糊匹配的方式去進行轉(zhuǎn)發(fā)。?

?我們可以使用*或#來表示:

1.*:表示容易的一個單詞。

2.#:表示0個或多個單詞。

?修改rabbitmq的配置類。

@Configuration public class RabbitConfiguration {@Bean("topicExchange") //這里使用預(yù)置的Topic類型交換機public Exchange exchange(){return ExchangeBuilder.topicExchange("amq.topic").build();}@Bean("queueTest")public Queue queue(){return QueueBuilder.nonDurable("queueTest").build();}@Bean("binding")public Binding binding2(@Qualifier("topicExchange") Exchange exchange,@Qualifier("queueTest") Queue queue){return BindingBuilder.bind(queue).to(exchange).with("*.test.*").noargs();} }

在預(yù)設(shè)的topic交換機中以a.test.c?作為routingKey將數(shù)據(jù)傳入對應(yīng)的消息隊列中。

?消費者去消費了此消息。

"#"也是差不多的效果。

?除了我們這里使用的默認主題交換機之外,還有一個叫做amq.rabbitmq.trace的交換機。

?可以看到它也是topic類型的,那么這個交換機是做什么的呢?實際上這個是用于幫助我們記錄和追蹤生產(chǎn)者和消費者使用消息隊列的交換機,它是一個內(nèi)部的交換機。

接著我們需要在rabbitmq主機中將/test的追蹤功能開啟。

rabbitmqctl trace_on -p /test

?創(chuàng)建新的消息隊列。

?將消息隊列綁定到amq.rabbitmq.trace交換機上,要將生產(chǎn)者輸入的交換機和消費者獲取數(shù)據(jù)的隊列全部存放到剛剛那個trace消息隊列中。?

?我們獲取trace中的消息,會得到一個交換機和消息隊列。

?第四種交換機類型

?第四種交換機類型header,它是根據(jù)頭部消息來決定的,在我們發(fā)送的消息中是可以攜帶一些頭部消息的(類似于HTTP),我們可以根據(jù)這些頭部信息來決定路由哪個消息隊列中。

???????修改rabbitmq的配置類。

@Configuration public class RabbitConfiguration {@Bean("headerExchange") //注意這里返回的是HeadersExchangepublic HeadersExchange exchange(){return ExchangeBuilder.headersExchange("amq.headers") //RabbitMQ為我們預(yù)置了兩個,這里用第一個就行.build();}@Bean("queueTest")public Queue queue(){return QueueBuilder.nonDurable("queueTest").build();}@Bean("binding")public Binding binding2(@Qualifier("headerExchange") HeadersExchange exchange, //這里和上面一樣的類型@Qualifier("queueTest") Queue queue){return BindingBuilder.bind(queue).to(exchange) //使用HeadersExchange的to方法,可以進行進一步配置//.whereAny("a", "b").exist(); 這個是只要存在任意一個指定的頭部Key就行//.whereAll("a", "b").exist(); 這個是必須存在所有指定的的頭部Key.where("test").matches("hello"); //比如我們現(xiàn)在需要消息的頭部信息中包含test,并且值為hello才能轉(zhuǎn)發(fā)給我們的消息隊列//.whereAny(Collections.singletonMap("test", "hello")).match(); 傳入Map也行,批量指定鍵值對} }

將數(shù)據(jù)傳入amq.header交換機,并設(shè)置頭部信息,進行測試。

查看queueTest隊列的消息,可以發(fā)現(xiàn)消息隊列中的消息被消費了。

docker 搭建rabbitmq集群?

下載rabbitmq的管理者版本。

docker pull rabbitmq:3.9.5-management

創(chuàng)建三個rabbitmq。(如果需要查看各自的控制面板的話,我們只需要為每個rabbitmq綁定15672端口)

docker run -d --hostname myRabbit1 --name rabbit1 -p 15672:15672 -p 5672:5672 -e RABBITMQ_ERLANG_COOKIE='rabbitcookie' rabbitmq:3.9.5-management docker run -d --hostname myRabbit2 --name rabbit2 -p 5673:5672 --link rabbit1:myRabbit1 -e RABBITMQ_ERLANG_COOKIE='rabbitcookie' rabbitmq:3.9.5-management docker run -d --hostname myRabbit3 --name rabbit3 -p 15673:5672 --link rabbit1:myRabbit1 --link rabbit2:myRabbit2 -e RABBITMQ_ERLANG_COOKIE='rabbitcookie' rabbitmq:3.9.5-management

-e RABBITMQ_ERLANG_COOKIE=‘rabbitcookie’ : 必須設(shè)置為相同,因為 Erlang節(jié)點間是通過認證Erlang cookie的方式來允許互相通信的。
–link rabbit1:myRabbit1 --link rabbit2:myRabbit2: 不要漏掉,否則會 一直處在 Cluster status of node rabbit@myRabbit3 … 沒有反應(yīng)。
啟動完成之后,使用docker ps命令查看運行情況,確保RabbitMQ都已經(jīng)啟動。

將RabbitMQ節(jié)點加入到集群。

#進入rabbitmq02容器,重新初始化一下,將02節(jié)點加入到集群中 docker exec -it rabbit2 bash rabbitmqctl stop_app rabbitmqctl reset rabbitmqctl join_cluster --ram rabbit@myRabbit1 #參數(shù)“--ram”表示設(shè)置為內(nèi)存節(jié)點,忽略該參數(shù)默認為磁盤節(jié)點,@后面的為ip名,以為我們在啟動rabbitmq時給其ip設(shè)置了新的名字,且我們以一節(jié)點作為主節(jié)點其他作為從節(jié)點。 rabbitmqctl start_app exit#進入rabbitmq03容器,重新初始化一下,將03節(jié)點加入到集群中 docker exec -it rabbit3 bash rabbitmqctl stop_app rabbitmqctl reset rabbitmqctl join_cluster --ram rabbit@myRabbit1 rabbitmqctl start_app exit

內(nèi)存節(jié)點將所有的隊列、交換器、綁定、用戶等元數(shù)據(jù)定義都存儲在內(nèi)存中;而磁盤節(jié)點將元數(shù)據(jù)存儲在磁盤中。單節(jié)點系統(tǒng)只允許磁盤類型的節(jié)點,否則當(dāng)節(jié)點重啟以后,所有的配置信息都會丟失。如果采用集群的方式,可以選擇至少配置一個節(jié)點為磁盤節(jié)點,其余部分配置為內(nèi)存節(jié)點,這樣可以獲得更快的響應(yīng)。所以本集群中配置節(jié)點一位磁盤節(jié)點,節(jié)點二和節(jié)點三位內(nèi)存節(jié)點。

?此時我只是完成了簡單的集群,接下來我們還要配置鏡像隊列(類型主從復(fù)制),我們這里在終端中配置,其實也可以直接在控制面板中在admin中配置對應(yīng)的策略。

#隨便進入一個容器 docker exec -it rabbit1 bash#設(shè)置策略匹配所有名稱的隊列都進行高可用配置,且實現(xiàn)自動同步。 rabbitmqctl set_policy -p / ha "^" '{"ha-mode":"all","ha-sync-mode":"automatic"}'#查詢策略 rabbitmqctl list_policies -p / #查看vhost下的所有的策略(policies )

當(dāng)主節(jié)點中的隊列down后從節(jié)點中的隊列就會被使用,且在主節(jié)點中的隊列恢復(fù)以后,其會變成從隊列來繼續(xù)使用。

1.策略名稱,我們命名為ha(高可用);
2.-p / 設(shè)置vhost,可以使用rabbitmqctl list_policies -p / 查看該vhost 下所有的策略(policies )。
3.隊列名稱的匹配規(guī)則,使用正則表達式表示;
4.為鏡像隊列的主體規(guī)則,是json字符串,分為三個屬性:ha-mode | ha-params | ha-sync-mode,分別的解釋如下:
????????ha-mode:鏡像模式,分類:all/exactly/nodes,all存儲在所有節(jié)點;exactly存儲x個節(jié)點,節(jié)點的個數(shù)由ha-params指定;nodes指定存儲的節(jié)點上名稱,通過ha-params指定;
????????ha-params:作為參數(shù),為ha-mode的補充;
????????ha-sync-mode:鏡像消息同步方式:automatic(自動),manually(手動);

?消息隊列中間件

?由于使用不同的消息隊列,我們不能保證系統(tǒng)相同,為了注重邏輯,springCloud Stream它能夠屏蔽底層實現(xiàn),我們使用統(tǒng)一的消息隊列操作方式就能操作多種不同類型的消息隊列。

它屏蔽了Rabbitmq底層操作,讓我們使用統(tǒng)一 的Input和Output形式。以Binder為中間件,這樣我們就算切換了不同的消息隊列,也無需修改代碼,而具體某種消息隊列的底層實現(xiàn)是交給Stream在做的。?

創(chuàng)建兩個模塊,一個是生產(chǎn)者一個是消費者。

?導(dǎo)入對應(yīng)依賴。

<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId><version>3.2.4</version></dependency>

在生產(chǎn)者的配置文件中進行配置。

server:port: 8001 spring:cloud:stream:binders: #此處配置要綁定的rabbitmq服務(wù)的配置信息cloud-server: #綁定的名稱,自定義一個就行type: rabbit #消息主件類型,這里使用rabbit,所以這粒就填寫rabbitenvironment: #服務(wù)器相關(guān)信息,以為是自定義的名稱,所以下面會爆紅,不影響運行spring:rabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guestvirtual-host: /bindings:test-out-0: #自定義的綁定名稱destination: test.exchange #目的地,就是交換機的名稱。如果不存在就會創(chuàng)建

在生產(chǎn)者的controller層編寫發(fā)送消息的controller。

@RestController public class PublisherController {@ResourceStreamBridge streamBridge;public String publish() {//第一個次數(shù)其實就是Rabbitmq的交換機名稱//這個交換機的名稱有些規(guī)則//輸入: <名稱>-in-<index>//輸出: <名稱>-out-<index>//這里我們使用輸出方式,來將數(shù)據(jù)發(fā)送到消息隊列,注意這里的名稱會和之后的消費者Bean名稱進行對應(yīng)streamBridge.send("test-out-0", "hello world");return "發(fā)送成功"+new Date();} }

編寫消費者配置文件。

server:port: 8001 spring:cloud:stream:binders: #此處配置要綁定的rabbitmq服務(wù)的配置信息cloud-server: #綁定的名稱,自定義一個就行type: rabbit #消息主件類型,這里使用rabbit,所以這粒就填寫rabbitenvironment: #服務(wù)器相關(guān)信息,以為是自定義的名稱,所以下面會爆紅,不影響運行spring:rabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guestvirtual-host: /bindings:test-in-0:destination: test.exchange

創(chuàng)建一個類用于做消費者的配置。

@Component public class TestConsumer {@Bean("test")//這里的注入名要和剛剛生產(chǎn)者的綁定名稱中的名稱相同public Consumer<String> consumer() {return System.out::println;} }

啟動測試。

?其自動幫我們創(chuàng)建了對應(yīng)的消息隊列。

?消息發(fā)送以后,消費者去去消費了這個消息。

這樣我們就通過springCloud Stream屏蔽底層Rabbitmq來直接進行消息的操作了。??

總結(jié)

以上是生活随笔為你收集整理的springCloud 初探的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。