springCloud 初探
分布式系統(tǒng)理論
分布式系統(tǒng)是若干個(gè)獨(dú)立計(jì)算機(jī)的集合,這些計(jì)算機(jī)的集合,這些計(jì)算機(jī)對于用戶來說就像單個(gè)相關(guān)系統(tǒng)。分布式系統(tǒng)是由一組通過網(wǎng)絡(luò)進(jìn)行通信,為了完成共同的任務(wù)而協(xié)調(diào)工作的計(jì)算機(jī)節(jié)點(diǎn)組成的系統(tǒng)。分布式系統(tǒng)的出現(xiàn)是為了用廉價(jià)的,普通的機(jī)器完成單個(gè)計(jì)算機(jī)無法完成的計(jì)算,存儲(chǔ)任務(wù)。其目的是:利用更多的機(jī)器,處理更多的數(shù)據(jù)。
RPC
定義: RPC是指遠(yuǎn)程過程調(diào)用,是一種進(jìn)程間的通信方式,它是一種技術(shù)的思想,而不是規(guī)范。它允許程序調(diào)用另一個(gè)地址空間的過程或函數(shù),而不是程序顯式編碼這個(gè)遠(yuǎn)程調(diào)用的細(xì)節(jié)。即程序員無論是調(diào)用本地還是遠(yuǎn)程的函數(shù),本質(zhì)上編寫的調(diào)用代碼基本相同。
RPC原理:
?
RPC的核心:通訊, 序列化(方便數(shù)據(jù)傳輸)。
序列化:數(shù)據(jù)傳輸需要轉(zhuǎn)換。
解決這些核心問題我們可以使用Doubb。
springCloud技術(shù)概況
?springCloud技術(shù)分布圖:
springCloud升級:
springBoot和springCloud版本兼容查詢
https://spring.io/projects/spring-cloud#learn
springCloud 對應(yīng)依賴:
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-dependencies</artifactId><version>2021.0.3</version><type>pom</type><scope>import</scope> </dependency>Eureka
?作用: Eureka能夠自動(dòng)注冊并發(fā)現(xiàn)微服務(wù),然后對服務(wù)的狀態(tài),信息進(jìn)行集中管理,這樣我們需要獲取其他服務(wù)的信息時(shí),我們只需要向Eureka進(jìn)行查詢就可以了。
這樣服務(wù)之間的強(qiáng)廣聯(lián)性就會(huì)被進(jìn)一步減弱。
?對應(yīng)依賴:
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-server</artifactId><version>3.1.3</version> </dependency>配置application.yml文件
eureka:client:#由于我們是作為服務(wù)端角色,所以不需要獲取服務(wù)端,改為false, 默認(rèn)為truefetch-registry: false#暫時(shí)不需要將自己注冊到eurekaregister-with-eureka: falseservice-url:defaultZone: http://localhost:8888/eureka且要在啟動(dòng)類中添加注解:?@EnableEurekaServer?
效果圖:
?接下來將各個(gè)服務(wù)作為客戶端。
作為客戶端的對應(yīng)依賴:
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-client</artifactId><version>3.1.3</version> </dependency>配置各個(gè)服務(wù)的application.yml,讓將服務(wù)地址指向eureka服務(wù)的地址,這樣才能實(shí)現(xiàn)注冊。?
客戶端application.yml配置:
eureka:client:service-url:defaultZone: http://localhost:8888/eurekaspring:application:name: ”對應(yīng)名字“效果圖:
當(dāng)我們的服務(wù)啟動(dòng)之后,每隔一段時(shí)間eureka會(huì)發(fā)送一次心跳包,這樣eureka就能檢測到我們的服務(wù)是否還在正常運(yùn)行。
通過eureka來調(diào)用服務(wù)?
例子一:
舊代碼
public UserBook queryByUid(Integer uid) {List<DbBorrow> dbBorrows = dbBorrowDao.queryByUid(uid);RestTemplate template = new RestTemplate();DbUser user = template.getForObject("http://localhost:8083/dbUser/" + uid, DbUser.class);UserBook userBook = new UserBook();userBook.setUser(user);LinkedList<DbBook> books = new LinkedList<>();for(int i = 0; i < dbBorrows.size(); i++){DbBook book = template.getForObject("http://localhost:8081/dbBook/" + dbBorrows.get(i).getBid(), DbBook.class);books.add(i, book);}userBook.setBook(books);return userBook;}這里其實(shí)就是通過詢問eureka對應(yīng)的服務(wù)名來獲取對應(yīng)的ip地址。?
?負(fù)載均衡的實(shí)現(xiàn)
? ? 同一個(gè)服務(wù)器實(shí)際上可以注冊很多個(gè)的, 但它們的端口是不同的,比如我們創(chuàng)建多個(gè)用戶查詢服務(wù),將原有的端口進(jìn)行修改,由idea中設(shè)置啟動(dòng)參數(shù)來決定,這樣就可以創(chuàng)建幾個(gè)同端口的相同服務(wù)了。?
效果圖中說明在用戶服務(wù)處有多個(gè)相同的服務(wù)。?
?當(dāng)我們要使用用戶服務(wù)時(shí),如果有第一個(gè)用戶服務(wù)down掉的話,就會(huì)有另一個(gè)用戶服務(wù)來執(zhí)行對應(yīng)的操作,防止整哥微服務(wù)不可用,大大提高了安全性。
當(dāng)存在多個(gè)相同服務(wù)的時(shí)候就會(huì)通過對應(yīng)的負(fù)載均衡的策略使每個(gè)服務(wù)都被調(diào)用起來,從而實(shí)現(xiàn)負(fù)載均衡。
新的實(shí)現(xiàn)代碼
1.在condig包中創(chuàng)建一個(gè)BeanConfiguration.java。
@Configuration public class BeanConfiguration {@Bean//負(fù)載均衡@LoadBalancedpublic RestTemplate restTemplate(){return new RestTemplate();} }2.在對應(yīng)的service層中的使用@Autowired進(jìn)行自動(dòng)注入使用,將原來的ip地址改為在eureka中對應(yīng)的服務(wù)名字。
@Resource private RestTemplate template;public UserBook queryByUid(Integer uid) {List<DbBorrow> dbBorrows = dbBorrowDao.queryByUid(uid); //將原來的ip地址改為在eureka中對應(yīng)的服務(wù)名字DbUser user = template.getForObject("http://user-service/dbUser/" + uid, DbUser.class);UserBook userBook = new UserBook();userBook.setUser(user);LinkedList<DbBook> books = new LinkedList<>();for(int i = 0; i < dbBorrows.size(); i++){DbBook book = template.getForObject("http://book-service/dbBook/" + dbBorrows.get(i).getBid(), DbBook.class);books.add(i, book);}userBook.setBook(books);return userBook;}注冊中心高可用
?為了防止eureka down掉,我們可以搭建eureka集群。
效果圖:
?搭建eureka集群步驟:
1.修改兩個(gè)eureka服務(wù)端的配置文件。
applicationn01.yml
server:port: 9999 eureka:instance:#由于不支持多個(gè)localhost的eureka的服務(wù)器,但是又只能在本地測試,所有就只能自定義主機(jī)名稱了hostname: eureka01client:#不需要獲取服務(wù)端fetch-registry: false#去掉register-with-eureka選項(xiàng),讓eureka服務(wù)器自己注冊到其他的eureka服務(wù)器,這樣才能相互啟用service-url:#注意這里要填寫其他的eureka服務(wù)器地址,不用寫自己的defaultZone: http://eureka02:9999/erekaapplication02.yml
server:port: 9999 eureka:instance:#由于不支持多個(gè)localhost的eureka的服務(wù)器,但是又只能在本地測試,所有就只能自定義主機(jī)名稱了hostname: eureka02client:#不需要獲取服務(wù)端fetch-registry: false#去掉register-with-eureka選項(xiàng),讓eureka服務(wù)器自己注冊到其他的eureka服務(wù)器,這樣才能相互啟用service-url:#注意這里要填寫其他的eureka服務(wù)器地址,不用寫自己的defaultZone: http://eureka01:8888/ereka2.啟動(dòng)eureka集群
在因?yàn)槭潜镜販y試所以我們要修改本地的hosts。
eureka01:?
eureka02:?
3.把所有的微服務(wù)在eureka集群中都注冊一次
service-url:defaultZone: http://localhost:8888/eureka, http://localhost:9999/eureka?在一個(gè)eureka down掉的時(shí)候,另一個(gè)eureka還會(huì)繼續(xù)工作,這時(shí)我們就可以對應(yīng)down 掉的eureka進(jìn)行維修,這樣就實(shí)現(xiàn)了高可用。
OpenFeign
? ? OpenFeign和RestTemplate一樣,也是一種HTTP客戶端請求工具,但它使用起來更加便捷。
對應(yīng)依賴:
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-openfeign</artifactId><version>3.1.3</version> </dependency>在對應(yīng)的啟動(dòng)類上加上 @EnableFeignClients?
配對應(yīng)的FeignClient服務(wù)接口?。(可以單獨(dú)創(chuàng)建一個(gè)包來存放這些服務(wù)接口)
服務(wù)接口格式:
@FeignClient("在eureka中配置的服務(wù)名字") public interface BookClient { //該接口里面的方法為你要調(diào)用的Controller中的方法,這些方法的路徑要寫全@GetMapping("/dbBook/{id}")DbBook queryById(@PathVariable("id") Integer id); }?要使用此接口時(shí)可以通過@Resource進(jìn)行注入。(類似Dao層的mybatis,其通過@FeignClient將接口注入到spring中)
?調(diào)用此接口的Service層中的代碼就要修改為下:
public UserBook queryByUid(Integer uid) {List<DbBorrow> dbBorrows = dbBorrowDao.queryByUid(uid); // DbUser user = template.getForObject("http://user-service/dbUser/" + uid, DbUser.class);DbUser user = userClient.queryById(uid);UserBook userBook = new UserBook();userBook.setUser(user);LinkedList<DbBook> books = new LinkedList<>();for(int i = 0; i < dbBorrows.size(); i++){ // DbBook book = template.getForObject("http://book-service/dbBook/" + dbBorrows.get(i).getBid(), DbBook.class);DbBook book = bookClient.queryById(dbBorrows.get(i).getBid());books.add(i, book);}userBook.setBook(books);return userBook;}OpenFeign服務(wù)降級
為對應(yīng)的client接口創(chuàng)建實(shí)現(xiàn)類。
@Component //注入到spring中,使得BookClient能調(diào)用到此實(shí)現(xiàn)類 public class BookFallBackClient implements BookClient{@Overridepublic DbBook queryById(Integer id) {return new DbBook();} } @Component 注入到spring中,使得UserClient能調(diào)用到此實(shí)現(xiàn)類 public class UserFallBackClient implements UserClient{@Overridepublic DbUser queryById(Integer id) {return new DbUser();} }?將此實(shí)現(xiàn)類添加到client的備選方案中。
?在application.xml配置熔斷支持
feign:circuitbreaker:enabled: true?效果圖:
Hystrix??
??Hystrix服務(wù)熔斷
? ? 微服務(wù)之間是可以相互調(diào)用的。?
由于位于最底端的服務(wù)提供者發(fā)生了故障,那么此時(shí)會(huì)直接導(dǎo)致ABCD全線崩潰,就像雪崩一樣。
這種情況實(shí)際上是不可避免的,由于太多的因素,比如網(wǎng)絡(luò)故障,系統(tǒng)故障,硬件問題,會(huì)導(dǎo)致這種極端的情況發(fā)生,因此我們需要找到對應(yīng)的方法來解決次問題。
為了解決分布式系統(tǒng)的雪崩問題,springCloud提供了Hystrix熔斷組件,它就像我們家中的保險(xiǎn)絲一樣,當(dāng)電流過載的時(shí)候直接熔斷掉,防止危險(xiǎn)的進(jìn)一步發(fā)生,從而保障家庭用電的安全,可以想象一下,如果整條鏈路上的服務(wù)已經(jīng)全線崩潰,這時(shí)還在不斷地有大量的請求到達(dá),想要各個(gè)服務(wù)進(jìn)行處理,肯定是會(huì)使得情況越來越糟。
熔斷機(jī)制是應(yīng)對雪崩效應(yīng)的一種微服務(wù)鏈路保護(hù)機(jī)制,當(dāng)檢測到鏈路的某個(gè)微服務(wù)不可用或者響應(yīng)的時(shí)間太長時(shí),會(huì)進(jìn)行服務(wù)降級,進(jìn)而熔斷該節(jié)點(diǎn)微服務(wù)的調(diào)用,快速返回"錯(cuò)誤"的響應(yīng)信息,當(dāng)檢測到該節(jié)點(diǎn)微服務(wù)響應(yīng)正常后恢復(fù)調(diào)用鏈路。
實(shí)際上,熔斷就是在降級的基礎(chǔ)上進(jìn)一步形成的,也就是說,在一段時(shí)間內(nèi)多次調(diào)用失敗,那么就直接升級為熔斷。
?當(dāng)需要的服務(wù)正常啟動(dòng)后,熔斷機(jī)制就會(huì)關(guān)閉了。
Hystrix服務(wù)降級
? ? 服務(wù)降級并不會(huì)直接返回錯(cuò)誤信息,而是可以提供一個(gè)補(bǔ)救的措施,正常響應(yīng)給請求者,這樣相當(dāng)于服務(wù)依然可用,但是服務(wù)能力肯定是下降的。?
對應(yīng)依賴:
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-hystrix</artifactId><version>2.2.10.RELEASE</version> </dependency>在啟動(dòng)類上添加注解: @EnableHystrix。
?
?在對應(yīng)的Controller層中編寫編寫備選方案。
//備選方案,返回一個(gè)空的對象public ResponseEntity<UserBook> onError(Integer uid) {UserBook userBook = new UserBook();userBook.setUser(null);userBook.setBook(Collections.emptyList());return ResponseEntity.ok(userBook);}在對的方法上添加注解:@HystrixCommand(fallbackMethod = "備選方案名")
?效果圖:?
Gateway路由網(wǎng)關(guān)?
可能并不是所有的微服務(wù)都需要直接暴露給外部調(diào)用,這時(shí)我們就可以使用路由機(jī)制,添加一層防護(hù),讓所有的請求全部通過路由來轉(zhuǎn)發(fā)到各個(gè)微服務(wù),并且轉(zhuǎn)發(fā)給多個(gè)相同微服務(wù)實(shí)例也可以實(shí)現(xiàn)負(fù)載均衡。
?部署網(wǎng)關(guān)
對應(yīng)依賴為下:
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-gateway</artifactId><version>3.1.2</version></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-client</artifactId><version>3.1.3</version></dependency>第一個(gè)是網(wǎng)關(guān)的依賴,第二個(gè)是跟其他微服務(wù)一樣,需要注冊到eureka中才能生效,不要添加web依賴,使用的是WebFlux框架。
編寫配置文件
eureka:client:service-url:defaultZone: http://localhost:8888/eureka, http://localhost:9999/eureka spring:application:name: gateway server:port: 8500?繼續(xù)在配置文件中配置路由功能。
spring:cloud:gateway:routes:- id: borrowService #路由的名字uri: lb://borrow-service #路由的地址,lb表示使用負(fù)載均衡到微服務(wù),也可以使用Http正常轉(zhuǎn)發(fā)predicates: #路由規(guī)則 斷言什么請求會(huì)被路由- Path=/dbBorrow/queryUserBook/** #只要訪問這個(gè)路徑,一律都被路由到上面指定的服務(wù)?在輸入路徑后,如果路徑符合斷言,就會(huì)將uri和Path進(jìn)行拼接。
路由過濾器
路由過濾器支持以某中方式修改傳入的HTTP請求或傳出的HTTP響應(yīng),路由過濾器的范圍是某個(gè)過濾器,跟之前的斷言一樣
修改對應(yīng)的配置文件。
spring:application:name: gatewaycloud:gateway:routes:- id: borrowService #路由的名字uri: lb://borrow-service #路由的地址,lb表示使用負(fù)載均衡到微服務(wù),也可以使用Http正常轉(zhuǎn)發(fā)predicates: #路由規(guī)則 斷言什么請求會(huì)被路由- Path=/dbBorrow/queryUserBook/** #只要訪問這個(gè)路徑,一律都被路由到上面指定的服務(wù)- id: bookServiceuri: lb://book-servicepredicates:- Path=/dbBook/**filters: #添加過濾器- AddRequestHeader=Test, HELLO WORLD!?在對應(yīng)的Controller層中測試是否過濾成功。
效果圖:
?
設(shè)置全局過濾器
?例子:攔截沒有攜帶指定參數(shù)的請求。
在gateway項(xiàng)目中創(chuàng)建一個(gè)實(shí)現(xiàn)類。
@Component public class TestFilter implements GlobalFilter {@Overridepublic Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { // exchange表示請求過來的信息//先獲取ServerHttpRequest對象,注意不是HttpServletRequestServerHttpRequest request = exchange.getRequest();//打印請求攜帶的所有參數(shù)System.out.println(request.getQueryParams());//判斷是否包含test參數(shù),且參數(shù)值是否為1List<String> test = request.getQueryParams().get("test");if(test != null && test.contains("1")){//將ServerHttpExchange向過濾鏈的下一級傳遞,類似javaWeb中的過濾器return chain.filter(exchange);} else {//直接在這里不再向下傳遞,然后返回響應(yīng)return exchange.getResponse().setComplete();}} }次過濾器會(huì)作用于整個(gè)網(wǎng)關(guān)。
效果圖:
只要路徑中沒有攜帶test=1就會(huì)被攔截。
當(dāng)然過濾器會(huì)存在很多個(gè)的,所以我們手動(dòng)指定過濾器之間的順序。可以通過實(shí)現(xiàn)Ordered接口,重寫getOredr方法來設(shè)置執(zhí)行的順序。
@Component public class TestFilter implements GlobalFilter, Ordered {@Overridepublic Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { // exchange表示請求過來的信息//先獲取ServerHttpRequest對象,注意不是HttpServletRequestServerHttpRequest request = exchange.getRequest();//打印請求攜帶的所有參數(shù)System.out.println(request.getQueryParams());//判斷是否包含test參數(shù),且參數(shù)值是否為1List<String> test = request.getQueryParams().get("test");if(test != null && test.contains("1")){//將ServerHttpExchange向過濾鏈的下一級傳遞,類似javaWeb中的過濾器return chain.filter(exchange);} else {//直接在這里不再向下傳遞,然后返回響應(yīng)return exchange.getResponse().setComplete();}}@Overridepublic int getOrder() {//返回的數(shù)字表示執(zhí)行的順序 return 0;} }注意Oreder的值越小執(zhí)行的優(yōu)先級就越高,并且無論是配置文件里編寫的單個(gè)路由過濾器還是全局過濾器,都會(huì)受到Order的影響(單個(gè)路由過濾器的Order值按從上到下的順序從1開始遞增),最終是按照Order值決定哪個(gè)路由過濾器先執(zhí)行,當(dāng)Order值相同時(shí),全局路由過濾器會(huì)優(yōu)先于單個(gè)路由過濾器執(zhí)行。?
微服務(wù)CAP原則
在一個(gè)分布式系統(tǒng)中存在 Consistency(一致性),Availabiity(可用性),Partition Tolerance(區(qū)分容錯(cuò)性)三者是不可同時(shí)保證的,最多同時(shí)保證其中的兩個(gè)。
?一致性:在分布式系統(tǒng)中的所有數(shù)據(jù)備份,在同一時(shí)刻都是相同的值。(所有的節(jié)點(diǎn)無論何時(shí)訪問都能拿到最新的值)
可用性:系統(tǒng)中非故障節(jié)點(diǎn)收到的每個(gè)都必須得到響應(yīng)。(不如我們之前使用的服務(wù)降級和熔斷,其實(shí)就是一種維持可用性的措施,雖然服務(wù)器返回的是無意義的數(shù)據(jù),但不至于用戶的請求會(huì)被服務(wù)器忽略)
區(qū)分容錯(cuò)性:一個(gè)分布式系統(tǒng)里面,節(jié)點(diǎn)之間組成的網(wǎng)絡(luò)本應(yīng)該是相互連通的,然而可能因?yàn)橐恍┕收?#xff08;比如網(wǎng)絡(luò)丟包等,這是很難避免的),使得一些節(jié)點(diǎn)之間不能連通,整個(gè)網(wǎng)絡(luò)分成了幾塊區(qū)域,數(shù)據(jù)就散落在這些不相連通的區(qū)域里面。(這樣就可能出現(xiàn)某些被分區(qū)節(jié)點(diǎn)存放的數(shù)據(jù)訪問失敗,我們需要來容忍這些不可靠的情況)
總的來說,數(shù)據(jù)存放的節(jié)點(diǎn)越多,分區(qū)容忍性就越高,都是要復(fù)制更新的次數(shù)就會(huì)越來越多,同時(shí)為了保證一致性,更新所有節(jié)點(diǎn)數(shù)據(jù)所需要的時(shí)間就越長,那么可用性就會(huì)降低。
所以存在三種方案:
??
springCloud Alibaba的使用??
? ? springCloud的缺點(diǎn):
1.springCloud部分組件停止維護(hù)和更新了,給開發(fā)帶來了不便。
2.springCloud部分開發(fā)環(huán)境復(fù)雜,沒有完善的可視化界面,我們需要大量的二次開發(fā)和定制。
3.springCloud配置復(fù)雜,難上手,部分配置差別難以群分和合理應(yīng)用。
? ? springCloud Alibaba的優(yōu)點(diǎn):阿里使用過的組件經(jīng)歷了考驗(yàn),性能強(qiáng)悍,設(shè)計(jì)合理,現(xiàn)在開源出來給大家使用成套的產(chǎn)品搭配完善的可視化界面給開發(fā)帶來運(yùn)維帶來了極大的便利,搭建簡單,學(xué)習(xí)曲線低。
spring-cloud-alibaba對應(yīng)依賴:
<dependencyManagement><dependencies><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-alibaba-dependencies</artifactId><version>2.2.7.RELEASE</version><type>pom</type><scope>import</scope></dependency></dependencies> </dependencyManagement>?Nacos
Nacos: 一個(gè)易于使用的動(dòng)態(tài)服務(wù)發(fā)現(xiàn)、配置和服務(wù)管理平臺(tái),用于構(gòu)建云原生應(yīng)用程序。
作用: 將其作為服務(wù)注冊中心。
?Nacos作為服務(wù)注冊中心對應(yīng)依賴:
<dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId></dependency>在github中下載nacos并將其復(fù)制到項(xiàng)目中。
在startup.sh中設(shè)置nacos讓其在前臺(tái)運(yùn)行。?
配置服務(wù)。?
直接使用nacos。
后臺(tái)啟動(dòng)nacos:?bash nacos/bin/startup.sh。(以集群的方式)?
關(guān)閉nacos: bash nacos/bin/shutdown.sh。
后臺(tái)啟動(dòng)naocs: bash nacos/bin/startup.sh -m standalone。(以單例的方式啟動(dòng))
nacos 的地址為: localhost:8084/nacos。
?導(dǎo)入對應(yīng)依賴。
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-dependencies</artifactId><version>2021.0.1</version><type>pom</type><scope>import</scope></dependency><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-alibaba-dependencies</artifactId><version>2021.0.1.0</version><type>pom</type><scope>import</scope></dependency>在子項(xiàng)目中添加依賴。
<dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId><version>2021.1</version></dependency>設(shè)置配置文件。
spring:cloud: #配置nacos注冊中心地址nacos:discovery:server-addr: lcoalhost:8848效果圖:
?使用openFeign調(diào)用服務(wù)。
對應(yīng)依賴為下:
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-openfeign</artifactId><version>3.1.1</version></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-loadbalancer</artifactId><version>3.1.1</version></dependency>和原生的調(diào)用方式一樣。
臨時(shí)實(shí)例和非臨時(shí)實(shí)例的區(qū)別:
臨時(shí)實(shí)例:和eureka一樣,采用心跳機(jī)制向nacos發(fā)送請求保持在線的狀態(tài),一但心跳停止,就代表實(shí)例下線,不保留實(shí)例信息。
非臨時(shí)實(shí)例:由nacos主動(dòng)進(jìn)行聯(lián)系,如果連接失敗,那么不會(huì)刪除實(shí)例信息,而是將健康狀態(tài)設(shè)置為false,相當(dāng)于對某個(gè)實(shí)例狀態(tài)持續(xù)進(jìn)行監(jiān)控。
?設(shè)置非臨時(shí)實(shí)例
設(shè)置配置文件。
cloud:nacos:discovery: # 修改為false 表示其是個(gè)非臨時(shí)文件ephemeral: false?對應(yīng)的實(shí)例下線時(shí)會(huì)顏色會(huì)變?yōu)榧t色。
?集群分區(qū)
對應(yīng)依賴為下:
spring:cloud:nacos:discovery:#對應(yīng)的集群名cluster-name: name效果圖
在默認(rèn)情況下,集群間的調(diào)用方式采用的是輪番調(diào)用,使用為了實(shí)現(xiàn)就近原則,我們要對配置文件進(jìn)行相應(yīng)配置。
spring:cloud:#將loadbalancer的nacos支持開啟,集成nacos負(fù)載均衡loadbalancer:nacos:enabled: true?在同個(gè)集群中如果存在多個(gè)相同的服務(wù)時(shí),就會(huì)根據(jù)權(quán)重來執(zhí)行對應(yīng)的服務(wù),我們可以在nacos頁面中進(jìn)行設(shè)置。
也可以通過配置文件進(jìn)行修改。
spring:cloud:nacos:discovery:#設(shè)置權(quán)重,默認(rèn)為1weight: 2?配置中心
我們可以通過配置來加載遠(yuǎn)程配置,這樣我們可以遠(yuǎn)端集中管理配置文件。
我們可以通過在nacos中,點(diǎn)擊新建配置項(xiàng),進(jìn)行配置。
點(diǎn)擊發(fā)布。
在項(xiàng)目中添加依賴。
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-bootstrap</artifactId><version>3.1.2</version></dependency><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId><version>2021.1</version></dependency>編寫bootstrap.yml文件。
spring:application:#去遠(yuǎn)程倉庫中調(diào)用對應(yīng)名字的配置name: user-serviceprofiles:active: devcloud:nacos:config:file-extension: ymlserver-addr: localhost:8848此時(shí)兩個(gè)yml文件都會(huì)被使用。
?在默認(rèn)情況下,在nacos中修改yml文件,對應(yīng)的服務(wù)中的yml的信息并不會(huì)發(fā)生對應(yīng)的改變,雖然對其做了監(jiān)聽。
為了能夠保證在nacos中修改的yml文件立刻生效,我們需要添加注解@RefreshScope。
?效果圖:
原配置文件?
?修改后的配置文件
?結(jié)果
命名空間?
?新建命名空間。
在配置文件中配置所屬的命名空間。
spring:cloud:nacos:discovery:namespace: 對應(yīng)命名空間的id命名空間不相同的服務(wù)是不能相互調(diào)用的。
指定分組
?修改配置文件。
spring:cloud:nacos:discovery:group: 對應(yīng)的組名默認(rèn)為?DEFAULT_GROUP。?
高可用
?在本地?cái)?shù)據(jù)庫導(dǎo)入nacos中的sql文件。
?配置conf/application.properties文件。
?將cluster.conf.example重命名為cluster.conf,并對其內(nèi)容進(jìn)行修改,輸入多個(gè)的nanos地址,這里使用內(nèi)網(wǎng)映射。
?因?yàn)橐獎(jiǎng)?chuàng)建nacos集群,所以我們要?jiǎng)?chuàng)建多個(gè)nacos,通過復(fù)制修改好的nacos進(jìn)行操作,這時(shí)我們只要再對每個(gè)nacos修改端口號就可以了。(要以集群的方式啟動(dòng),可能會(huì)啟動(dòng)失敗,多啟動(dòng)幾次)
效果圖:?
我們需要一個(gè)負(fù)載均衡器來管理這些nacos,這里我們使用Nginx。
在Mac上安裝Nginx。
brew install nginx編輯Nginx。
nano /usr/local/etc/nginx/nginx.conf編輯內(nèi)容:
# 添加我們剛創(chuàng)建好的nacos服務(wù)器 upstream nacos-server {#被代理的服務(wù)群,nacos-server服務(wù)群的名字server localhost:8801;server localhost:8802; }server {#監(jiān)聽的端口listen 80;#服務(wù)名server_name localhost;#類似過濾器,當(dāng)訪問到符合此要求的路徑時(shí)就會(huì)去訪問對應(yīng)的服務(wù)群location /nacos {proxy_pass http://nacos-server;} }重啟Nginx。
brew services restart nginx將各個(gè)微服務(wù)的nacos的注冊地址改為localhost:80/nacos。(其會(huì)實(shí)現(xiàn)負(fù)載均衡)
在云服務(wù)器上做反向代理的例子。
1.配置nacos的端口號和對應(yīng)的數(shù)據(jù)庫。
2.啟動(dòng)倆個(gè)nacos。
3.在Nginx中配置反向代理。
集群效果:
在每個(gè)微服務(wù)中將nacos的地址改為云服務(wù)器上Nginx反向代理的地址。
??
最終效果圖:
?Sentinel流量防衛(wèi)兵
?Sentinel 可以做到熔斷和降級,可以取代Hystrix。
Sentinel具有以下功能:
- 豐富的適用場景:哨兵在阿里巴巴被廣泛使用,在過去10年中,幾乎涵蓋了Double-11(11.11)購物節(jié)的所有核心場景,例如需要限制突發(fā)流量以滿足系統(tǒng)容量的“第二次殺戮”,消息峰值剪切和山谷填充,不可靠的下游服務(wù)的斷路,集群流量控制等。
- 實(shí)時(shí)監(jiān)控:哨兵還提供實(shí)時(shí)監(jiān)控功能。您可以實(shí)時(shí)查看一臺(tái)機(jī)器的運(yùn)行時(shí)信息,以及少于500個(gè)節(jié)點(diǎn)的集群的聚合運(yùn)行時(shí)信息。
- 廣泛的開源生態(tài)系統(tǒng):Sentinel提供與Spring Cloud、Dubbo和gRPC等常用框架和庫的開箱即用集成。您只需將適配器依賴項(xiàng)添加到您的服務(wù)中,即可輕松使用Sentinel。
- Polyglot支持:Sentinel為Java、Go和C++提供了原生支持。
- 各種SPI擴(kuò)展:Sentinel提供易于使用的SPI擴(kuò)展接口,允許您快速自定義邏輯,例如自定義規(guī)則管理、調(diào)整數(shù)據(jù)源等。
下載對應(yīng)的jar包。
sentinel V1.8.3
?將jar導(dǎo)入到項(xiàng)目,為其創(chuàng)建一個(gè)服務(wù)。
啟動(dòng)項(xiàng)目。?
?添加對應(yīng)的依賴。
<dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-sentinel</artifactId><version>2021.0.1.0</version></dependency>對微服務(wù)中application.yml進(jìn)行配置。(實(shí)際上Sentinel是本地在管理,但我們可以連接到監(jiān)控頁面,這樣可以圖形化操作了)
spring:cloud:sentinel: # 添加監(jiān)控頁面地址transport:dashboard: localhost:8858為了提高可讀性和節(jié)省空間,Sentinel只監(jiān)視被調(diào)用過的微服務(wù)。所以為了讓sentinel監(jiān)視我們的微服務(wù),我們需要手動(dòng)的調(diào)用一次微服務(wù)。
效果圖:
流量控制
我們的機(jī)器不可能無限的接受和處理客戶端的請求,如果不加以限制,當(dāng)發(fā)生高并發(fā)時(shí),就會(huì)使得系統(tǒng)的資源很快的被耗盡。為了避免這種情況,我們可以添加流量控制,當(dāng)一段時(shí)間內(nèi)的流量達(dá)到一定的閥值的時(shí)候,新的請求將不會(huì)再進(jìn)行處理,這樣不僅可以合理地應(yīng)對高并發(fā)的情況,同時(shí)也可以在一定的程度上保護(hù)服務(wù)器不受到外界的攻擊。
解決方案
?針對判斷是否超過流量的閥值的四種算法
1.漏桶算法
?2.令牌桶算法(有點(diǎn)像游戲里的能量條機(jī)制)
?3.固定時(shí)間窗口算法
4.滑動(dòng)時(shí)間窗口算法
通過Sentinel進(jìn)行設(shè)置
?閥值類型:QPS就是每秒中的請求數(shù)量,并發(fā)線程數(shù)是按服務(wù)當(dāng)時(shí)使用的線程數(shù)據(jù)進(jìn)行統(tǒng)計(jì)的。
流控模式:當(dāng)達(dá)到閥值時(shí),流控的對象,這里暫時(shí)只用直接。
流控效果:就是我們上面所說的三種方案。
流控模式的區(qū)別:
1.直接:只針對當(dāng)前接口。
2.關(guān)聯(lián):當(dāng)其他接口超過閥值時(shí),會(huì)導(dǎo)致當(dāng)前接口被限流。(別的接口出現(xiàn)問題由當(dāng)前接口承擔(dān)責(zé)任)
3.鏈路:更細(xì)粒度的限流,能精確到具體的方法。
鏈路模式能夠更加精準(zhǔn)的進(jìn)行流量控制,鏈路流控模式指的是,當(dāng)從指定接口過來的資源請求達(dá)到限流條件時(shí),開啟限流,這里得先講解一下@SentinelResource的使用。
我們可以對某個(gè)方法進(jìn)行限流控制,無論是誰在何處調(diào)用了它,這里需要使用到@SentinelResource,一但方法被標(biāo)注,那么就會(huì)進(jìn)行監(jiān)控。
在調(diào)用的方法上添加注解@SentinelResource。
@SentinelResource("detail")@Overridepublic UserBook queryByUid(Integer uid) {List<DbBorrow> dbBorrows = dbBorrowDao.queryByUid(uid);RestTemplate template = new RestTemplate(); // DbUser user = template.getForObject("http://localhost:8083/dbUser/" + uid, DbUser.class);DbUser user = userClient.queryById(uid);UserBook userBook = new UserBook();userBook.setUser(user);LinkedList<DbBook> books = new LinkedList<>();for(int i = 0; i < dbBorrows.size(); i++){ // DbBook book = template.getForObject("http://localhost:8081/dbBook/" + dbBorrows.get(i).getBid(), DbBook.class);DbBook book = bookClient.queryById(dbBorrows.get(i).getBid());books.add(i, book);}userBook.setBook(books);return userBook;}修改配置文件。
spring:cloud:sentinel:#關(guān)閉Context收斂,這樣被監(jiān)控方法可以進(jìn)行不同鏈路的單獨(dú)監(jiān)控web-context-unify: false運(yùn)行查看效果。
?對精確限流設(shè)置限流策略。
?設(shè)置閥值為1,然后進(jìn)行連續(xù)訪問測試,發(fā)現(xiàn)精確限流的位置拋出了異常。
?那么這個(gè)鏈路選項(xiàng)實(shí)際上就是決定只限流從哪個(gè)方向來的調(diào)用,比如我們只對borrow2這個(gè)接口對queryByUid方法的調(diào)用進(jìn)行限流,那么我們就可以為其制定鏈路。
?入口資源的設(shè)置表示:進(jìn)行精確限流的路徑,如果設(shè)置了入口資源,那么其他路徑調(diào)用被精確限流的方法時(shí)則不會(huì)被限流。
系統(tǒng)保護(hù)規(guī)則
?檢測對應(yīng)的設(shè)備來進(jìn)行限流。
限流和異常處理?
我們看到被限流之后返回的Sentinel默認(rèn)的數(shù)據(jù),其實(shí)我們可以返回我們自己定義的數(shù)據(jù)。?
?這里我們先創(chuàng)建好被限流狀態(tài)下需要返回的內(nèi)容,自定義一個(gè)請求映射。
?在要返回自定義的微服務(wù)的Controller中添加自定義的錯(cuò)誤頁面。
@RequestMapping("/blocked")public JSONObject blocked() {JSONObject json = new JSONObject();json.put("code", 403);json.put("success", false);json.put("message", "訪問頻率過快, 請稍后訪問!");return json;}在配置文件中配置限流頁面返回信息。
spring:cloud:sentinel:block-page: /dbUser/blocked效果圖:?
?對于方法級別的限流,當(dāng)某個(gè)方法被限流時(shí),會(huì)直接在后臺(tái)拋出異常,那么這種情況可以通過Sentinel添加一個(gè)替代方案,這樣當(dāng)我們發(fā)現(xiàn)異常時(shí)會(huì)直接執(zhí)行我們的代替方案并返回。
在service層中添加代替方案。
@SentinelResource(value = "detail", blockHandler = "blocked")//限流之后代替返回的其他方案,這樣就不會(huì)使用默認(rèn)的拋出異常的方式了@Overridepublic UserBook queryByUid(Integer uid) {List<DbBorrow> dbBorrows = dbBorrowDao.queryByUid(uid);RestTemplate template = new RestTemplate(); // DbUser user = template.getForObject("http://localhost:8083/dbUser/" + uid, DbUser.class);DbUser user = userClient.queryById(uid);UserBook userBook = new UserBook();userBook.setUser(user);LinkedList<DbBook> books = new LinkedList<>();for(int i = 0; i < dbBorrows.size(); i++){ // DbBook book = template.getForObject("http://localhost:8081/dbBook/" + dbBorrows.get(i).getBid(), DbBook.class);DbBook book = bookClient.queryById(dbBorrows.get(i).getBid());books.add(i, book);}userBook.setBook(books);return userBook;}//代替方案public UserBook blocked(Integer uid, BlockException blockException) {UserBook userBook = new UserBook();userBook.setBook(Collections.emptyList());userBook.setUser(new DbUser());return userBook;}blockHandler只會(huì)處理限流的異常,而不會(huì)處理方法體內(nèi)的其他代碼異常。
如果要處理限流以外的其他異常,我們可以通過其他參數(shù)進(jìn)行處理。
@RequestMapping("/test")@SentinelResource(value = "test",fallback = "except", //fallback指定出現(xiàn)異常是的替代方案exceptionsToIgnore = IOException.class) //忽略注定的異常,也就是出現(xiàn)這些指定的異常時(shí)不回調(diào)用的替代方案public String test() {throw new RuntimeException("拋出異常");}public String except(Throwable t) {return t.getMessage();}效果圖:?
?當(dāng)在@SentinelResource中同時(shí)存在 fallback和blockHandler時(shí),在拋出限流異常范圍內(nèi)的異常的時(shí)候就先調(diào)用blockHandler中的替代方案,其他的時(shí)候就會(huì)調(diào)用fallback。(注意在兩個(gè)都打存在的時(shí)候,因?yàn)橄蘖鲿?huì)在方法執(zhí)行前調(diào)用,所以在限流代替方案執(zhí)行完以后還會(huì)在執(zhí)行出現(xiàn)其異常時(shí)的代替方案)
熱點(diǎn)參數(shù)限流
我們可以對某一熱點(diǎn)進(jìn)行精準(zhǔn)限流,比如在某一時(shí)刻,不同參數(shù)被攜帶訪問的頻率是不一樣的:
http://localhost:8082/test?a=10 訪問100次
http://localhost:8082/test?b=10 訪問0次
http://localhost:8082/test?c=10 訪問3次?
由于攜帶的參數(shù)a的請求比較多,我們就可以只對攜帶參數(shù)a的請求進(jìn)行限流。?
創(chuàng)建一個(gè)新的測試請求映射。
@RequestMapping("/test")@SentinelResource("test") //注意這里需要添加@SentinelResource才可以,用戶資源名稱就使用這里定義的資源名稱public String findBorrow(@RequestParam(value = "a", required = false) String a,@RequestParam(value = "b", required = false) String b,@RequestParam(value = "c", required = false) String c) {return "請求成功!" + "a = " + a + " b = " + b + " c = " + c;}在Senntinel中設(shè)置熱點(diǎn)配置。(我們對a進(jìn)行了限流)
?效果圖:
?我們也可以對某個(gè)參數(shù)的特定值進(jìn)行特定限流。(我們對a進(jìn)行特定值限流)?
效果圖:
Sentinel的服務(wù)熔斷和降級
為了防止鏈路故障,我們能進(jìn)行隔離,這里我們有兩種隔離方案。
1.線程池隔離
線程池隔離實(shí)際上就是對每個(gè)服務(wù)的遠(yuǎn)程調(diào)用單獨(dú)開放線程池,比如服務(wù)A要調(diào)用服務(wù)B,那么只基于固定數(shù)量的線程池,這樣即使在短時(shí)間內(nèi)出現(xiàn)大量請求,由于沒有線程可以分配,所以就不會(huì)導(dǎo)致資源耗盡。?
?2.信號量隔離
信號量隔離是使用Semaphore類實(shí)現(xiàn)的,思想基本跟上面的相同,也是限定指定的線程數(shù)量能夠同時(shí)進(jìn)行服務(wù)調(diào)用,但它相對于線程池開銷會(huì)更小一些,使用效果同樣優(yōu)秀,也支持超時(shí)等,Sentinel就是采用這個(gè)方案進(jìn)行隔離的。?
說回我們的熔斷與降級,當(dāng)下游的服務(wù)因?yàn)槟承┰蜃兊貌豢捎没蝽憫?yīng)過慢時(shí),上游為了保證自己整體的高可用性,不再繼續(xù)調(diào)用目標(biāo)服務(wù)而是快速返回或執(zhí)行自己的代替方案。?
?整個(gè)過程分為三個(gè)狀態(tài):
1.關(guān)閉:熔斷器不工作,所有的請求全部該干嘛就干嘛。
2.開啟:熔斷器工作,所有的請求一律降級。
3.半開:嘗試進(jìn)行一下正常的流程,要是還是不行就繼續(xù)保持開啟的狀態(tài),否則關(guān)閉。
?在Sentinel設(shè)置熔斷規(guī)則
熔斷策略
1.慢調(diào)用比例:如果出現(xiàn)那種半天都處理不完的調(diào)用,有可能就是服務(wù)出現(xiàn)故障,這個(gè)選項(xiàng)是按照最大效應(yīng)時(shí)間(RT)進(jìn)行判斷的,如果一次請求的處理時(shí)間超過了指定的RT,那么就會(huì)判斷為慢調(diào)用,在一個(gè)統(tǒng)計(jì)時(shí)長內(nèi),如果請求數(shù)目大于最小請求數(shù)目,并且被判斷定為慢調(diào)用的請求比例已經(jīng)超過閥值,將觸發(fā)熔斷,經(jīng)過熔斷時(shí)長之后,將會(huì)進(jìn)入到半開狀態(tài)進(jìn)行試探。(這里和Hystrix一致)
?測試代碼
@RequestMapping("/borrow2/{uid}")public String test2(@PathVariable("uid") Integer uid) throws InterruptedException {Thread.sleep(1000);return "hello World!";}在在對應(yīng)的微服務(wù)中設(shè)置熔斷設(shè)置。
?效果圖:
?2.異常比例:與慢調(diào)用比例相似,不過這里判斷的是出現(xiàn)異常的次數(shù)。
測試代碼
@RequestMapping("/borrow3/{uid}")public String test3(@PathVariable("uid") Integer uid) throws Exception {throw new RuntimeException();}在Sentinel中配置熔斷配置。
效果圖:
3.異常數(shù):很異常比例好像,但有明確指出異常數(shù)量。
降級策略
我們需要在@SentinelResource中配置blockHandler參數(shù)。(這里跟之前方法限流的配置是一樣的,因?yàn)槿绻砑恿?#64;SentinelResource注解,那么這里就會(huì)進(jìn)行方法級別細(xì)粒度的限制,和之前方法級別限流一樣,會(huì)在降級之后直接拋出異常,如果不添加則返回默認(rèn)的限流頁面,blockHandler的目的就是處理這種Sentinel機(jī)制的異常,所以這里其實(shí)和之前的限流配置是一個(gè)道理,因此下面熔斷配置也應(yīng)該對value自定義名稱的資源進(jìn)行配置,才能作用到此方法上)
測試代碼:
//降級測試@RequestMapping("/borrow4/{uid}")@SentinelResource(value = "findBorrow", blockHandler = "test")public ResponseEntity<UserBook> findBorrow(@PathVariable("uid") Integer uid) throws Exception {throw new RuntimeException();}//代替方案public ResponseEntity<UserBook> test(Integer uid, BlockException e) {System.out.println(e.getClass());UserBook userBook = new UserBook();userBook.setUser(new DbUser());userBook.setBook(Collections.emptyList());return ResponseEntity.ok(userBook);}在Sentinel中設(shè)置熔斷規(guī)則。
?效果圖:
?拋出降級異常。
openFeign支持Sentinel
?前面我們使用Hystrix的時(shí)候,就可以直接對openFeign的每個(gè)接口調(diào)用單獨(dú)進(jìn)行服務(wù)降級,而使用Sentinel,也可以的。
在配置文件中開啟支持。
feign:sentinel:enabled: true?和之前的openfign整合eureka的服務(wù)降級配置相似。
BookClient接口:
@FeignClient(value = "book-service",fallback = BookClientImpl.class) public interface BookClient {@GetMapping("/dbBook/{id}")DbBook queryById(@PathVariable("id") Integer id); }BookClient替代方案:
@Component public class BookClientImpl implements BookClient{@Overridepublic DbBook queryById(Integer id) {return new DbBook();} }啟用代替方案效果圖:?
?Sentinel整合RestTemplate使用服務(wù)降級
在config進(jìn)行配置。
@Configuration public class RestTemplateConfig {@Bean@LoadBalanced@SentinelRestTemplate(blockHandler = "handleException", blockHandlerClass = ExceptionUtil.class,fallback = "對應(yīng)的降級方案", fallbackClass = ExceptionUtil.class)public RestTemplate restTemplate() {return new RestTemplate();} }?Seata與分布式事務(wù)
Seata 是一款開源的分布式事務(wù)解決方案,致力于提供高性能和簡單易用的分布式事務(wù)服務(wù)。Seata 將為用戶提供了 AT、TCC、SAGA 和 XA 事務(wù)模式,為用戶打造一站式的分布式解決方案。
事務(wù)特性
?分布式講解方案
1.XA分布式事務(wù)協(xié)議 -2PC (兩階段提交實(shí)現(xiàn))?
這里的PC實(shí)際上指的是Prepare和Commit,也就是說它分為兩個(gè)階段,一個(gè)是準(zhǔn)備一個(gè)是提交,整個(gè)過程的參與者一共有兩個(gè)角色,一個(gè)是事務(wù)的執(zhí)行者,一個(gè)是事務(wù)的協(xié)調(diào)者,實(shí)際上整個(gè)事務(wù)的運(yùn)作需要毅力啊協(xié)調(diào)者來維持。
?在準(zhǔn)備和提交階段,會(huì)進(jìn)行:
準(zhǔn)備階段:
? ? 一個(gè)分布式事務(wù)是由協(xié)調(diào)者來開啟的,首先協(xié)調(diào)者會(huì)向所有的事務(wù)執(zhí)行者發(fā)送事務(wù)內(nèi)容,等待所有的事務(wù)執(zhí)行者答復(fù)。
各個(gè)事務(wù)執(zhí)行者開始執(zhí)行事務(wù)操作,但不會(huì)進(jìn)行提交,并將undo和redo信息記錄到事務(wù)日志中。
如果事務(wù)執(zhí)行者執(zhí)行事務(wù)成功,那么就告訴協(xié)調(diào)者成功Yes,否則告訴協(xié)調(diào)者失敗No,不能提交事務(wù)。
提交階段:
? ? 當(dāng)前有的執(zhí)行者都反饋完成之后,進(jìn)入第二階段。
協(xié)調(diào)者會(huì)檢測各個(gè)執(zhí)行者的反饋內(nèi)容,如果所有的返回都是成功,那么就告訴所有的執(zhí)行者可以提交事務(wù)了,最后再釋放鎖的資源。
如果有至少一個(gè)執(zhí)行者返回失敗或超時(shí)面,那么就讓所有的執(zhí)行者都會(huì)回滾,分布式事務(wù)執(zhí)行失敗。
雖然這種方式看起來比簡單,但是存在以下幾個(gè)問題:
1.事務(wù)協(xié)調(diào)者是非常核心的角色,一旦出現(xiàn)問題,將導(dǎo)致整個(gè)分布式不能正常運(yùn)行。
2.如果提交階段發(fā)生網(wǎng)絡(luò)問題,導(dǎo)致某事務(wù)執(zhí)行者沒有收到協(xié)調(diào)者發(fā)來的提交命令,將導(dǎo)致某些執(zhí)行者沒提交,這樣肯定是不行的。
2.XA分布式事務(wù)協(xié)議 -3PC(三階段提交實(shí)現(xiàn))
三階段提交是在二階段提交的基礎(chǔ)上的改進(jìn)播版本,主要是加了超時(shí)記機(jī)制,同時(shí)在協(xié)調(diào)者和執(zhí)行者都引入了超時(shí)機(jī)制。
三個(gè)階段分別進(jìn)行:
CanCommit階段:
? ? 協(xié)調(diào)者向執(zhí)行者發(fā)送CanCommit請求,詢問是否可以執(zhí)行事務(wù)提交操作,然后開始等待執(zhí)行者的響應(yīng)。
ProeCommit階段:
? ? 協(xié)調(diào)者根據(jù)執(zhí)行者的反應(yīng)情況來決定是否可以進(jìn)入第二階段事務(wù)的PreCommit操作。
如果所有的執(zhí)行者都返回Yes,則協(xié)調(diào)者向所有的執(zhí)行者發(fā)送PreCommit請求,并進(jìn)入Prepared階段,執(zhí)行者接收到請求后,會(huì)執(zhí)行事務(wù)操作,并將undo和redo信息記錄到事務(wù)日志中,如果成功執(zhí)行,則返回成功的響應(yīng)。
如果所有的執(zhí)行者至少有一個(gè)返回No,則協(xié)調(diào)者會(huì)向所有的執(zhí)行者發(fā)送abort請求,所有的執(zhí)行者在收到請求或超時(shí)一段時(shí)間沒有收到任何請求時(shí),會(huì)直接中斷事務(wù)。
DoCommit階段:
? ? 該階段進(jìn)行真正的事務(wù)提交。
? ? 協(xié)調(diào)者接收到所有執(zhí)行者發(fā)送的成功響應(yīng),那么它就從PreCommit狀態(tài)進(jìn)入DOCommit狀態(tài),并向所有的執(zhí)行者發(fā)送doCommit請求,執(zhí)行者接收到doCommit請求之后,開始執(zhí)行事務(wù)提交,并在完成事務(wù)提交之后釋放所有的事務(wù)資源,并最后向協(xié)調(diào)者發(fā)送確認(rèn)響應(yīng),協(xié)調(diào)者接收到所有執(zhí)行者的確認(rèn)響應(yīng)之后,完成事務(wù)(如果因?yàn)榫W(wǎng)絡(luò)問題導(dǎo)致執(zhí)行者沒有接收到doCommit請求,執(zhí)行者會(huì)在超時(shí)之后直接提交事務(wù),雖然執(zhí)行者只是猜測協(xié)調(diào)者返回的是doCommit請求,但是因?yàn)榍懊娴膬蓚€(gè)流程都正常執(zhí)行,所以能夠在一定程度上認(rèn)為本次事務(wù)是成功的,因此會(huì)直接提交)
? ? 協(xié)調(diào)者沒有接收到至少一個(gè)執(zhí)行者發(fā)送的成功響應(yīng)(可能是響應(yīng)超時(shí)),那么就會(huì)執(zhí)行中斷事務(wù),協(xié)調(diào)者會(huì)向所有的執(zhí)行者發(fā)送abort請求,執(zhí)行者接收到abort請求之后,利用其在PreCommit階段記錄的undo信息來執(zhí)行事務(wù)的回滾操作,并在完成回滾操作之后釋放所有的事務(wù)資源,執(zhí)行者完成事務(wù)回滾之后,向協(xié)調(diào)者發(fā)送確認(rèn)信息,協(xié)調(diào)者接收到參與者反饋的確認(rèn)信息之后,執(zhí)行事務(wù)的中斷。
第三階段的特點(diǎn):
1.3PC在2PC的第一階段和第二階段中插入一個(gè)準(zhǔn)備階段,保證在最后提交階段之前各參與節(jié)點(diǎn)的狀態(tài)是一致的。
2.一旦參與者無法及時(shí)收到來自協(xié)調(diào)者的信息之后,會(huì)默認(rèn)執(zhí)行Commit,這樣就不會(huì)因?yàn)閰f(xié)調(diào)者單方面故障導(dǎo)致全局出現(xiàn)問題。
3.但是我們知道,實(shí)際上超時(shí)之后的Commit決策本質(zhì)上就是一個(gè)賭注罷了,如果此時(shí)協(xié)調(diào)者發(fā)送的是abort請求但是超時(shí)未接收,那么就會(huì)直接導(dǎo)致數(shù)據(jù)一致性的問題。?
3.TCC (補(bǔ)償事務(wù))
補(bǔ)償事務(wù)TCC就是Try,Comfirm,Cancel,它對業(yè)務(wù)有入侵性,一共分為三個(gè)階段。
Try階段:
? ? 比如我們需要借書時(shí),將書籍的庫存-1,并將用戶的借閱量-1,但是這個(gè)操作,除了直接對庫存和借閱量進(jìn)行修改之外,還需要將減去的值,單獨(dú)存放到凍結(jié)表中,但是此時(shí)不會(huì)創(chuàng)建借閱信息,也就是說只是預(yù)先把關(guān)鍵的東西給處理了,預(yù)留業(yè)務(wù)資源出來。
Confirm階段:
? ? 如果Try執(zhí)行成功無誤,那么就進(jìn)入Confirm階段,接著之前,我們就該創(chuàng)建借閱信息了,只能使用Try階段預(yù)留的業(yè)務(wù)資源,如果創(chuàng)建成功,那么就對Try階段凍結(jié)的值進(jìn)行解凍,整個(gè)流程就完成了,如果失敗了,就會(huì)進(jìn)入Cancel階段。
Cancel階段:
? ? 將凍結(jié)的東西還給人家,進(jìn)行回滾。
TCC特點(diǎn):
跟XA協(xié)議相比,TCC就沒有協(xié)調(diào)者這一角色的參與了,而是自主通過上一階段的執(zhí)行情況來確保正常,充分利用了集群的優(yōu)勢,性能也是有很大的提升,但是缺點(diǎn)也很明顯,它與業(yè)務(wù)具有一定的關(guān)聯(lián)性,需要開發(fā)者去編寫更多的補(bǔ)償代碼,同時(shí)并不一定所有的業(yè)務(wù)流程都適用于這種形式。?
?Seata機(jī)制簡介
?seata支持四種事務(wù):
1.AT:標(biāo)志上就是2PC的升級版,在AT模式下,用戶只需要關(guān)心自己的"業(yè)務(wù)SQL"。
? ? ?一階段:seata會(huì)攔截"業(yè)務(wù)SQL",首先解析SQL語義,找到"業(yè)務(wù)SQL"要更新的業(yè)務(wù)數(shù)據(jù),在業(yè)務(wù)數(shù)據(jù)更新之前,將其保存成"before image",然后進(jìn)行"業(yè)務(wù)SQL"更新業(yè)務(wù)數(shù)據(jù),在業(yè)務(wù)數(shù)據(jù)更新后,再將其保存到"after image",最后生成行鎖。以上操作只在一個(gè)數(shù)據(jù)庫事務(wù)內(nèi)完成,這樣保證了第一階段操作的原子性。
? ? 二階段如果確定提交的話,因?yàn)?#34;業(yè)務(wù)SQL"在一階段已經(jīng)提交到數(shù)據(jù)庫,所以Seata框架只需要將第一階段保存的快照數(shù)據(jù)和行鎖刪掉,完成數(shù)據(jù)清除即可,當(dāng)然如果需要回滾,那么就用"before image"還原業(yè)務(wù)數(shù)據(jù),但在還原前首先要校驗(yàn)臟讀,對比"數(shù)據(jù)庫當(dāng)前業(yè)務(wù)數(shù)據(jù)"和"after image",如果兩份數(shù)據(jù)完全一致就說明沒有臟讀,可以還原業(yè)務(wù)數(shù)據(jù),如果不一致就說明有臟讀,出現(xiàn)臟讀就需要轉(zhuǎn)人工處理。??
2.TCC:和我們上面講解的思路是一樣的。
3.XA:同上,但是要求數(shù)據(jù)庫本身支持這種模式才可以。
4.saga:用用處理長事務(wù),每個(gè)執(zhí)行者需要實(shí)現(xiàn)事務(wù)的正向操作和補(bǔ)償操作。
?那么,以AT模式為例,我們的程序是如何才能做到不對業(yè)務(wù)進(jìn)行侵入的情況下實(shí)現(xiàn)分布式事務(wù)能?實(shí)際上,Seata客戶端,是通過對數(shù)據(jù)源進(jìn)行代理實(shí)現(xiàn)的,使用的是DataSourceProxy類,所以在程序這邊,我們只需要將對應(yīng)的代理類注冊到Bean即可。(0.9版本之后支持自動(dòng)代理,并不需要我們手動(dòng)導(dǎo)入)
?使用file進(jìn)行部署(以AT為例)
下載seata-server。
seats-server
?在idea中配置seata服務(wù)。
?seata支持本地部署也支持服務(wù)注冊與發(fā)現(xiàn)中心部署。(比如eureka,nacos)
seata存在著事務(wù)分組機(jī)制:
1.事務(wù)分組:seata資源邏輯,可以按微服務(wù)的需要,在應(yīng)用程序(客戶端)對自定義事務(wù)進(jìn)行分組,每個(gè)組取一個(gè)名字。
2.集群:seata-server服務(wù)端一個(gè)或多個(gè)節(jié)點(diǎn)組成的集群cluster。應(yīng)用程序(客戶端)使用時(shí)需要指定事務(wù)邏輯分組與seata服務(wù)器集群(默認(rèn)為default)的映射關(guān)系。
為啥要設(shè)計(jì)成通過事務(wù)分組再直接映射到集群?為什么不直接將事務(wù)指定到集群呢?
獲取事務(wù)分組到映射集群的配置。這樣設(shè)計(jì)后,事務(wù)分組可以作為資源的邏輯隔離單位,出現(xiàn)某集群故障時(shí)可以快速failover(故障切換),只切換對應(yīng)的分組,可以把故障縮減到服務(wù)級別,但提前也是你有足夠server集群。
將各個(gè)服務(wù)作為seata的客戶端導(dǎo)入對于依賴:
<dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-seata</artifactId><version>2021.0.1.0</version></dependency>修改配置文件。
seata:service:vgroup-mapping:#這里需要對事務(wù)組進(jìn)行映射,默認(rèn)組名為應(yīng)用名稱-seata-service-group,將其映射到default集群#這個(gè)很關(guān)鍵,一定要配置,不然會(huì)找不到服務(wù)borrow-service-seata-service-group: defaultgrouplist: localhost:8868也可以設(shè)置自定義的服務(wù)分組。
seata:service:vgroup-mapping:#這里需要對事務(wù)組進(jìn)行映射,默認(rèn)組名為應(yīng)用名稱-seata-service-group,將其映射到default集群#這個(gè)很關(guān)鍵,一定要配置,不然會(huì)找不到服務(wù)xx服務(wù)名xx-seata-service-group: xxxgrouplist: localhost:8868tx-service-group: xxx現(xiàn)在我們接著來配置開啟分布式事務(wù),首先在啟動(dòng)類上添加注解,此注解會(huì)添加一個(gè)后置處理器將數(shù)據(jù)源封裝為支持分布式事務(wù)的代理數(shù)據(jù)源(1.4.2版本還是要手動(dòng)添加此注解才能生效)?
?接著我們需要在開啟分布式事務(wù)的方法上添加注解@GlobalTransactional。
?因?yàn)镾eata會(huì)分析修改數(shù)據(jù)的sql,同時(shí)生成對應(yīng)的反向回滾sql,這個(gè)回滾記錄會(huì)存放在undo_log表中,所以要求每個(gè)Client都有一個(gè)對應(yīng)的undo_log表(也就是說服務(wù)連接的數(shù)據(jù)庫都需要?jiǎng)?chuàng)建一個(gè)這樣的表,因?yàn)槲覀兊睦泳鸵粋€(gè)數(shù)據(jù)庫,所有只要?jiǎng)?chuàng)建一個(gè)表)
創(chuàng)建undo_log表的sql語句。
CREATE TABLE IF NOT EXISTS `undo_log` (`id` BIGINT(20) NOT NULL AUTO_INCREMENT COMMENT 'increment id',`branch_id` BIGINT(20) NOT NULL COMMENT 'branch transaction id',`xid` VARCHAR(100) NOT NULL COMMENT 'global transaction id',`context` VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',`rollback_info` LONGBLOB NOT NULL COMMENT 'rollback info',`log_status` INT(11) NOT NULL COMMENT '0:normal status,1:defense status',`log_created` DATETIME NOT NULL COMMENT 'create datetime',`log_modified` DATETIME NOT NULL COMMENT 'modify datetime',`ext` VARCHAR(100) DEFAULT NULL,PRIMARY KEY (`id`),UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`) ) ENGINE = InnoDBAUTO_INCREMENT = 1DEFAULT CHARSET = utf8;?啟動(dòng)服務(wù),進(jìn)行測試。
第一次借閱成功。?
第二次借閱失敗。
?查看數(shù)據(jù)庫是否進(jìn)行回滾。(這里進(jìn)行了回滾)?
我們可以打印XID,查看其對應(yīng)的XID,在service層添加語句。
System.out.println(RootContext.getXID());也可以在日志中查看。
使用nacos模式部署
我們先為Seata在nacos中配置一個(gè)命名空間。
修改seata的/conf/registry.conf文件,修改其注冊和配置中的“type”,“namespace”,“password”,“username”。
?注冊信息配置完成之后,接著我們需要將配置文件也放到Nacos中,讓Nacos管理配置,這樣我們就可以對配置進(jìn)行熱更新了,一旦環(huán)境需要改變,只需要直接到Nacos中修改即可。
??
?我們需要對配置導(dǎo)入到Nacos中,我們打開seata源碼的 /script/config-center/nacos目錄,這是官方提供上傳腳本,我們直接運(yùn)行即可。(去github下載seata源碼)
?在nacos中查看seata的配置。
?把所有微服務(wù)的事務(wù)分組信息的配置放在nacos中,我們還需要將對應(yīng)的事務(wù)組映射配置也添加上,DataId格式為service.vgroupMapping.'事務(wù)的名稱'。??
?接下來我們要對服務(wù)端的配置進(jìn)行修改,我們刪除原本的seata配置,添加新的seata配置。
seata:#注冊registry:type: nacosnacos:#使用seata命名空間,這樣才能找到seata服務(wù),由于組名我們設(shè)置的是SEATA_GROUP就是默認(rèn)的名字,所以就不用配了namespace: 550e71d6-4604-4952-a24b-b0d3781d8223username: nacospassword: nacos#配置config:type: nacosnacos:namespace: 550e71d6-4604-4952-a24b-b0d3781d8223username: nacospassword: nacos? 啟動(dòng)seata服務(wù),在nacos中對應(yīng)的命名空間觀察seata服務(wù)是否正常啟動(dòng)。
啟動(dòng)各個(gè)微服務(wù),各個(gè)服務(wù)使用nacos配置成功。
測試事務(wù)效果圖:
??
?我們還可以配置一下事務(wù)會(huì)話信息的存儲(chǔ)方式,默認(rèn)是file類型的,那么就會(huì)在運(yùn)行目錄下創(chuàng)建file_store目錄,我們可以將其放到數(shù)據(jù)庫中存儲(chǔ),只需要修改一下數(shù)據(jù)即可。
?默認(rèn)情況的存儲(chǔ)方式:
將會(huì)話信息存放到數(shù)據(jù)庫中?
修改nacos中的seata配置store.mode,store.session.mode,將存儲(chǔ)方式改為數(shù)據(jù)庫方式。
?
?將數(shù)據(jù)庫的配置信息進(jìn)行修改。
1.數(shù)據(jù)庫啟動(dòng)。
?2.數(shù)據(jù)庫的URL。
3.數(shù)據(jù)庫用戶名和密碼。
?
創(chuàng)建seata數(shù)據(jù)庫。
-- -------------------------------- The script used when storeMode is 'db' -------------------------------- -- the table to store GlobalSession data CREATE TABLE IF NOT EXISTS `global_table` (`xid` VARCHAR(128) NOT NULL,`transaction_id` BIGINT,`status` TINYINT NOT NULL,`application_id` VARCHAR(32),`transaction_service_group` VARCHAR(128),`transaction_name` VARCHAR(128),`timeout` INT,`begin_time` BIGINT,`application_data` VARCHAR(2000),`gmt_create` DATETIME,`gmt_modified` DATETIME,PRIMARY KEY (`xid`),KEY `idx_status_gmt_modified` (`status` , `gmt_modified`),KEY `idx_transaction_id` (`transaction_id`) ) ENGINE = InnoDBDEFAULT CHARSET = utf8mb4;-- the table to store BranchSession data CREATE TABLE IF NOT EXISTS `branch_table` (`branch_id` BIGINT NOT NULL,`xid` VARCHAR(128) NOT NULL,`transaction_id` BIGINT,`resource_group_id` VARCHAR(32),`resource_id` VARCHAR(256),`branch_type` VARCHAR(8),`status` TINYINT,`client_id` VARCHAR(64),`application_data` VARCHAR(2000),`gmt_create` DATETIME(6),`gmt_modified` DATETIME(6),PRIMARY KEY (`branch_id`),KEY `idx_xid` (`xid`) ) ENGINE = InnoDBDEFAULT CHARSET = utf8mb4;-- the table to store lock data CREATE TABLE IF NOT EXISTS `lock_table` (`row_key` VARCHAR(128) NOT NULL,`xid` VARCHAR(128),`transaction_id` BIGINT,`branch_id` BIGINT NOT NULL,`resource_id` VARCHAR(256),`table_name` VARCHAR(32),`pk` VARCHAR(36),`status` TINYINT NOT NULL DEFAULT '0' COMMENT '0:locked ,1:rollbacking',`gmt_create` DATETIME,`gmt_modified` DATETIME,PRIMARY KEY (`row_key`),KEY `idx_status` (`status`),KEY `idx_branch_id` (`branch_id`) ) ENGINE = InnoDBDEFAULT CHARSET = utf8mb4;CREATE TABLE IF NOT EXISTS `distributed_lock` (`lock_key` CHAR(20) NOT NULL,`lock_value` VARCHAR(20) NOT NULL,`expire` BIGINT,primary key (`lock_key`) ) ENGINE = InnoDBDEFAULT CHARSET = utf8mb4;INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('HandleAllSession', ' ', 0);數(shù)據(jù)庫效果圖:
運(yùn)行事務(wù)效果圖:?
分布式權(quán)限校驗(yàn)??
因?yàn)槭欠植际椒?wù),每個(gè)微服務(wù)存儲(chǔ)的sessionb是各不相同的,而我們需要的是保證所有的微服務(wù)都能同步這些session信息,這樣我們才能實(shí)現(xiàn)某一個(gè)微服務(wù)登錄時(shí),其他微服務(wù)都能知道。
實(shí)現(xiàn)上述要求的方案
方案一:我們可以在每臺(tái)服務(wù)器上都復(fù)制一份Session,但這樣顯然是很浪費(fèi)時(shí)間的,并且用戶驗(yàn)證數(shù)據(jù)占用的內(nèi)存會(huì)成倍的增加。
方案二:將Session移出服務(wù)器,用統(tǒng)一訪問Redis或是Mysql即可,這樣就能保證服務(wù)都可以同步Seesion了。
?明顯方案二是可行的。
每個(gè)微服務(wù)需要添加驗(yàn)證機(jī)制,導(dǎo)入對應(yīng)依賴。
<!-- springSession Redis支持--><dependency><groupId>org.springframework.session</groupId><artifactId>spring-session-data-redis</artifactId><version>2.7.0</version></dependency> <!-- 添加Redis的Starter--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId><version>2.7.0</version></dependency>導(dǎo)入springSecurity框架的依賴。
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-security</artifactId><version>2.7.0</version></dependency>對每個(gè)微服務(wù)的配置文件進(jìn)行修改。
spring:session:store-type: redisredis:host: lcoalhost:6379進(jìn)行測試。
因?yàn)閟pring-security的安全機(jī)制,所以我們需要攜帶對應(yīng)的cookies才能訪問對應(yīng)的微服務(wù)。
其登錄頁面的用戶名為:user,密碼在idea的運(yùn)行日志中。
?然后服務(wù)端會(huì)將session存放到數(shù)據(jù)庫中。
但是我們服務(wù)對應(yīng)的服務(wù)還是會(huì)報(bào)500錯(cuò)誤,只是因?yàn)槲覀兪褂昧薘estTemplate,RestTemplate類似一個(gè)瀏覽器,由于該微服務(wù)調(diào)用了其他的微服務(wù),又因?yàn)镽estTemplate在訪問微服務(wù)時(shí)沒有攜帶對應(yīng)的cookies,所以報(bào)出500錯(cuò)誤。
OAuth2.0實(shí)現(xiàn)單點(diǎn)操作
?前面我們雖然使用了統(tǒng)一存儲(chǔ)來解決Session共享的問題,但是我們發(fā)現(xiàn)就算實(shí)現(xiàn)了Session共享,依舊存在一些問題,由于我們每個(gè)微服務(wù)都有自己的驗(yàn)證模板,實(shí)際上整個(gè)系統(tǒng)上存在冗余功能的,同時(shí)還有我們上面出現(xiàn)的問題,那么能否實(shí)現(xiàn)只在一個(gè)服務(wù)進(jìn)行等錄,就可以訪問其他的服務(wù)能?
?實(shí)際上之前的登錄模式稱為多點(diǎn)登錄,而我們希望的是實(shí)現(xiàn)單點(diǎn)登錄。
這里我們需要了解一種全新的登錄方式:OAuth2.0,我們經(jīng)常看到一些網(wǎng)站支持第三方登錄,就是使用OAuth2.0?來實(shí)現(xiàn)第三方授權(quán),基于第三方應(yīng)用訪問用戶信息的權(quán)限。(本質(zhì)上就是給別人調(diào)用自己服務(wù)接口的權(quán)限)
?四種授權(quán)模式
1.客戶端模式(Client Credentials)
這是最簡單的一種模式,我們可以直接向驗(yàn)證服務(wù)器 請求可以Token,服務(wù)器拿到Token之后,才能去訪問服務(wù)資源,這樣資源服務(wù)器才能知道我們是誰以及是否成功登錄。(不需要密碼驗(yàn)證)
雖然這種模式比較簡便,但是已經(jīng)失去了用戶驗(yàn)證的意義,壓根就不是給用戶校驗(yàn)準(zhǔn)備的,而是更適合內(nèi)部調(diào)用的場景。?
2.密碼模式(Resource Owner Password Credentials)
密碼模式相比客戶端模式,就多了用戶名和密碼的信息,用戶需要提供對應(yīng)的賬號的用戶名和密碼,才能獲取Token。
?雖然這樣看起來比較合理,但是會(huì)直接將賬號和密碼泄露給客戶端,需要后臺(tái)完全信任客戶端不會(huì)拿賬號和密碼去干其他壞事,所以也不是我們常見的。(可能前端或第三方會(huì)拿著你的賬號,登錄你的服務(wù)干壞事,很不安全)
3.隱式授權(quán)模式(Implicit Grant)
?首先用戶訪問頁面時(shí),會(huì)重定向到認(rèn)證服務(wù)器上,接著認(rèn)證服務(wù)器給用戶一個(gè)認(rèn)證頁面,等待用戶授權(quán),用戶填寫信息完成授權(quán)后,認(rèn)證服務(wù)器返回Token。?
?它適用于沒有服務(wù)端的第三方應(yīng)用頁面,并且相比前面一種形式,驗(yàn)證都是在驗(yàn)證服務(wù)器進(jìn)行的,敏感信息不會(huì)輕易泄露,但是Token依然存在泄漏的風(fēng)險(xiǎn)。
?4.授權(quán)碼模式(Authrization Code)
這種模式是最安全的一種模式,也是推薦使用的一種,比如我們手機(jī)上的很多APP都是使用的這種方式。
相比隱式授權(quán)模式,它并不會(huì)直接返回Token,而是返回授權(quán)碼,真正的Token是通過應(yīng)用服務(wù)器訪問驗(yàn)證服務(wù)器獲得的。在一開始的時(shí)候,應(yīng)用服務(wù)器(客戶端通過訪問自己的應(yīng)用服務(wù)器來進(jìn)行訪問其他的服務(wù))和驗(yàn)證服務(wù)器之間會(huì)共享一個(gè)“secret”(沒有登錄的時(shí)候是沒有的“secret”),這個(gè)東西沒有其他人知道,而驗(yàn)證服務(wù)器在用戶驗(yàn)證之后,會(huì)返回一個(gè)授權(quán)碼,應(yīng)用服務(wù)器最后將授權(quán)碼和“secret”一起交給驗(yàn)證服務(wù)器進(jìn)行驗(yàn)證,并且Token也是在服務(wù)器之間傳遞,不會(huì)直接給客戶端。?
?這樣就算有人中途竊取了授權(quán)碼,也毫無意義,因?yàn)門oken的獲取必須同時(shí)攜帶授權(quán)碼和“secret”,但是“secret”第三方是無法得知的,并且Token不會(huì)直接給客戶端,大大減少了泄漏的風(fēng)險(xiǎn)。
OAth2.0不應(yīng)該是那種第三方應(yīng)用為了請求我們的服務(wù)而使用的嗎,而我們這里需要的只是實(shí)現(xiàn)同一個(gè)應(yīng)用內(nèi)部服務(wù)之間的認(rèn)證,其實(shí)我們也可以利用OAuth2.0來實(shí)現(xiàn)單點(diǎn)登錄,只是少了資源服務(wù)器這個(gè)角色,客戶端就是我們的整個(gè)系統(tǒng),接下來就讓我們來實(shí)現(xiàn)一下。
?搭建驗(yàn)證服務(wù)器
第一步就是最重要的,我們需要搭建一個(gè)驗(yàn)證服務(wù)器,它是我們進(jìn)行權(quán)限校驗(yàn)的核心,驗(yàn)證服務(wù)器有很多的第三方實(shí)現(xiàn),也有Spring官方通過的實(shí)現(xiàn),這里我們使用Spring官方通過的驗(yàn)證服務(wù)器。?
?導(dǎo)入對應(yīng)依賴:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-security</artifactId></dependency><!-- oauth依賴不再內(nèi)置到spring-cloudy依賴中,需要指定對應(yīng)的版本,且其已經(jīng)整合到spring-security中了--><dependency><groupId>org.springframework.security.oauth</groupId><artifactId>spring-security-oauth2</artifactId><version>2.5.2.RELEASE</version></dependency>修改配置文件。
server:port: 8500servlet:#為了防止一會(huì)在服務(wù)間跳轉(zhuǎn)導(dǎo)致Cookies打架。(因?yàn)樗械姆?wù)地址但是localhost,都會(huì)存JSESSIONID)#這里修改一下context-path,這樣保存的Cookie會(huì)使用指定的路徑,就不會(huì)和其他服務(wù)打架了#但注意之后的所有請求都得在最前面添加這個(gè)路徑context-path: /sso?編寫spingSecurity配置類和OAuth2的配置類。
springSecurity配置類:
@Configuration public class SecurityConfiguration extends WebSecurityConfigurerAdapter {@Overrideprotected void configure(AuthenticationManagerBuilder auth) throws Exception {BCryptPasswordEncoder bCryptPasswordEncoder = new BCryptPasswordEncoder();//對密碼進(jìn)行加密的類auth.inMemoryAuthentication()//直接創(chuàng)建一個(gè)用戶.passwordEncoder(bCryptPasswordEncoder).withUser("test").password(bCryptPasswordEncoder.encode("123456")).roles();}@Overrideprotected void configure(HttpSecurity http) throws Exception {http.authorizeRequests().anyRequest().authenticated().and().formLogin().permitAll();//使用表單登錄}@Bean@Override //這里我們需要將AuthenticationManager注冊為Bean,因?yàn)槲覀円贠Auth2中使用它public AuthenticationManager authenticationManagerBean() throws Exception {return super.authenticationManagerBean();} }OAth2配置類:
@EnableAuthorizationServer //開啟驗(yàn)證服務(wù)器 @Configuration public class OAuth2Configuration extends AuthorizationServerConfigurerAdapter {@Resourceprivate AuthenticationManager manager;private final BCryptPasswordEncoder encoder = new BCryptPasswordEncoder();//對密碼進(jìn)行加密的類/*** 這個(gè)方法是對客戶端進(jìn)行配置,一個(gè)驗(yàn)證服務(wù)器可以預(yù)設(shè)很多個(gè)客戶端,* 之后這些指定的客戶端就可以按照下面指定的方式進(jìn)行驗(yàn)證* @param clients 客戶端配置工具*/@Overridepublic void configure(ClientDetailsServiceConfigurer clients) throws Exception {clients.inMemory() //這里我們直接硬編碼創(chuàng)建,當(dāng)然也可以像Security那樣自定義或是使用JDBC從數(shù)據(jù)庫讀取.withClient("web") //客戶端名稱,隨便起就行.secret(encoder.encode("654321")) //只與客戶端分享的secret,隨便寫,但是注意要加密.autoApprove(false) //自動(dòng)審批,這里關(guān)閉,要的就是一會(huì)體驗(yàn)?zāi)欠N感覺.scopes("book", "user", "borrow") //授權(quán)范圍,這里我們使用全部all.authorizedGrantTypes("client_credentials", "password", "implicit", "authorization_code", "refresh_token");//授權(quán)模式,一共支持5種,除了之前我們介紹的四種之外,還有一個(gè)刷新Token的模式//這里我們直接把五種都寫上,方便一會(huì)實(shí)驗(yàn),當(dāng)然各位也可以單獨(dú)只寫一種一個(gè)一個(gè)進(jìn)行測試//現(xiàn)在我們指定的客戶端就支持這五種類型的授權(quán)方式了}@Overridepublic void configure(AuthorizationServerSecurityConfigurer security) {security.passwordEncoder(encoder) //編碼器設(shè)定為BCryptPasswordEncoder.allowFormAuthenticationForClients() //允許客戶端使用表單驗(yàn)證,一會(huì)我們POST請求中會(huì)攜帶表單信息.checkTokenAccess("permitAll()"); //允許所有的Token查詢請求}@Overridepublic void configure(AuthorizationServerEndpointsConfigurer endpoints) {endpoints.authenticationManager(manager);//由于SpringSecurity新版本的一些底層改動(dòng),這里需要配置一下authenticationManager,才能正常使用password模式} }? ? ? ? 然后我們使用測試工具進(jìn)行測試。?
1.首先我們從最簡單的客戶端模式進(jìn)行測試,客戶端模式只需要提供id和secret即可直接拿到Token,注意需要添加一個(gè)grant_type來表明我們的授權(quán)方式,默認(rèn)請求路徑為:http://localhost:8500/sso/oauth/token。
測試結(jié)果圖:
?我們可以通訪問http://localhost:8500/sso/oauth/check_token來驗(yàn)證我們的Token是否有效。?
?
2.我們進(jìn)行密碼模式的測試,這里的請求參數(shù)為username和password,授權(quán)模式改為passwod。
?在請求頭中添加Basic驗(yàn)證信息。
測試結(jié)果圖:
?Token驗(yàn)證(返回用戶名):
?3.隱式授權(quán)模式,這種模式我們需要在驗(yàn)證服務(wù)器上進(jìn)行驗(yàn)證,而不是直接請求Token,驗(yàn)證登錄請求地址:http://localhost:8500/sso/oauth/authorize?client_id=web&response_type=token。
注意response_type一定要是Token類型,這樣才會(huì)返回Token,瀏覽器發(fā)起請求后,可以看到SpringSecurity的登錄頁面。
登錄之后會(huì)有個(gè)錯(cuò)誤信息,這是因?yàn)榈卿洺晒?#xff0c;驗(yàn)證服務(wù)器需要將結(jié)果給回客戶端,所以需要提供供客戶端的回調(diào)地址,這樣瀏覽器就會(huì)被重定向到指定的回調(diào)地址并且請求中回?cái)y帶Token信息,這里我們隨便配置一個(gè)回調(diào)信息。
@Overridepublic void configure(ClientDetailsServiceConfigurer clients) throws Exception {clients.inMemory() //這里我們直接硬編碼創(chuàng)建,當(dāng)然也可以像Security那樣自定義或是使用JDBC從數(shù)據(jù)庫讀取.withClient("web") //客戶端名稱,隨便起就行.secret(encoder.encode("654321")) //只與客戶端分享的secret,隨便寫,但是注意要加密.autoApprove(false) //自動(dòng)審批,這里關(guān)閉,要的就是一會(huì)體驗(yàn)?zāi)欠N感覺.scopes("book", "user", "borrow") //授權(quán)范圍,這里我們使用全部all.redirectUris("localhost:8202/login").authorizedGrantTypes("client_credentials", "password", "implicit", "authorization_code", "refresh_token");//授權(quán)模式,一共支持5種,除了之前我們介紹的四種之外,還有一個(gè)刷新Token的模式//這里我們直接把五種都寫上,方便一會(huì)實(shí)驗(yàn),當(dāng)然各位也可以單獨(dú)只寫一種一個(gè)一個(gè)進(jìn)行測試//現(xiàn)在我們指定的客戶端就支持這五種類型的授權(quán)方式了}進(jìn)行授權(quán)。
最終會(huì)將Toekn返回到指定的頁面。
4.最安全的授權(quán)碼模式,這種模式其實(shí)流程和隱式授權(quán)模式是一樣的,當(dāng)是請求的是Code類型:http:localhost:8500/sso/oauth/authorize?/client_id=web&response_type=code。
在訪問此地址依舊會(huì)進(jìn)入回調(diào)地址,但是這時(shí)給的就是授權(quán)碼了,而不是直接返回Token。
然后我們可以使用這個(gè)授權(quán)碼和secret來獲取Token。
5.最后還有一個(gè)是刷新Token用的模式,當(dāng)我們的Token過期時(shí),我們就可以使用refresh_token來申請一個(gè)新的Token,當(dāng)我們使用授權(quán)碼模式時(shí),在成功驗(yàn)證以后驗(yàn)證服務(wù)器會(huì)返回一個(gè)refresh_token,如果我們需要刷新一下Token,就執(zhí)行下面操作。
?在SecurityConfiguration配置類中將UserDetailsService注入spring容器中。
@Bean@Overridepublic UserDetailsService userDetailsServiceBean() throws Exception {return super.userDetailsServiceBean();}在OAuth2Configuration類中使用UserDetailsService。
@ResourceUserDetailsService service;@Overridepublic void configure(AuthorizationServerEndpointsConfigurer endpoints) {endpoints.userDetailsService(service).authenticationManager(manager);//由于SpringSecurity新版本的一些底層改動(dòng),這里需要配置一下authenticationManager,才能正常使用password模式}進(jìn)行測試。
redis與分布式(docker模擬集群搭建)
拉取redis文件。
docker pull redis在云服務(wù)器上創(chuàng)建等會(huì)需要掛載的目錄和文件
- 創(chuàng)建data目錄(存放數(shù)據(jù)文件,包括用于持久化的dump.rdb)
- 創(chuàng)建配置文件
?創(chuàng)建redis容器,運(yùn)行redis,并進(jìn)行數(shù)據(jù)和配置文件的掛載。
docker run -p 3346:6379 --name redis -v /mydata/redis/data:/data \ -v /mydata/redis/conf/redis.conf:/etc/redis/redis.conf \ -d redis redis-server /etc/redis/redis.conf:左邊的表示云服務(wù)器的真實(shí)端口,右邊表示redis容器中的端口。
--name 表示為容器設(shè)置的名稱,-d 后面的名稱表示鏡像的名稱。
進(jìn)入對應(yīng)的容器
docker exec -it 容器對應(yīng)id或是容器名稱 bash #進(jìn)入到docker中 redis-cli #加入容器中的redis中#也可以直接進(jìn)入 docker exec -it 容器對應(yīng)的id或是容器的名稱 redis-cli效果圖:
docker 搭建redis集群(使用三個(gè)redis搭建集群)
?在對應(yīng)的目錄下創(chuàng)建用來掛在數(shù)據(jù)的目錄。
mkdir -p /mydata/redis/cluster/node1/data #集群一 mkdir -p /mydata/redis/cluster/node2/data #集群二 mkdir -p /mydata/redis/cluster/node3/data #集群三為了方便我們這里就不去手動(dòng)修改redis.conf,我們這里使用命令我們設(shè)置操作。
1.為先創(chuàng)建對應(yīng)的redis容器。
docker create --name redis-node1 -v /mydata/redis/cluster/node1/data:/data \ -p 3346:6379 redis --cluster-enabled yes \ --cluster-config-file redis-node1.conf首先間將存儲(chǔ)數(shù)據(jù)目錄進(jìn)行掛載,配置端口,設(shè)置集群模式為true,設(shè)置集群的配置文件,做集群的時(shí)候會(huì)將一些配置添加到該配置文件中。(該文件會(huì)自動(dòng)生成)
2.啟動(dòng)該容器。
docker start redis-node1另一個(gè)redis的配置跟redis-node1是一樣的,只要將容器名稱和掛載路徑進(jìn)行修改即可。
?現(xiàn)在我們只是單純的搭建了兩個(gè)單獨(dú)的redis,我們還需要將兩者聯(lián)系起來。
因?yàn)槲覀冊O(shè)置docker中部署,所以docker 會(huì)給每一個(gè)容器分配一個(gè)ip地址,我們需要查看對應(yīng)的ip才能進(jìn)行聯(lián)系。
查看方式。
docker inspect 對應(yīng)的容器名ip為IpAddress的參數(shù)。?
?
執(zhí)行命令創(chuàng)建集群。
redis-cli --cluster create 172.17.0.3:6379 172.17.0.4:6379 172.17.0.4:6379 --cluster-replicas 0這里cluster-replicas表示主從比例,我們設(shè)置為0,就說明全部為master,如果我們需要做主從關(guān)系的話,也就是不將cluster-replicas的比例設(shè)置為0,那我們需要保證有三個(gè)以上的master(主節(jié)點(diǎn)),不然是無法搭建集群的。(并且搭建集群的最少節(jié)點(diǎn)數(shù)為3個(gè),就是需要保證有三個(gè)master)
錯(cuò)誤信息:
按其默認(rèn)的配置,我們輸入"yes"?。
測試集群是否生效
?加入對應(yīng)的容器。
docker exec -it redis-node1 Redis-cli -c #-c必須要加,表示以集群啟動(dòng),這樣在做數(shù)據(jù)庫操作時(shí)不會(huì)因?yàn)椴宀鄣南拗贫煌瓿刹涣瞬僮?因?yàn)閞edis-node1表示負(fù)責(zé)管理5798插槽的,所以集群就中找到負(fù)責(zé)管理該插槽的節(jié)點(diǎn)進(jìn)行set操作。?
?接下來我們?nèi)テ渌?jié)點(diǎn)查看是否將數(shù)據(jù)插入成功。
我們隨便在一個(gè)節(jié)點(diǎn)中查詢對應(yīng)的值,即使該節(jié)點(diǎn)沒有需要的值,該集群會(huì)自動(dòng)在節(jié)點(diǎn)中查找需要的值。
?查詢集群節(jié)點(diǎn)信息。
cluster nodes?如果需要?jiǎng)h除集群的話,我們只需要?jiǎng)h除集群中使用節(jié)點(diǎn)的集群配置信息redis-node*.conf。
如果需要在集群中添加從節(jié)點(diǎn),我們可以執(zhí)行對應(yīng)命令:
redis-cli --cluster add-node 172.17.0.8:6379 --cluster-slave --cluster-master 對應(yīng)的主節(jié)點(diǎn)的id如果需要提高java代碼獲取信息的話,我們就需要將各個(gè)節(jié)點(diǎn)的配置進(jìn)行掛載,然后修改配置信息,將保護(hù)模式關(guān)閉,并將綁定Ip注釋掉。
?集群模式是自帶哨兵模式的。(當(dāng)主節(jié)點(diǎn)down掉時(shí),會(huì)將其從節(jié)點(diǎn)作為新的主節(jié)點(diǎn))
哨兵模式的選舉規(guī)則
1.首先會(huì)根據(jù)優(yōu)先級進(jìn)行選擇,可以在配置文件中進(jìn)行配置,添加"relica-priority"配置項(xiàng)(默認(rèn)是100),越小表示的優(yōu)先級就越高。
2.如果優(yōu)先級一樣,那就選擇偏移量最大的。
3.要是還是選不出來,就選擇runid(啟動(dòng)時(shí)隨機(jī)生成的)最小的。
實(shí)現(xiàn)分布式鎖
?為了解決電商超買的問題,我們可以通過redis設(shè)置分布式鎖。
#只有當(dāng)key值不存在的時(shí)候才能進(jìn)行設(shè)置,其實(shí)際上是set if no exists的縮寫 setnx key value利用這種特性,我們就可以在不同的服務(wù)中實(shí)現(xiàn)分布式鎖,如果某個(gè)服務(wù)器加了鎖但是卡頓了,或是直接崩潰了,那么這把鎖豈不是就永遠(yuǎn)無法釋放了,因此我們可以考慮添加一個(gè)過期時(shí)間。
set key value EX num NX?這里使用set命令,最后加一個(gè)NX表示使用setx的模式,和上面一樣,但是可以通過EX設(shè)置過期時(shí)間,這里設(shè)置為num秒,也就是說如果num秒還沒釋放,那就自動(dòng)刪除。
上鎖出現(xiàn)的問題
1.?
?2.?
?3.
?要解決這個(gè)問題,我們可以借助一下Redisson框架,它是Redis官方推薦的java版Redis客戶端,它提供的功能非常強(qiáng)大,也非常大,Redisson內(nèi)部提供一個(gè)監(jiān)控鎖的看門狗。它的作用是在Redisson實(shí)例被關(guān)閉前,不斷的延長鎖的有效期,它為我們提供了很多中分布式的實(shí)現(xiàn),這里我們嘗試使用它的分布式鎖的功能。
導(dǎo)入依賴。
<dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>4.2.1</version></dependency><dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.17.0</version></dependency><dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.75.Final</version></dependency>?我們先測試沒有加鎖的情況。
測試代碼:
import redis.clients.jedis.Jedis;public class Main {public static void main(String[] args) {for (int i = 0; i < 10; i++) {new Thread(() -> {try(Jedis jedis = new Jedis("127.0.0.1", 6379)){for (int j = 0; j < 100; j++) { //每個(gè)客戶端獲取a然后增加a的值再寫回去,如果不加鎖那么肯定會(huì)出問題,在每次插入的過程中,其實(shí)內(nèi)部值已經(jīng)發(fā)生改變了,比如說同一時(shí)間,其獲取到a的值是相同的,也就是說這么多個(gè)插入最終只是+1。int a = Integer.parseInt(jedis.get("a")) + 1;jedis.set("a", a+"");}}}).start();}} }結(jié)果圖:?
沒有到1000,說明出現(xiàn)錯(cuò)誤了,?加上鎖試試。
測試代碼:
import org.redisson.Redisson; import org.redisson.api.RLock; import org.redisson.api.RedissonClient; import org.redisson.config.Config; import redis.clients.jedis.Jedis;public class Main {public static void main(String[] args) {Config config = new Config();config.useSingleServer().setAddress("redis://127.0.0.1:6379"); //配置連接的Redis服務(wù)器,也可以指定集群RedissonClient client = Redisson.create(config); //創(chuàng)建RedissonClient客戶端for (int i = 0; i < 10; i++) {new Thread(() -> {try(Jedis jedis = new Jedis("127.0.0.1", 6379)){RLock lock = client.getLock("testLock"); //指定鎖的名稱,拿到鎖對象for (int j = 0; j < 100; j++) {lock.lock(); //加鎖int a = Integer.parseInt(jedis.get("a")) + 1;jedis.set("a", a+"");lock.unlock(); //解鎖}}System.out.println("結(jié)束!");}).start();}} }效果圖:
符合最終的結(jié)果。?
Mysql與分布式??
?主從復(fù)制
當(dāng)我們使用Mysql的時(shí)候,也可以采用主從復(fù)制的策略,它的實(shí)現(xiàn)基本和Redis相似,也可以采用增量復(fù)制的方式,Mysql會(huì)在運(yùn)行的過程中,會(huì)記錄二進(jìn)制日志,所有的DML和DDL操作都會(huì)被記錄進(jìn)日志中,主數(shù)據(jù)庫只需要將記錄的操作復(fù)制給從庫,讓從庫也運(yùn)行一次,那么就可以實(shí)現(xiàn)主從復(fù)制,但是注意它不會(huì)在一開始進(jìn)行全量復(fù)制,所以最好在開始主從復(fù)制之前將數(shù)據(jù)庫的內(nèi)容保持一致。
和Redis一樣,一但我們實(shí)現(xiàn)了主從復(fù)制,那么就算主庫出現(xiàn)故障,從庫也能正常提供服務(wù),并且可以實(shí)現(xiàn)讀寫分離等操作,這里我們使用一主一從方式來搭建。?
通過docker 拉取鏡像,通過設(shè)置兩個(gè)不同的端口,啟動(dòng)兩個(gè)mysql。
docker run -p 3346:3306 --name main_mysql -e MYSQL_ROOT_PASSWORD=123456 -d mysql #-p 設(shè)置端口進(jìn)行端口映射,-e編輯mysql root用戶的密碼,-d表示后太啟動(dòng) docker run -p 3347:3306 --name slave_mysql -e MYSQL_ROOT_PASSWORD=123456 -d mysql?5.進(jìn)入容器。
docker exec -it 容器Id或name /bin/bash #再切目錄: cd /etc/mysql。 如果vim指令不存在,說明沒有安裝,我們需要進(jìn)行安裝。 apt-get update apt-get install vim修改主從mysql的配置 my.cnf。
主表中進(jìn)行配置:
#如果是在同一個(gè)服務(wù)器上那就要保證server-id不同,不然會(huì)發(fā)生沖突 server-id=100 #開啟二進(jìn)制日志功能,因?yàn)槭莔ater所以是必要的(名字自己定) log-bin=mysql-bin?從表中進(jìn)行配置:
#設(shè)置server_id,注意要唯一 server-id=101 #開啟二進(jìn)制日志功能,以備Slave作為其它Slave的Master時(shí)使用(子節(jié)點(diǎn)可以作為別人的主節(jié)點(diǎn)) log-bin=mysql-slave-bin #relay_log配置中繼日志 relay_log=edu-mysql-relay-bin??
重啟docker 容器使其配置生效。
docker restart main_mysql進(jìn)入mysql。
mysql mysql -uroot -p123456創(chuàng)建用戶并授權(quán)允許從庫服務(wù)連接主庫的服務(wù)。
#創(chuàng)建一個(gè)用戶 賬號為:slave,密碼為:123456 CREATE USER test IDENTIFIED WITH mysql_native_password BY '123456';#給這個(gè)slave用戶授權(quán),授權(quán)主從復(fù)制權(quán)限和主從復(fù)制的連接GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO test;如果報(bào)以下錯(cuò)誤,說明已經(jīng)存在主從關(guān)系了,我們需要先關(guān)閉主從關(guān)系。
stop slave; reset master;刷新一下mysql權(quán)限。
flush privileges查詢主庫File和Position。
show master status?查看docker 給對應(yīng)的容器分配的ip。
docker inspect 對應(yīng)的容器名?進(jìn)入從數(shù)據(jù)庫,進(jìn)行對指定對應(yīng)的主庫。
#指定ip,端口,和用戶,將對應(yīng)的mysql數(shù)據(jù)庫設(shè)置為該mysql數(shù)據(jù)庫的主節(jié)點(diǎn),我們還需要指定主節(jié)點(diǎn)的二進(jìn)制日志文件和主節(jié)點(diǎn)的偏移量和主節(jié)點(diǎn)的重連次數(shù) change master to master_host='172.17.0.4', master_user='test', master_password='123456', master_port=3306, master_log_file='mysql-bin.000004', master_log_pos= 861, master_connect_retry=30;#例子二: change master to master_host='172.17.0.2', master_user='test', master_password='123456', master_port=3306, master_log_file='mysql-bin.000003', master_log_pos= 157, master_connect_retry=30;開啟主從復(fù)制功能。
start replica使用對應(yīng)命令?查看從庫的狀態(tài)。
show slave status \G效果圖:
分庫分表?
?在大型的互聯(lián)網(wǎng)系統(tǒng)中,可能單臺(tái)的Mysql已經(jīng)無法滿足業(yè)務(wù)的需求,這時(shí)候就需要進(jìn)行擴(kuò)容了。
單臺(tái)主機(jī)的硬件資源是存在瓶頸的,不可能無限制地?cái)U(kuò)展,這時(shí)候我們就得通過多臺(tái)實(shí)例來進(jìn)行容量的橫向擴(kuò)容,我們可以將數(shù)據(jù)分散儲(chǔ)存,讓多臺(tái)主機(jī)共同來保存數(shù)據(jù)。
擴(kuò)容方法分為有兩種。
1.垂直拆分:我們的表和數(shù)據(jù)庫都可以進(jìn)行垂直拆分的,就是將數(shù)據(jù)庫中所有的表按照業(yè)務(wù)功能進(jìn)行拆分到各個(gè)數(shù)據(jù)庫中(有點(diǎn)類似微服務(wù)),而對于一張表也可以通過外鍵之類的機(jī)制將其拆分為多個(gè)表。
????????
?2.水平拆分:水平拆分針對的不是表,而是數(shù)據(jù),我們可以讓很多個(gè)具有相同表的數(shù)據(jù)庫存放一部分?jǐn)?shù)據(jù),相當(dāng)于是將數(shù)據(jù)分散存儲(chǔ)在各個(gè)節(jié)點(diǎn)上。
那么要實(shí)現(xiàn)這樣的拆分操作,我們自行去編寫代碼的工作量是比較大的,因此目前實(shí)際上已經(jīng)有一些解決方案了,比如我們可以使用MyCat(也就是數(shù)據(jù)庫中間插件,相當(dāng)于掛了一層代理,再通過MyCat進(jìn)行分庫分表操作數(shù)據(jù)庫,只需要連接就可以使用,類似的還有ShardingSphere-Proxy)或是Sharding JDBC(應(yīng)用程序中直接對SQL語句進(jìn)行分析,然后轉(zhuǎn)換成分庫分表操作,需要我們自己編寫一些邏輯代碼)?
?Sharding JDBC?
shardingJDBC官網(wǎng)
?定位為輕量級Java框架,在java的JDBC層提供的額外服務(wù),它使用客戶端直連數(shù)據(jù)庫,以Jar包形式提供服務(wù),無需額外部署和依賴,可以理解為增強(qiáng)版的JDBC驅(qū)動(dòng),完全兼容JDBC和各種ORM框架。
1.適用于如何基于JDBC的ORM框架,如:JPA,Mybatis,Spring JDBC Template或直接使用JDBC。
2.支持任何第三方的數(shù)據(jù)庫連接池,如 DBCP,C3P0,BoneCP,HikariCP。
3.支持任意實(shí)現(xiàn)JDBC規(guī)范的數(shù)據(jù)庫,目前支持Mysql,Oracle,SQLServer以及任何可以使用JDBC的數(shù)據(jù)庫。
水平拆分實(shí)現(xiàn)
導(dǎo)入對應(yīng)依賴。
<!-- shardingJDBC的依賴--><dependency><groupId>org.apache.shardingsphere</groupId><artifactId>shardingsphere-jdbc-core-spring-boot-starter</artifactId><version>5.1.1</version></dependency><dependency><groupId>org.mybatis.spring.boot</groupId><artifactId>mybatis-spring-boot-starter</artifactId><version>2.2.2</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId></dependency>?在配置文件中配置數(shù)據(jù)源。
spring:shardingsphere:datasource:#幾個(gè)數(shù)據(jù)就配幾個(gè),這里是名稱,格式就是名稱+數(shù)字names: db0,db1#為每個(gè)數(shù)據(jù)源單獨(dú)配置db0:#數(shù)據(jù)源實(shí)現(xiàn)類,這里默認(rèn)使用HikariDataSourcetype: com.zaxxer.hikari.HikariDataSourcedriver-class-name: com.mysql.cj.jdbc.Driverjdbc-url: jdbc:mysql://127.0.0.1:3344/springcloudusername: rootpassword: 123456db1:#數(shù)據(jù)源實(shí)現(xiàn)類,這里默認(rèn)使用HikariDataSourcetype: com.zaxxer.hikari.HikariDataSourcedriver-class-name: com.mysql.cj.jdbc.Driverjdbc-url: jdbc:mysql://127.0.0.1:3345/springcloudusername: rootpassword: 123456到目前配置為止,實(shí)際上這些配置都是常規(guī)的操作,在編寫代碼時(shí)關(guān)注點(diǎn)依然放在業(yè)務(wù)上,現(xiàn)在我們就來編寫配置文件,我們需要告訴ShardingJDBC要如何進(jìn)行分片,首先明確:現(xiàn)在就是兩個(gè)數(shù)據(jù)庫都有Test表存放用戶數(shù)據(jù),我們的目標(biāo)是將用戶信息分別存放到這兩個(gè)數(shù)據(jù)庫表中。
?繼續(xù)修改配置文件,設(shè)置其切片模式。
spring:shardingsphere:rules:sharding:tables:#這里填寫表的名稱,程序中對這張表的所有操作都會(huì)采用下面的路由方案#比如我們上面Mybatis就是對test表進(jìn)行操作,所以會(huì)走下面的路由方案test:#這里填寫實(shí)際的路由節(jié)點(diǎn),比如我們要分兩個(gè)庫,那么就可以把兩個(gè)數(shù)據(jù)庫都寫上去,以及對應(yīng)的表#也可以使用表達(dá)式,比如下面的可以簡寫為db$->{0,1}.testactual-data-nodes: db0.test,db1.test#這里是分庫的策略配置database-strategy:#這里選擇標(biāo)準(zhǔn)策略,也可以配置復(fù)雜策略,基于多個(gè)鍵進(jìn)行分片standard:#參與分片運(yùn)算的字段,下面的算法會(huì)根據(jù)這里提供的字段進(jìn)行運(yùn)算sharding-column: id#這里填寫我們下面自定義的算法名稱sharding-algorithm-name: my-algsharding-algorithms:#自定義用戶新的算法,名稱自定義my-alg:#算法類型,官方內(nèi)置了很多種、type: MODprops:sharding-count: 2props:sql-show: true進(jìn)行測試。
我們在mapper中編簡單的插入語句,對應(yīng)其進(jìn)行測試。
在測試類中編寫代碼。
@SpringBootTest class ShardingtestApplicationTests { @AutowiredUserMapper userMapper;@Testvoid contextLoads() {for(int i = 0; i < 10; i++) {userMapper.addUser(new User(i,"xxx", "cccc"));}}}?測試結(jié)果圖:
以為我們自定義的算法是通過取模的結(jié)果來存放到不同的數(shù)據(jù)庫中,在圖中我們可以發(fā)現(xiàn)取模為0時(shí)將數(shù)據(jù)存放到db0中,而取模為1時(shí)就將數(shù)據(jù)存放到db1中。
可以看到所有的SQL語句都有一個(gè)Logic SQL(這個(gè)就是我們在Mybatis里面寫的,是什么就是什么)緊接著下面就是Actual SQL,也就是說每個(gè)邏輯SQL最終會(huì)根據(jù)我們的策略轉(zhuǎn)換為實(shí)際SQL,它的id是0,那么實(shí)際轉(zhuǎn)換出來的SQL會(huì)在db0這個(gè)數(shù)據(jù)源進(jìn)行插入。
我們查看數(shù)據(jù)庫中的信息。
分表查詢和查詢操作
現(xiàn)在我們在我們的數(shù)據(jù)庫中有test_0,test_1兩張表,表結(jié)構(gòu)一樣,但是我們也可以希望能夠根據(jù)id取模運(yùn)算的結(jié)果分別放到這兩個(gè)不同的表中,其實(shí)和分庫差不多。
1.邏輯表:相同結(jié)構(gòu)的水平拆分?jǐn)?shù)據(jù)庫(表)的邏輯名稱,是SQL中表的邏輯標(biāo)識(shí)。如:訂單數(shù)據(jù)根據(jù)主鍵尾數(shù)拆分為10張表,分別是t_order_0到t_order_9?,它們的邏輯表名為t_order。
2.真實(shí)表:在水平分的數(shù)據(jù)庫中真實(shí)存在的物理表,即上個(gè)例子中的t_order_0到t_order_9。
我們創(chuàng)建兩個(gè)跟test表結(jié)構(gòu)相同的test_0,test_1。
修改配置文件改為分表操作。
spring:shardingsphere:rules:sharding:tables:test:actual-data-nodes: db0.test_$->{0..1}#現(xiàn)在我們來配置一下分表策略,注意這里是table-strategy上面是database-strategytable-strategy:#基本都跟之前是一樣的standard:sharding-column: idsharding-algorithm-name: my-algsharding-algorithms:my-alg:#這里我們演示一下INLINE方式,我們可以自行編寫表達(dá)式來決定,自己編寫表達(dá)式type: INLINEprops:#比如我們還是希望進(jìn)行模2計(jì)算得到數(shù)據(jù)該去的表#只需要給一個(gè)最終的表名稱就行了test_,后面的數(shù)字是表達(dá)式取模算出的#實(shí)際上這樣寫和MOD模式一模一樣algorithm-expression: test_$->{id % 2}#沒錯(cuò),查詢也會(huì)根據(jù)分片策略來進(jìn)行,但是如果我們使用的是范圍查詢,那么依然會(huì)進(jìn)行全量查詢#這個(gè)我們后面緊接著會(huì)講,這里先寫上吧allow-range-query-with-inline-sharding: falseprops:sql-show: true插入數(shù)據(jù)效果圖:?
?在id去摸為0時(shí)就插入test_0,取模為1時(shí)就插入test_1。
接下來我們進(jìn)行范圍查詢操作,在Mapper層中編寫范圍查詢。
@Mapper public interface UserMapper {@Select("select * from test where id between #{startId} and #{endId}")List<User> getUserById(int startId, int endId); }在test層做測試。?
@Testvoid contextLoads() {System.out.println(userMapper.getUserById(3,5));}運(yùn)行測試,發(fā)生報(bào)錯(cuò)。
???????
這是因?yàn)槟J(rèn)情況下?allow-range-query-with-inline-sharding配置就是為false,就是不支持范圍查詢,所以我們要將其設(shè)置為true。
重新運(yùn)行的效果圖:
分布式序列算法?
在復(fù)雜分布式系統(tǒng)中,特別是微服務(wù)架構(gòu)中,往往需要大量的數(shù)據(jù)和信息進(jìn)行唯一的標(biāo)識(shí),隨著系統(tǒng)的復(fù)雜,數(shù)據(jù)的增多,分庫分表成為了常見的方案,對數(shù)據(jù)分庫分表后需要有一個(gè)唯一的Id來標(biāo)識(shí)一條數(shù)據(jù)后消息(如訂單號,交易流水,事件編號等),此時(shí)一個(gè)能夠生成全局唯一Id的系統(tǒng)是非常重要的。
我們之前創(chuàng)建過學(xué)生信息表,圖書借閱表,圖書管理表,所有的信息都會(huì)有一個(gè)Id作為主鍵,并且這個(gè)Id有以下要求:
1.為了區(qū)別其他的數(shù)據(jù),這個(gè)Id必須是全局唯一的。
2.主鍵應(yīng)該盡可能的保持有序,這樣會(huì)大大提高索引的查詢效率。
?在我們的分布式系統(tǒng)下有兩種方案來解決此問題
1.使用UUID:UUID是由一組32位數(shù)的16進(jìn)制數(shù)字隨機(jī)生成的,我們可以直接使用JDK為我們提供的UUID類來實(shí)現(xiàn)。
@Testvoid test01() {System.out.println(UUID.randomUUID().toString());}效果圖:
UUID的生成速度非常快,可以看到確實(shí)是能夠保證唯一性,因?yàn)槊慷疾灰粯?#xff0c;而且這樣長一串那重復(fù)的幾率會(huì)非常小。但是它并不滿足我們上面的第二個(gè)要求,也就是說我們需要盡可能的保證有序,而這里我們得到的都是一些無序的Id。
2.雪花算法(Snowflake)
?它會(huì)生成一個(gè)64bit大小的整型的Id,int肯定是裝不下的。
可以看到它主要是三個(gè)部分組成的,時(shí)間+工作機(jī)器Id+序列號,時(shí)間以毫秒為單位,41個(gè)bit位能表示約為70年的時(shí)間,時(shí)間紀(jì)元從2016年11月1日零點(diǎn)開始,可以使用到2086年,工作機(jī)器ID其實(shí)就是節(jié)點(diǎn)Id,每個(gè)節(jié)點(diǎn)的Id都不同,那么就可以區(qū)分出來,10個(gè)bit位可以表示最多1024個(gè)節(jié)點(diǎn),最后12位就是每個(gè)節(jié)點(diǎn)的序列號,因此每臺(tái)機(jī)器每毫秒?就可以有4096個(gè)序列號。
它兼具了上面所說的唯一性和有序性了,但是依然是有缺點(diǎn)的,第一是時(shí)間問題,如果機(jī)器時(shí)間出現(xiàn)倒退,那么就會(huì)導(dǎo)致生成重復(fù)的Id,并且節(jié)點(diǎn)容量只有1024個(gè),如果是超大規(guī)模集群,也是存在隱患的。
?修改數(shù)據(jù)庫的id類型,因?yàn)槭且b下64的數(shù)據(jù),所以我們要為其配置bigint類型。
設(shè)置mybatis的插入操作。
@Update("insert into test(name, password) values(#{name},#{password} )")int addUser2(User user); }?修改配置文件。
spring:shardingsphere:datasource:#幾個(gè)數(shù)據(jù)就配幾個(gè),這里是名稱,格式就是名稱+數(shù)字names: db0,db1#為每個(gè)數(shù)據(jù)源單獨(dú)配置db0:#數(shù)據(jù)源實(shí)現(xiàn)類,這里默認(rèn)使用HikariDataSourcetype: com.zaxxer.hikari.HikariDataSourcedriver-class-name: com.mysql.cj.jdbc.Driverjdbc-url: jdbc:mysql://127.0.0.1:3344/springcloudusername: rootpassword: 123456db1:#數(shù)據(jù)源實(shí)現(xiàn)類,這里默認(rèn)使用HikariDataSourcetype: com.zaxxer.hikari.HikariDataSourcedriver-class-name: com.mysql.cj.jdbc.Driverjdbc-url: jdbc:mysql://127.0.0.1:3345/springcloudusername: rootpassword: 123456rules:sharding:tables:test:actual-data-nodes: db0.test,db1.test#這里還是使用分庫策略database-strategy:standard:sharding-column: idsharding-algorithm-name: my-alg#這里使用自定義的主鍵生成策略key-generate-strategy:column: idkey-generator-name: my-genkey-generators:#這里寫我們自定義的主鍵生成算法my-gen:#使用雪花算法type: SNOWFLAKEprops:#工作機(jī)器ID,保證唯一就行worker-id: 666sharding-algorithms:my-alg:type: MODprops:sharding-count: 2props:sql-show: true測試類代碼。
@Testvoid contextLoads() {for(int i = 0; i < 10; i++) {userMapper.addUser2(new User("aaa", "cccc"));}}效果圖:
數(shù)據(jù)庫信息:
?
?如果我們要使用UUID的話,只要在配置文件中將自定義生成主鍵算法的type改為UUID即可。
讀寫分離
?在從表中的配置文件中設(shè)置開啟只讀模式 read-only=1。(如果你是root的話還是可以入數(shù)據(jù)的,而普通用戶就只能讀取了)
?配置好主從關(guān)系。(前講過了)
然后修改配置文件。
spring:shardingsphere:datasource:#幾個(gè)數(shù)據(jù)就配幾個(gè),這里是名稱,格式就是名稱+數(shù)字names: db0,db1#為每個(gè)數(shù)據(jù)源單獨(dú)配置db0:#數(shù)據(jù)源實(shí)現(xiàn)類,這里默認(rèn)使用HikariDataSourcetype: com.zaxxer.hikari.HikariDataSourcedriver-class-name: com.mysql.cj.jdbc.Driverjdbc-url: jdbc:mysql://127.0.0.1:3344/springcloudusername: rootpassword: 123456db1:#數(shù)據(jù)源實(shí)現(xiàn)類,這里默認(rèn)使用HikariDataSourcetype: com.zaxxer.hikari.HikariDataSourcedriver-class-name: com.mysql.cj.jdbc.Driverjdbc-url: jdbc:mysql://127.0.0.1:3345/springcloudusername: rootpassword: 123456rules:#配置讀寫分離readwrite-splitting:data-sources:#名稱隨便寫user-db:#使用靜態(tài)類型,動(dòng)態(tài)Dynamic類型可以自動(dòng)發(fā)現(xiàn)auto-aware-data-source-nametype: Staticprops:#配置寫庫(只能一個(gè))write-data-source-name: db0#配置從庫(多個(gè),逗號隔開)read-data-source-names: db1#負(fù)載均衡策略,可以自定義load-balancer-name: my-loadload-balancers:#自定義的負(fù)載均衡策略my-load:type: ROUND_ROBINprops:sql-show: true測試代碼:
@Testvoid contextLoads() {for(int i = 0; i < 10; i++) {userMapper.addUser(new User(i,"aaa", "cccc"));}System.out.println(userMapper.getUserById(3, 5));}?測試效果圖:
?插入語句全部在主節(jié)點(diǎn)數(shù)據(jù)庫中執(zhí)行,而查詢操作都在從節(jié)點(diǎn)數(shù)據(jù)庫中操作。
RabbitMQ(消息隊(duì)列)
我們之前如果需要進(jìn)行遠(yuǎn)程調(diào)用,那么一般可以通過發(fā)送HTTP請求完成,而現(xiàn)在,我們可以使用第二種方式,就是消息隊(duì)列,它能夠?qū)l(fā)送的消息放到隊(duì)列中,當(dāng)新的消息入列時(shí),會(huì)通知接收方進(jìn)行處理,一般消息發(fā)送稱為生產(chǎn)者,接收方稱為消費(fèi)者。
這樣我們所有的請求都可以直接丟到消息隊(duì)列中,再由消費(fèi)者取出,不再是直接連接消費(fèi)者的形式了,而是加了一個(gè)中間商,這也是一種很好的解決方案,并且在高并發(fā)的情況下,消息隊(duì)列也能起到綜合的作用,堆積一部分請求,再由消費(fèi)者來慢慢處理,而不會(huì)像直接調(diào)用那樣請求蜂擁而至。
消息隊(duì)列的具體實(shí)現(xiàn):
在云服務(wù)器上安裝和部署(在docker進(jìn)行)
在docker 中拉去Ribbitmq鏡像。
在docker 中運(yùn)行ribbitmq。
docker run -d -p 5672:5672 -p 15672:15672 -p 25672:25672 --name rabbitmq rabbitmq?查看rabbitmq的狀態(tài)。
rabbitmqctl status接著我們還可以將Rabbitmq的管理面板開啟,這樣就可以在瀏覽器上進(jìn)行實(shí)時(shí)訪問和監(jiān)控了。?
rabbitmq-plugins enable rabbitmq_management?開啟面板。
?賬號和密碼都為:guest。
給Rabbitmq設(shè)置新的用戶。
rabbitmqctl add_user 用戶名 密碼給予新的用戶管理員權(quán)限。
rabbitmqctl set_user_tags 用戶名 administrator消息隊(duì)列的基本原理:
????????
生產(chǎn)者(Publisher)和消費(fèi)者(Consumer):不用多說了吧。
Channel:我們的客戶端連接都會(huì)使用一個(gè)Channel,再通過Channel去訪問到RabbitMQ服務(wù)器,注意通信協(xié)議不是http,而是amqp協(xié)議。
Exchange:類似于交換機(jī)一樣的存在,會(huì)根據(jù)我們的請求,轉(zhuǎn)發(fā)給相應(yīng)的消息隊(duì)列,每個(gè)隊(duì)列都可以綁定到Exchange上,這樣Exchange就可以將數(shù)據(jù)轉(zhuǎn)發(fā)給隊(duì)列了,可以存在很多個(gè),不同的Exchange類型可以用于實(shí)現(xiàn)不同消息的模式。
Queue:消息隊(duì)列本體,生產(chǎn)者所有的消息都存放在消息隊(duì)列中,等待消費(fèi)者取出。
Virtual Host:有點(diǎn)類似于環(huán)境隔離,不同環(huán)境都可以單獨(dú)配置一個(gè)Virtual Host,每個(gè)Virtual Host可以包含很多個(gè)Exchange和Queue,每個(gè)Virtual Host相互之間不影響。
?如果出現(xiàn)以下錯(cuò)誤,需要在rabbitmq的配置文件進(jìn)行更改
?修改方式為下:
因?yàn)槭鞘褂胐ocker 容器安裝的,所有需要進(jìn)入容器 docker exec -it rabbitmq /bin/bash進(jìn)入目錄 cd /etc/rabbitmq/conf.d/執(zhí)行命令 echo management_agent.disable_metrics_collector = false > management_agent.disable_metrics_collector.conf退出容器 exit重啟rabbitmq docker restart rabbitmqrabbitmq的使用
?1.最簡單模式:
?(一個(gè)生產(chǎn)者->消息隊(duì)列->一個(gè)消費(fèi)者)
生產(chǎn)者只需要將數(shù)據(jù)丟入消息隊(duì)列,二消費(fèi)者只需要將數(shù)據(jù)從消息隊(duì)列中取出,這樣就實(shí)現(xiàn)了生產(chǎn)者和消費(fèi)者的消息交互。
創(chuàng)建測試環(huán)境。
當(dāng)前的用戶就添加了剛剛我們新建的測試環(huán)境。
現(xiàn)在我們來看看交換機(jī)。
?交換機(jī)列表中自動(dòng)為我們新增了剛剛創(chuàng)建好的預(yù)設(shè)交換機(jī),一共7個(gè)。
?
?第一個(gè)交換機(jī)是所有虛擬主機(jī)都會(huì)自帶的一個(gè)默認(rèn)交換機(jī),并且此交換機(jī)不可能刪除,此交換機(jī)默認(rèn)綁定所有的消息隊(duì)列,如果是通過默認(rèn)交換機(jī)發(fā)送消息,那么就會(huì)根據(jù)消息的"rountKey"(類似IP地址)決定發(fā)送給哪個(gè)同名的消息隊(duì)列(是消息隊(duì)列的名稱不是它的routingKey),同時(shí)也不能顯示地將消息隊(duì)列綁定或解綁到此交換機(jī)。
?我們可以看到詳細(xì)信息中,當(dāng)前交換機(jī)特性是持久化的,也就是說就算機(jī)器重啟,那么此交換機(jī)也會(huì)被保留,如果不是持久化,那么一旦重啟就會(huì)消失,實(shí)際上我們在列表中看到D字樣就是表示此交換機(jī)是持久化的,包括消息隊(duì)列也是這樣的,所有自動(dòng)生成的交換機(jī)都是持久化的。
?第二個(gè)交換機(jī)是個(gè)直連的交換機(jī)。
?這個(gè)交換機(jī)和我們剛剛介紹的默認(rèn)交換機(jī)類型是一致的,并且也是持久化的,但是我們可以看到它是具有綁定關(guān)系的,如果沒有指定的消息隊(duì)列綁定到此交換機(jī)上,那么這個(gè)交換機(jī)會(huì)無法正常將信息存放到指定的消息隊(duì)列中,也是根據(jù)對應(yīng)的routingKey尋找消息隊(duì)列(但是可以自定義)
?創(chuàng)建隊(duì)列。
在我創(chuàng)建隊(duì)列的選項(xiàng)中的auto delete的作用是: 需要至少有一個(gè)消費(fèi)者連接到這個(gè)隊(duì)列,之后,一旦所有與這個(gè)隊(duì)列連接的消費(fèi)斷開時(shí),就會(huì)自動(dòng)刪除此隊(duì)列。
?通過默認(rèn)交換機(jī)綁定我們創(chuàng)建的隊(duì)列并將數(shù)據(jù)傳入消息隊(duì)列中,因?yàn)槲覀兡J(rèn)的交換機(jī)是自動(dòng)綁定的,我們直接傳入數(shù)據(jù)。
?現(xiàn)在在queueTest中就存在一條數(shù)據(jù)了。
?獲取數(shù)據(jù)。
?在獲取數(shù)據(jù)位置有四種消息的處理方式。
1.Nack message requeue true:拒絕消息,也就是說不會(huì)將消息從消息隊(duì)列取出,并且重新排隊(duì),一次可以拒絕多個(gè)消息。
2. Ack message requeue false:確認(rèn)應(yīng)答,確認(rèn)后消息會(huì)從消息隊(duì)列中移除,一次可以確認(rèn)多個(gè)消息。
3.Reject message requeue true/false:也是拒絕此消息,但是可以指定是否重新排隊(duì)。
?而我們的通過綁定直接交換機(jī)也可以達(dá)到將數(shù)據(jù)傳入消息隊(duì)列中并取出的效果,我們在直接交換機(jī)中綁定隊(duì)列并發(fā)送消息到隊(duì)列中。
在對應(yīng)的隊(duì)列中獲取消息。
?刪除隊(duì)列中的所有消息。
?刪除此隊(duì)列。
?使用java操作消息隊(duì)列
導(dǎo)入對應(yīng)依賴。
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.14.2</version></dependency>我們來思想一下生產(chǎn)者和消費(fèi)者,首先是生產(chǎn)者,生產(chǎn)者負(fù)責(zé)將信息發(fā)送給消息隊(duì)列。
@Testvoid contextLoads() {ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(5672);factory.setUsername("test");factory.setPassword("123456");factory.setVirtualHost("/test");try {factory.newConnection();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}}這里我們可以直接在程序中定義并創(chuàng)建消息隊(duì)列(實(shí)際上是和我們在管理界面創(chuàng)建一樣的效果)客戶端需要通過連接創(chuàng)建一個(gè)新的通道,同一個(gè)連接下可以有很多個(gè)通道,這樣就不用創(chuàng)建很多個(gè)連接也能支持分開發(fā)送。?
@Testvoid contextLoads() {ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(5672);factory.setUsername("test");factory.setPassword("123456");factory.setVirtualHost("/test");//創(chuàng)建連接try(Connection connection = factory.newConnection();Channel channel = connection.createChannel()){ //通過Connection創(chuàng)建新的Channel//聲明隊(duì)列,如果此隊(duì)列不存在,會(huì)自動(dòng)創(chuàng)建channel.queueDeclare("queueTest", false, false, false, null);//將隊(duì)列綁定到交換機(jī)channel.queueBind("queueTest", "amq.direct", "queuekey");//發(fā)布新的消息,注意消息需要轉(zhuǎn)換為byte[]channel.basicPublish("amq.direct", "queuekey", null, "Hello World!".getBytes());}catch (Exception e){e.printStackTrace();}}其中queueDeclare方法的參數(shù)如下:
queue:隊(duì)列的名稱(默認(rèn)創(chuàng)建后routingKey和隊(duì)列名稱一致)
durable:是否持久化。
exclusive:是否排他,如果一個(gè)隊(duì)列被聲明為排他隊(duì)列,該隊(duì)列僅對首次聲明它的連接可見,并在連接斷開時(shí)自動(dòng)刪除。排他隊(duì)列是基于Connection可見,同一個(gè)Connection的不同Channel是可以同時(shí)訪問同一個(gè)連接創(chuàng)建的排他隊(duì)列,并且,如果一個(gè)Connection已經(jīng)聲明了一個(gè)排他隊(duì)列,其他的Connection是不允許建立同名的排他隊(duì)列的,即使該隊(duì)列是持久化的,一旦Connection關(guān)閉或者客戶端退出,該排他隊(duì)列都會(huì)自動(dòng)被刪除。
autoDelete:是否自動(dòng)刪除。
arguments:設(shè)置隊(duì)列的其他一些參數(shù),這里我們暫時(shí)不需要什么其他參數(shù)。
其中queueBind方法參數(shù)如下:
queue:需要綁定的隊(duì)列名稱。
exchange:需要綁定的交換機(jī)名稱。
routingKey:不用多說了吧。
其中basicPublish方法的參數(shù)如下:
exchange: 對應(yīng)的Exchange名稱,我們這里就使用第二個(gè)直連交換機(jī)。
routingKey:這里我們填寫綁定時(shí)指定的routingKey,其實(shí)和之前在管理頁面操作一樣。
props:其他的配置。
body:消息本體。
接著我們運(yùn)行測試代碼,并在控制面板中測試。
消費(fèi)者測試。????????
@Testvoid contextLoads() throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(5672);factory.setUsername("test");factory.setPassword("123456");factory.setVirtualHost("/test");//這里不使用try-with-resource,因?yàn)橄M(fèi)者是一直等待新的消息到來,然后按照//我們設(shè)定的邏輯進(jìn)行處理,所以這里不能在定義完成之后就關(guān)閉連接Connection connection = factory.newConnection();Channel channel = connection.createChannel();//創(chuàng)建一個(gè)基本的消費(fèi)者channel.basicConsume("queueTest", false, (s, delivery) -> {//delivery里面是消息的一些內(nèi)容System.out.println(new String(delivery.getBody()));//basicAck是確認(rèn)應(yīng)答,第一個(gè)參數(shù)是當(dāng)前的消息標(biāo)簽,第二個(gè)的參數(shù)表示//是否批量處理消息隊(duì)列中所有的消息,如果為false表示只處理當(dāng)前消息channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);//basicNack是拒絕應(yīng)答,最后一個(gè)參數(shù)表示是否將當(dāng)前消息放回隊(duì)列,如果//為false,那么消息就會(huì)被丟棄//channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);//跟上面一樣,最后一個(gè)參數(shù)為false,只不過這里省了//channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);}, s -> {});其中basicConsume方法參數(shù)如下:
queue - 消息隊(duì)列名稱,直接指定。
autoAck - 自動(dòng)應(yīng)答,消費(fèi)者從消息隊(duì)列取出數(shù)據(jù)后,需要跟服務(wù)器進(jìn)行確認(rèn)應(yīng)答,當(dāng)服務(wù)器收到確認(rèn)后,會(huì)自動(dòng)將消息刪除,如果開啟自動(dòng)應(yīng)答,那么消息發(fā)出后會(huì)直接刪除。
deliver - 消息接收后的函數(shù)回調(diào),我們可以在回調(diào)中對消息進(jìn)行處理,處理完成后,需要給服務(wù)器確認(rèn)應(yīng)答。
cancel - 當(dāng)消費(fèi)者取消訂閱時(shí)進(jìn)行的函數(shù)回調(diào),這里暫時(shí)用不到。
在springBoot整合消息隊(duì)列?
導(dǎo)入對應(yīng)依賴。
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>?在配置文件中修改配置。
spring:rabbitmq:addresses: 127.0.0.1username: testpassword: 123456virtual-host: /test我們創(chuàng)建一個(gè)配置類。
import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;@Configuration public class RabbitConfiguration {@Bean("directExchange") //定義交換機(jī)Bean,可以很多個(gè)public Exchange exchange(){return ExchangeBuilder.directExchange("amq.direct").build();}@Bean("queueTest") //定義消息隊(duì)列public Queue queue(){return QueueBuilder.nonDurable("queueTest") //非持久化類型.build();}@Bean("binding")public Binding binding(@Qualifier("directExchange") Exchange exchange,@Qualifier("queueTest") Queue queue){//將我們剛剛定義的交換機(jī)和隊(duì)列進(jìn)行綁定return BindingBuilder.bind(queue) //綁定隊(duì)列.to(exchange) //到交換機(jī).with("queuekey") //使用自定義的routingKey.noargs();} }?接下來我們來創(chuàng)建一個(gè)生產(chǎn)者,這里我們直接編寫在測試用例中:
//RabbitTemplate為我們封裝了大量的RabbitMQ操作,已經(jīng)由Starter提供,因此直接注入使用即可@ResourceRabbitTemplate template;@Testvoid publisher() {//使用convertAndSend方法一步到位,參數(shù)基本和之前是一樣的//最后一個(gè)消息本體可以是Object類型,真是大大的方便template.convertAndSend("amq.direct", "queueTest", "Hello World!");}創(chuàng)建消費(fèi)者,創(chuàng)建監(jiān)聽器。
import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;@Component //注冊為Bean public class TestListener {@RabbitListener(queues = "queueTest") //定義此方法為隊(duì)列queueTest的監(jiān)聽器,一旦監(jiān)聽到新的消息,就會(huì)接受并處理public void test(Message message){ //如果我們Message的類型改為String類型也是可以的,它會(huì)自動(dòng)將我們的數(shù)據(jù)轉(zhuǎn)換為String類型System.out.println(new String(message.getBody()));} }效果圖:
我們可以往消息隊(duì)列中提交數(shù)據(jù),然后等待消費(fèi)者返回信息。
@Testvoid publisher() {//使用convertAndSend方法一步到位,參數(shù)基本和之前是一樣的//最后一個(gè)消息本體可以是Object類型,真是大大的方便Object queuekey = template.convertSendAndReceive("amq.direct", "queuekey", "Hello World!");System.out.println("收到消費(fèi)者的響應(yīng)");}消費(fèi)者的響應(yīng)方式,就是通過配置監(jiān)聽器的監(jiān)聽方法的返回值來實(shí)現(xiàn)。
@Component //注冊為Bean public class TestListener {@RabbitListener(queues = "queueTest") //定義此方法為隊(duì)列queueTest的監(jiān)聽器,一旦監(jiān)聽到新的消息,就會(huì)接受并處理public String test(String message){System.out.println(message);return "消費(fèi)者已經(jīng)做出響應(yīng)";} }進(jìn)行測試。
消息隊(duì)列處理json數(shù)據(jù)?
如果需要傳入的數(shù)據(jù)為json類型時(shí)我們該怎么做呢?
在rabbitmq中配置類中將json數(shù)據(jù)轉(zhuǎn)換器注入spring中。
@Configuration public class RabbitConfiguration {@Bean("jacksonConverter") //直接創(chuàng)建一個(gè)用于JSON轉(zhuǎn)換的Beanpublic Jackson2JsonMessageConverter converter(){return new Jackson2JsonMessageConverter();}}在消費(fèi)者類中指定消息轉(zhuǎn)換器。
@Component //注冊為Bean public class TestListener {@RabbitListener(queues = "queueTest", messageConverter = "jacksonConverter")public void receiver(User user){System.out.println(user);} }進(jìn)行測試。
因?yàn)槲覀冊趕pring注入json轉(zhuǎn)換器,所以我們可以在測試類中直接給傳入數(shù)據(jù)實(shí)現(xiàn)類,其會(huì)自動(dòng)將我們的實(shí)現(xiàn)類轉(zhuǎn)換為json類型。
實(shí)現(xiàn)類:
@Data public class User {int id;String name; }發(fā)送消息到消息隊(duì)列中的測試類。
@Testvoid publisher() {template.convertAndSend("amq.direct", "queuekey",new User());}測試結(jié)果圖:
?死信隊(duì)列
?消息隊(duì)列中的數(shù)據(jù),如果遲遲沒有消費(fèi)者來處理,那么就會(huì)一直占用消息隊(duì)列的空間,比如我們模擬一下?lián)屲嚻钡膱鼍?#xff0c;用戶下單高鐵之后,會(huì)進(jìn)行搶座,然后再進(jìn)行付款,但是如果用戶下單之后并沒有及時(shí)的付款,這張票不可能一直讓該用戶占用著,因?yàn)槟悴毁I別人還要買呢,所以會(huì)在一段時(shí)間后超時(shí),讓這張票可以繼續(xù)被其他人購買。
這時(shí),我們就可以使用死信隊(duì)列,將那些用戶超時(shí)未付款的或是用戶主動(dòng)取消的訂單,進(jìn)行進(jìn)一步的處理,以下類型的消息都會(huì)被判定為死信。
1.消息被拒絕(basic.reject/basic.nack),并且requeue = false。
2.消息TTL過期。
3.隊(duì)列達(dá)到最大值。
?那么如何構(gòu)建這樣的一種使用模式呢?實(shí)際上本質(zhì)上就是一個(gè)死信交換機(jī)+綁定的死信隊(duì)列,當(dāng)正常隊(duì)列中的消息被判定為死信時(shí),會(huì)被發(fā)送到對應(yīng)的死信交換機(jī)中,死信隊(duì)列也有對應(yīng)的消費(fèi)者去處理消息。
這里我們直接在消息隊(duì)列配置類中創(chuàng)建一個(gè)新的死信隊(duì)列,并對其進(jìn)行綁定。
@Bean("directDlExchange")public Exchange dlExchange(){//創(chuàng)建一個(gè)新的死信交換機(jī)//在這里做配置的話,即使在交換機(jī)中沒有該交換機(jī),其也會(huì)自動(dòng)被創(chuàng)建return ExchangeBuilder.directExchange("dlx.direct").build();}@Bean("testDlQueue") //創(chuàng)建一個(gè)新的死信隊(duì)列public Queue dlQueue(){return QueueBuilder.nonDurable("testDl").build();}@Bean("dlBinding") //死信交換機(jī)和死信隊(duì)列進(jìn)綁定public Binding dlBinding(@Qualifier("directDlExchange") Exchange exchange,@Qualifier("testDlQueue") Queue queue){return BindingBuilder.bind(queue).to(exchange).with("queueDlKey").noargs();}?再將該隊(duì)列綁定為其他隊(duì)列的死信隊(duì)列。
@Bean("queueTest") //定義消息隊(duì)列public Queue queue(){return QueueBuilder.nonDurable("queueTest") //非持久化類型.deadLetterExchange("dlx.direct").deadLetterRoutingKey("queueDlKey").build();}修改消費(fèi)者類的配置信息,將今天的隊(duì)列改為死信隊(duì)列,當(dāng)死信隊(duì)列有消息時(shí)就進(jìn)行消費(fèi)。
@Component //注冊為Bean public class TestListener {@RabbitListener(queues = "testDl", messageConverter = "jacksonConverter")public void receiver(User user){System.out.println(user);} }測試情況一
當(dāng)前消息隊(duì)列中存在一個(gè)消息,我們對獲取其消息并將其在放到消息隊(duì)列中,讓死信隊(duì)列處理。
可以發(fā)現(xiàn)死信隊(duì)列處理了這個(gè)消息。?
?測試情況二
在rabbitmq的配置類中進(jìn)行配置,配置隊(duì)列中消息的生命周期。
@Bean("queueTest") //定義消息隊(duì)列public Queue queue(){return QueueBuilder.nonDurable("queueTest") //非持久化類型.deadLetterExchange("dlx.direct").deadLetterRoutingKey("queueDlKey").ttl(500).build();}可以發(fā)現(xiàn)對消息隊(duì)列發(fā)送的消息在0.5秒后就被死信隊(duì)列消費(fèi)了。?
?測試第三中情況
在rabbitmq的配置類中對消息隊(duì)列的長度進(jìn)行配置。?
@Bean("queueTest") //定義消息隊(duì)列public Queue queue(){return QueueBuilder.nonDurable("queueTest") //非持久化類型.deadLetterExchange("dlx.direct").deadLetterRoutingKey("queueDlKey").maxLength(3)//設(shè)置消息隊(duì)列的長度.build();}往消息隊(duì)列中添加四組數(shù)據(jù),我們可以發(fā)現(xiàn)第一組數(shù)據(jù)被死信隊(duì)列消費(fèi)了。
?工作隊(duì)列模式
?實(shí)際上這種模式就非常適合多個(gè)工人等待新的任務(wù)到來的場景,我們的任務(wù)有很多個(gè),一個(gè)一個(gè)丟到消息隊(duì)列中,而此時(shí)個(gè)人有很多個(gè),那么我們就可以將這些任務(wù)分配給各個(gè)工人,讓他們各自負(fù)責(zé)一些任務(wù),并且做的快的工人還可以多完成一些。(能者多勞)
我們只需要?jiǎng)?chuàng)建多個(gè)監(jiān)聽器即可。(這里我們就先創(chuàng)建兩個(gè)監(jiān)聽器)
@Component //注冊為Bean public class TestListener {@RabbitListener(queues = "queueTest", messageConverter = "jacksonConverter")public void receiver1(User user){System.out.println(user);}@RabbitListener(queues = "queueTest", messageConverter = "jacksonConverter")public void receiver2(User user){System.out.println(user);} }在隊(duì)列中添加多個(gè)消息,觀察消費(fèi)者的處理情況。
可以發(fā)現(xiàn)消費(fèi)者采用的是輪番的策略,進(jìn)行消息消費(fèi)的。
默認(rèn)情況下,一個(gè)消費(fèi)者可以同時(shí)處理250個(gè)消息,我們也可以對其進(jìn)行修改。
?在rabbitmq的配置類中條件監(jiān)聽器創(chuàng)建者工廠。
@Resourceprivate CachingConnectionFactory connectionFactory;@Bean(name = "listenerContainer")public SimpleRabbitListenerContainerFactory listenerContainer(){SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setPrefetchCount(1); //將PrefetchCount設(shè)定為1表示一次只能取一個(gè)return factory;}?在消費(fèi)者類中設(shè)置對應(yīng)的監(jiān)聽器生產(chǎn)者。
@Component //注冊為Bean public class TestListener {@RabbitListener(queues = "queueTest", messageConverter = "jacksonConverter", containerFactory = "listenerContainer")public void receiver1(User user){System.out.println("消費(fèi)者1處理消息" + user);}@RabbitListener(queues = "queueTest", messageConverter = "jacksonConverter", containerFactory = "listenerContainer")public void receiver2(User user){System.out.println("消費(fèi)者2處理消息" + user);} }進(jìn)行測試。
當(dāng)我們想同時(shí)創(chuàng)建多個(gè)功能相同的消費(fèi)者時(shí),我們只要進(jìn)行下列配置即可。
@Component //注冊為Bean public class TestListener {@RabbitListener(queues = "queueTest", messageConverter = "jacksonConverter", concurrency = "count")public void receiver1(User user){System.out.println("消費(fèi)者1處理消息" + user);}}測試效果圖:
發(fā)布訂閱模式
比如我們在購買了云服務(wù)器,但是最近快到期了,那么就會(huì)給你的手機(jī)和郵箱發(fā)送消息,告訴你需要續(xù)費(fèi)了,但是手機(jī)短信和郵箱發(fā)送?并不是同一個(gè)業(yè)務(wù)提供的,但是現(xiàn)在我們又希望能夠都去執(zhí)行,所以就可以用到發(fā)布訂閱模式,簡而言之就是,發(fā)布一次,消費(fèi)多個(gè)。
因?yàn)槲覀冎笆褂玫氖侵边B交換機(jī),是一對一的關(guān)系,肯定是不行的,我們這里需要用到另一種類型的交換機(jī),叫做fanout(扇出)類型,這是一種廣播類型,消息會(huì)被廣播到所有與此交換機(jī)綁定的消息隊(duì)列中。
?在rabbitmq配置類中進(jìn)行配置,創(chuàng)建多個(gè)隊(duì)列,并將這些對應(yīng)的隊(duì)列綁定到扇出交換機(jī)上。
@Configuration public class RabbitConfiguration {@Resourceprivate CachingConnectionFactory connectionFactory;@Bean("fanoutExchange")public Exchange exchange(){//注意這里是fanoutExchangereturn ExchangeBuilder.fanoutExchange("amq.fanout").build();}@Bean("queueTest1") //定義消息隊(duì)列public Queue queue1(){return QueueBuilder.nonDurable("queueTest1") //非持久化類型.build();}@Bean("queueTest2") //定義消息隊(duì)列public Queue queue2(){return QueueBuilder.nonDurable("queueTest2") //非持久化類型.build();}@Bean("binding1")public Binding binding1(@Qualifier("fanoutExchange") Exchange exchange,@Qualifier("queueTest1") Queue queue){//將我們剛剛定義的交換機(jī)和隊(duì)列進(jìn)行綁定return BindingBuilder.bind(queue) //綁定隊(duì)列.to(exchange) //到交換機(jī).with("queuekey1") //使用自定義的routingKey.noargs();}@Bean("binding2")public Binding binding2(@Qualifier("fanoutExchange") Exchange exchange,@Qualifier("queueTest2") Queue queue){//將我們剛剛定義的交換機(jī)和隊(duì)列進(jìn)行綁定return BindingBuilder.bind(queue) //綁定隊(duì)列.to(exchange) //到交換機(jī).with("queuekey2") //使用自定義的routingKey.noargs();}@Bean("jacksonConverter") //直接創(chuàng)建一個(gè)用于JSON轉(zhuǎn)換的Beanpublic Jackson2JsonMessageConverter converter(){return new Jackson2JsonMessageConverter();}}修改監(jiān)聽器。
@Component //注冊為Bean public class TestListener {@RabbitListener(queues = "queueTest1", messageConverter = "jacksonConverter")public void receiver1(User user){System.out.println("隊(duì)列一接收到消息" + user);}@RabbitListener(queues = "queueTest2", messageConverter = "jacksonConverter")public void receiver2(User user){System.out.println("隊(duì)列二接收到消息" + user);} }?測試結(jié)果圖:
在對應(yīng)的交換機(jī)中沒有指定routingKey時(shí)發(fā)送數(shù)據(jù),兩個(gè)隊(duì)列都會(huì)收到消息。
?路由模式
?我們可以在綁定時(shí)指定想要的routingKey只有生產(chǎn)者發(fā)送時(shí)指定了對應(yīng)的routingKey才能到達(dá)對應(yīng)的隊(duì)列。
?當(dāng)然除了我們之前的一次綁定之外,同一個(gè)消息隊(duì)列可以多次綁定到交換機(jī),并且使用不同的routingKey,這樣只要滿足其中一個(gè)都可以被發(fā)送到此消息隊(duì)列中。
在rabbitmq的配置類中進(jìn)行配置。
@Configuration public class RabbitConfiguration {@Bean("directExchange")public Exchange exchange(){return ExchangeBuilder.directExchange("amq.direct").build();}@Bean("queueTest")public Queue queue(){return QueueBuilder.nonDurable("queueTest").build();}@Bean("binding") //使用yyds1綁定public Binding binding(@Qualifier("directExchange") Exchange exchange,@Qualifier("queueTest") Queue queue){return BindingBuilder.bind(queue).to(exchange).with("key1").noargs();}@Bean("binding2") //使用yyds2綁定public Binding binding2(@Qualifier("directExchange") Exchange exchange,@Qualifier("queueTest") Queue queue){return BindingBuilder.bind(queue).to(exchange).with("key2").noargs();} }?修改監(jiān)聽器。
@Component //注冊為Bean public class TestListener {@RabbitListener(queues = "queueTest")public void receiver1(String message) {System.out.println("隊(duì)列一接收到消息" + message);} }我們在交換機(jī)中添加兩條消息,分別通過不同的routingKey。
進(jìn)行測試,通過不同的routingkey進(jìn)入了同一個(gè)消息隊(duì)列中?。
主題模式
?實(shí)際上這種模式就是一種模糊匹配模式,我們可以將routingKey以模糊匹配的方式去進(jìn)行轉(zhuǎn)發(fā)。?
?我們可以使用*或#來表示:
1.*:表示容易的一個(gè)單詞。
2.#:表示0個(gè)或多個(gè)單詞。
?修改rabbitmq的配置類。
@Configuration public class RabbitConfiguration {@Bean("topicExchange") //這里使用預(yù)置的Topic類型交換機(jī)public Exchange exchange(){return ExchangeBuilder.topicExchange("amq.topic").build();}@Bean("queueTest")public Queue queue(){return QueueBuilder.nonDurable("queueTest").build();}@Bean("binding")public Binding binding2(@Qualifier("topicExchange") Exchange exchange,@Qualifier("queueTest") Queue queue){return BindingBuilder.bind(queue).to(exchange).with("*.test.*").noargs();} }在預(yù)設(shè)的topic交換機(jī)中以a.test.c?作為routingKey將數(shù)據(jù)傳入對應(yīng)的消息隊(duì)列中。
?消費(fèi)者去消費(fèi)了此消息。
"#"也是差不多的效果。
?除了我們這里使用的默認(rèn)主題交換機(jī)之外,還有一個(gè)叫做amq.rabbitmq.trace的交換機(jī)。
?可以看到它也是topic類型的,那么這個(gè)交換機(jī)是做什么的呢?實(shí)際上這個(gè)是用于幫助我們記錄和追蹤生產(chǎn)者和消費(fèi)者使用消息隊(duì)列的交換機(jī),它是一個(gè)內(nèi)部的交換機(jī)。
接著我們需要在rabbitmq主機(jī)中將/test的追蹤功能開啟。
rabbitmqctl trace_on -p /test?創(chuàng)建新的消息隊(duì)列。
?將消息隊(duì)列綁定到amq.rabbitmq.trace交換機(jī)上,要將生產(chǎn)者輸入的交換機(jī)和消費(fèi)者獲取數(shù)據(jù)的隊(duì)列全部存放到剛剛那個(gè)trace消息隊(duì)列中。?
?我們獲取trace中的消息,會(huì)得到一個(gè)交換機(jī)和消息隊(duì)列。
?第四種交換機(jī)類型
?第四種交換機(jī)類型header,它是根據(jù)頭部消息來決定的,在我們發(fā)送的消息中是可以攜帶一些頭部消息的(類似于HTTP),我們可以根據(jù)這些頭部信息來決定路由哪個(gè)消息隊(duì)列中。
???????修改rabbitmq的配置類。
@Configuration public class RabbitConfiguration {@Bean("headerExchange") //注意這里返回的是HeadersExchangepublic HeadersExchange exchange(){return ExchangeBuilder.headersExchange("amq.headers") //RabbitMQ為我們預(yù)置了兩個(gè),這里用第一個(gè)就行.build();}@Bean("queueTest")public Queue queue(){return QueueBuilder.nonDurable("queueTest").build();}@Bean("binding")public Binding binding2(@Qualifier("headerExchange") HeadersExchange exchange, //這里和上面一樣的類型@Qualifier("queueTest") Queue queue){return BindingBuilder.bind(queue).to(exchange) //使用HeadersExchange的to方法,可以進(jìn)行進(jìn)一步配置//.whereAny("a", "b").exist(); 這個(gè)是只要存在任意一個(gè)指定的頭部Key就行//.whereAll("a", "b").exist(); 這個(gè)是必須存在所有指定的的頭部Key.where("test").matches("hello"); //比如我們現(xiàn)在需要消息的頭部信息中包含test,并且值為hello才能轉(zhuǎn)發(fā)給我們的消息隊(duì)列//.whereAny(Collections.singletonMap("test", "hello")).match(); 傳入Map也行,批量指定鍵值對} }將數(shù)據(jù)傳入amq.header交換機(jī),并設(shè)置頭部信息,進(jìn)行測試。
查看queueTest隊(duì)列的消息,可以發(fā)現(xiàn)消息隊(duì)列中的消息被消費(fèi)了。
docker 搭建rabbitmq集群?
下載rabbitmq的管理者版本。
docker pull rabbitmq:3.9.5-management創(chuàng)建三個(gè)rabbitmq。(如果需要查看各自的控制面板的話,我們只需要為每個(gè)rabbitmq綁定15672端口)
docker run -d --hostname myRabbit1 --name rabbit1 -p 15672:15672 -p 5672:5672 -e RABBITMQ_ERLANG_COOKIE='rabbitcookie' rabbitmq:3.9.5-management docker run -d --hostname myRabbit2 --name rabbit2 -p 5673:5672 --link rabbit1:myRabbit1 -e RABBITMQ_ERLANG_COOKIE='rabbitcookie' rabbitmq:3.9.5-management docker run -d --hostname myRabbit3 --name rabbit3 -p 15673:5672 --link rabbit1:myRabbit1 --link rabbit2:myRabbit2 -e RABBITMQ_ERLANG_COOKIE='rabbitcookie' rabbitmq:3.9.5-management-e RABBITMQ_ERLANG_COOKIE=‘rabbitcookie’ : 必須設(shè)置為相同,因?yàn)?Erlang節(jié)點(diǎn)間是通過認(rèn)證Erlang cookie的方式來允許互相通信的。
–link rabbit1:myRabbit1 --link rabbit2:myRabbit2: 不要漏掉,否則會(huì) 一直處在 Cluster status of node rabbit@myRabbit3 … 沒有反應(yīng)。
啟動(dòng)完成之后,使用docker ps命令查看運(yùn)行情況,確保RabbitMQ都已經(jīng)啟動(dòng)。
將RabbitMQ節(jié)點(diǎn)加入到集群。
#進(jìn)入rabbitmq02容器,重新初始化一下,將02節(jié)點(diǎn)加入到集群中 docker exec -it rabbit2 bash rabbitmqctl stop_app rabbitmqctl reset rabbitmqctl join_cluster --ram rabbit@myRabbit1 #參數(shù)“--ram”表示設(shè)置為內(nèi)存節(jié)點(diǎn),忽略該參數(shù)默認(rèn)為磁盤節(jié)點(diǎn),@后面的為ip名,以為我們在啟動(dòng)rabbitmq時(shí)給其ip設(shè)置了新的名字,且我們以一節(jié)點(diǎn)作為主節(jié)點(diǎn)其他作為從節(jié)點(diǎn)。 rabbitmqctl start_app exit#進(jìn)入rabbitmq03容器,重新初始化一下,將03節(jié)點(diǎn)加入到集群中 docker exec -it rabbit3 bash rabbitmqctl stop_app rabbitmqctl reset rabbitmqctl join_cluster --ram rabbit@myRabbit1 rabbitmqctl start_app exit內(nèi)存節(jié)點(diǎn)將所有的隊(duì)列、交換器、綁定、用戶等元數(shù)據(jù)定義都存儲(chǔ)在內(nèi)存中;而磁盤節(jié)點(diǎn)將元數(shù)據(jù)存儲(chǔ)在磁盤中。單節(jié)點(diǎn)系統(tǒng)只允許磁盤類型的節(jié)點(diǎn),否則當(dāng)節(jié)點(diǎn)重啟以后,所有的配置信息都會(huì)丟失。如果采用集群的方式,可以選擇至少配置一個(gè)節(jié)點(diǎn)為磁盤節(jié)點(diǎn),其余部分配置為內(nèi)存節(jié)點(diǎn),這樣可以獲得更快的響應(yīng)。所以本集群中配置節(jié)點(diǎn)一位磁盤節(jié)點(diǎn),節(jié)點(diǎn)二和節(jié)點(diǎn)三位內(nèi)存節(jié)點(diǎn)。
?此時(shí)我只是完成了簡單的集群,接下來我們還要配置鏡像隊(duì)列(類型主從復(fù)制),我們這里在終端中配置,其實(shí)也可以直接在控制面板中在admin中配置對應(yīng)的策略。
#隨便進(jìn)入一個(gè)容器 docker exec -it rabbit1 bash#設(shè)置策略匹配所有名稱的隊(duì)列都進(jìn)行高可用配置,且實(shí)現(xiàn)自動(dòng)同步。 rabbitmqctl set_policy -p / ha "^" '{"ha-mode":"all","ha-sync-mode":"automatic"}'#查詢策略 rabbitmqctl list_policies -p / #查看vhost下的所有的策略(policies )當(dāng)主節(jié)點(diǎn)中的隊(duì)列down后從節(jié)點(diǎn)中的隊(duì)列就會(huì)被使用,且在主節(jié)點(diǎn)中的隊(duì)列恢復(fù)以后,其會(huì)變成從隊(duì)列來繼續(xù)使用。
1.策略名稱,我們命名為ha(高可用);
2.-p / 設(shè)置vhost,可以使用rabbitmqctl list_policies -p / 查看該vhost 下所有的策略(policies )。
3.隊(duì)列名稱的匹配規(guī)則,使用正則表達(dá)式表示;
4.為鏡像隊(duì)列的主體規(guī)則,是json字符串,分為三個(gè)屬性:ha-mode | ha-params | ha-sync-mode,分別的解釋如下:
????????ha-mode:鏡像模式,分類:all/exactly/nodes,all存儲(chǔ)在所有節(jié)點(diǎn);exactly存儲(chǔ)x個(gè)節(jié)點(diǎn),節(jié)點(diǎn)的個(gè)數(shù)由ha-params指定;nodes指定存儲(chǔ)的節(jié)點(diǎn)上名稱,通過ha-params指定;
????????ha-params:作為參數(shù),為ha-mode的補(bǔ)充;
????????ha-sync-mode:鏡像消息同步方式:automatic(自動(dòng)),manually(手動(dòng));
?消息隊(duì)列中間件
?由于使用不同的消息隊(duì)列,我們不能保證系統(tǒng)相同,為了注重邏輯,springCloud Stream它能夠屏蔽底層實(shí)現(xiàn),我們使用統(tǒng)一的消息隊(duì)列操作方式就能操作多種不同類型的消息隊(duì)列。
它屏蔽了Rabbitmq底層操作,讓我們使用統(tǒng)一 的Input和Output形式。以Binder為中間件,這樣我們就算切換了不同的消息隊(duì)列,也無需修改代碼,而具體某種消息隊(duì)列的底層實(shí)現(xiàn)是交給Stream在做的。?
創(chuàng)建兩個(gè)模塊,一個(gè)是生產(chǎn)者一個(gè)是消費(fèi)者。
?導(dǎo)入對應(yīng)依賴。
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId><version>3.2.4</version></dependency>在生產(chǎn)者的配置文件中進(jìn)行配置。
server:port: 8001 spring:cloud:stream:binders: #此處配置要綁定的rabbitmq服務(wù)的配置信息cloud-server: #綁定的名稱,自定義一個(gè)就行type: rabbit #消息主件類型,這里使用rabbit,所以這粒就填寫rabbitenvironment: #服務(wù)器相關(guān)信息,以為是自定義的名稱,所以下面會(huì)爆紅,不影響運(yùn)行spring:rabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guestvirtual-host: /bindings:test-out-0: #自定義的綁定名稱destination: test.exchange #目的地,就是交換機(jī)的名稱。如果不存在就會(huì)創(chuàng)建在生產(chǎn)者的controller層編寫發(fā)送消息的controller。
@RestController public class PublisherController {@ResourceStreamBridge streamBridge;public String publish() {//第一個(gè)次數(shù)其實(shí)就是Rabbitmq的交換機(jī)名稱//這個(gè)交換機(jī)的名稱有些規(guī)則//輸入: <名稱>-in-<index>//輸出: <名稱>-out-<index>//這里我們使用輸出方式,來將數(shù)據(jù)發(fā)送到消息隊(duì)列,注意這里的名稱會(huì)和之后的消費(fèi)者Bean名稱進(jìn)行對應(yīng)streamBridge.send("test-out-0", "hello world");return "發(fā)送成功"+new Date();} }編寫消費(fèi)者配置文件。
server:port: 8001 spring:cloud:stream:binders: #此處配置要綁定的rabbitmq服務(wù)的配置信息cloud-server: #綁定的名稱,自定義一個(gè)就行type: rabbit #消息主件類型,這里使用rabbit,所以這粒就填寫rabbitenvironment: #服務(wù)器相關(guān)信息,以為是自定義的名稱,所以下面會(huì)爆紅,不影響運(yùn)行spring:rabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guestvirtual-host: /bindings:test-in-0:destination: test.exchange創(chuàng)建一個(gè)類用于做消費(fèi)者的配置。
@Component public class TestConsumer {@Bean("test")//這里的注入名要和剛剛生產(chǎn)者的綁定名稱中的名稱相同public Consumer<String> consumer() {return System.out::println;} }啟動(dòng)測試。
?其自動(dòng)幫我們創(chuàng)建了對應(yīng)的消息隊(duì)列。
?消息發(fā)送以后,消費(fèi)者去去消費(fèi)了這個(gè)消息。
這樣我們就通過springCloud Stream屏蔽底層Rabbitmq來直接進(jìn)行消息的操作了。??
總結(jié)
以上是生活随笔為你收集整理的springCloud 初探的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: java泛型类的构造函数_Java泛型构
- 下一篇: 使用OFFSET函数完成二级城市菜单