通过实例理解 RabbitMQ 的基本概念
先說(shuō)下自己開(kāi)發(fā)的實(shí)例。
最近在使用 Spring Cloud Config 做分布式配置中心(基于 SVN/Git),當(dāng)所有服務(wù)啟動(dòng)后,SVN/Git 中的配置文件更改后,客戶(hù)端服務(wù)讀取的還是舊的配置,并不能實(shí)時(shí)讀取(配置信息會(huì)緩存在客戶(hù)端),Spring Boot 提供了一種方式進(jìn)行更新(通過(guò)spring-boot-starter-actuator監(jiān)控模塊),然后 Post 訪(fǎng)問(wèn)客戶(hù)端服務(wù)的/refresh接口(也可以命令執(zhí)行curl -X POST http://worker2:8115/refresh),這樣客戶(hù)端會(huì)重新從配置中心獲取新的配置信息,請(qǐng)求命令可以寫(xiě)在 Git 的 Webhooks 腳本中(修改提交 Push 后執(zhí)行)。
如果客戶(hù)端服務(wù)比較少的話(huà),這樣的解決方式?jīng)]問(wèn)題,如果客戶(hù)端服務(wù)多的話(huà),執(zhí)行的請(qǐng)求腳本就會(huì)非常多,而且單個(gè)服務(wù)的解決方式,也不利于后期的維護(hù)(點(diǎn)對(duì)點(diǎn)的方式),那該怎么解決上面的問(wèn)題呢?答案就是通過(guò) Spring Cloud Bus。
Spring Cloud Bus 翻譯為消息總線(xiàn),使用輕量級(jí)的消息代理來(lái)構(gòu)建一個(gè)共用的消息主題讓系統(tǒng)中所有微服務(wù)實(shí)例都能連接上來(lái),由于該主題中產(chǎn)生的消息會(huì)被所有實(shí)例監(jiān)聽(tīng)和消費(fèi),所以我們稱(chēng)它為消息總線(xiàn)。在總線(xiàn)上的各個(gè)實(shí)例都可以方便地廣播一些需要讓其他連接在該主題上的實(shí)例都知道的消息,例如配置信息的變更或者其他一些管理操作等。
架構(gòu)示意圖(引用來(lái)源):
下面,我們需要利用 Spring Cloud Bus 來(lái)改造 Spring Cloud Config 的服務(wù)端和客戶(hù)端,其實(shí)非常簡(jiǎn)單。
添加下面的依賴(lài):
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-bus-amqp</artifactId> </dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId> </dependency>然后在bootstrap.yml中添加下面配置:
spring:rabbitmq:host: manager1port: 5672username: adminpassword: admin123 management:security:enabled: false上面的配置信息都是新增的,并且都需要配置在服務(wù)端和客戶(hù)端,通過(guò)上面的示例圖可以看到,配置信息更新后請(qǐng)求的是服務(wù)端,那么客戶(hù)端我們就不需要配置management.security.enabled(也不需要配置spring-boot-starter-actuator監(jiān)控模塊)。
服務(wù)端和客戶(hù)端的任何 Java 代碼都不需要編寫(xiě),重新啟動(dòng)服務(wù),當(dāng)配置信息更新后,通過(guò) Git 的 Webhooks 執(zhí)行請(qǐng)求腳本:curl -X POST http://manager1:port/bus/refresh,服務(wù)端接受到請(qǐng)求之后,會(huì)通過(guò) Spring Cloud Bus 通知所有的客戶(hù)端(通過(guò) RabbitMQ),重新從配置中心獲取配置信息,達(dá)到實(shí)時(shí)更新配置的目的。
上面的實(shí)例描述就到這里。
RabbitMQ 的基本概念
RabbitMQ,是一個(gè)使用 erlang 編寫(xiě)的 AMQP(高級(jí)消息隊(duì)列協(xié)議)的服務(wù)實(shí)現(xiàn),簡(jiǎn)單來(lái)說(shuō),就是一個(gè)功能強(qiáng)大的消息隊(duì)列服務(wù)。
RabbitMQ 最基本模型:
RabbitMQ 的基本概念:
- Producer:消息生產(chǎn)者。
- Consumer:消息消費(fèi)者。
- Connection(連接):Producer 和 Consumer 通過(guò)TCP 連接到 RabbitMQ Server。
- Channel(信道):基于 Connection 創(chuàng)建,數(shù)據(jù)流動(dòng)都是在 Channel 中進(jìn)行。
- Exchange(交換器):生產(chǎn)者將消息發(fā)送到 Exchange(交換器),由 Exchange 將消息路由到一個(gè)或多個(gè) Queue 中(或者丟棄);Exchange 并不存儲(chǔ)消息;Exchange Types 常用有 Fanout、Direct、Topic 三種類(lèi)型,每種類(lèi)型對(duì)應(yīng)不同的路由規(guī)則。
- Queue(隊(duì)列):是 RabbitMQ 的內(nèi)部對(duì)象,用于存儲(chǔ)消息;消息消費(fèi)者就是通過(guò)訂閱隊(duì)列來(lái)獲取消息的,RabbitMQ 中的消息都只能存儲(chǔ)在 Queue 中,生產(chǎn)者生產(chǎn)消息并最終投遞到 Queue 中,消費(fèi)者可以從 Queue 中獲取消息并消費(fèi);多個(gè)消費(fèi)者可以訂閱同一個(gè) Queue,這時(shí) Queue 中的消息會(huì)被平均分?jǐn)偨o多個(gè)消費(fèi)者進(jìn)行處理,而不是每個(gè)消費(fèi)者都收到所有的消息并處理。
- Binding(綁定):是 Exchange(交換器)將消息路由給 Queue 所需遵循的規(guī)則。
- Routing Key(路由鍵):消息發(fā)送給 Exchange(交換器)時(shí),消息將擁有一個(gè)路由鍵(默認(rèn)為空), Exchange(交換器)根據(jù)這個(gè)路由鍵將消息發(fā)送到匹配的隊(duì)列中。
- Binding Key(綁定鍵):指定當(dāng)前 Exchange(交換器)下,什么樣的 Routing Key(路由鍵)會(huì)被下派到當(dāng)前綁定的 Queue 中。
另外,再說(shuō)下 Exchange Types(交換器類(lèi)型)的三種常用類(lèi)型:
- Direct:完全匹配,消息路由到那些 Routing Key 與 Binding Key 完全匹配的 Queue 中。比如 Routing Key 為cleint-key,只會(huì)轉(zhuǎn)發(fā)cleint-key,不會(huì)轉(zhuǎn)發(fā)cleint-key.1,也不會(huì)轉(zhuǎn)發(fā)cleint-key.1.2。
- Topic:模式匹配,Exchange 會(huì)把消息發(fā)送到一個(gè)或者多個(gè)滿(mǎn)足通配符規(guī)則的 routing-key 的 Queue。其中*表號(hào)匹配一個(gè) word,#匹配多個(gè) word 和路徑,路徑之間通過(guò).隔開(kāi)。如滿(mǎn)足a.*.c的 routing-key 有a.hello.c;滿(mǎn)足#.hello的 routing-key 有a.b.c.helo。
- Fanout:忽略匹配,把所有發(fā)送到該 Exchange 的消息路由到所有與它綁定 的Queue 中。
下面通過(guò)一段代碼,理解一下消息發(fā)布的流程(代碼引用):
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel()channel.exchange_declare(exchange='first', type='topic') channel.queue_declare(queue='A') channel.queue_declare(queue='B')channel.queue_bind(exchange='first', queue='A', routing_key='a.*.*') channel.queue_bind(exchange='first', queue='B', routing_key='a.#')channel.basic_publish(exchange='first',routing_key='a',body='Hello World!')channel.basic_publish(exchange='first',routing_key='a.b.c',body='Hello World!')大致步驟:
- 先獲取一個(gè) Connection(連接)。
- 從 Connection(連接)上獲取一個(gè) Channel(信道)。
- 聲明一個(gè) Exchange(交換器),只會(huì)創(chuàng)建一次。
- 聲明兩個(gè) Queue,只會(huì)創(chuàng)建一次。
- 把 Queue 綁定到 Exchange(交換器)上.
- 向指定的 Exchange(交換器)發(fā)送一條消息.
因?yàn)榛?Exchage Topic 模式,在上面發(fā)出的兩條消息當(dāng)中,消息a只會(huì)被a.#匹配到,而a.b.c會(huì)被兩個(gè)都匹配到。所以,最終的結(jié)果會(huì)是隊(duì)列 A 中有一條消息,隊(duì)列 B 中有兩條消息。
從隊(duì)列取出消息代碼:
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.queue_declare(queue='A')def callback(ch, method, properties, body):print bodychannel.basic_consume(callback, queue='A', no_ack=True) channel.start_consuming()服務(wù)消費(fèi)者取出消息,需要重新創(chuàng)建 Connection(連接)和 Exchange(交換器),但 Queue 并不會(huì)創(chuàng)建,只需要從 Channel 中獲取對(duì)應(yīng)的 Queue 消息即可。
通過(guò)實(shí)例理解 RabbitMQ 基本概念
上面實(shí)例服務(wù)部署的情況是:三臺(tái)管理服務(wù)器(config-server-git/config-server-svn)和一臺(tái)工作服務(wù)器(config-client-git/config-server-svn),因?yàn)樽隽思?#xff0c;服務(wù)的具體情況:
- config-server-git:3 個(gè)服務(wù)。
- config-server-svn:3 個(gè)服務(wù)。
- config-client-git:1 個(gè)服務(wù)。
- config-client-svn:1 個(gè)服務(wù)。
所以,總的部署服務(wù)有 8 個(gè)。
我們通過(guò) RabbitMQ Server 管理界面中的內(nèi)容,說(shuō)下 Connection(連接)、Channel(信道)、Exchange(交換器)和 Queue(隊(duì)列)的具體使用情況(根據(jù)數(shù)量理解)。
1. Connection(連接)
為什么 Connection(連接)數(shù)量為 16 個(gè)?因?yàn)椴渴鸬?8 個(gè)服務(wù),各自發(fā)布和接受消息(即作為小心發(fā)布者,也作為消息接受者),計(jì)算公式:16 = 8 * 2。
2. Channel(信道)
為什么 Channel(信道)數(shù)量為 16 個(gè)?因?yàn)?Connection(連接)數(shù)量為 16 個(gè),Channel(信道)是在 Connection(連接)基礎(chǔ)上創(chuàng)建的。
3. Exchange(交換器)
為什么 Exchange(交換器)數(shù)量為 1 個(gè)?因?yàn)槎际鞘褂玫耐粋€(gè) Exchange(交換器),名字為springCloudBus,Exchange Type 為topic,Routing Key 為#。
4. Queue(隊(duì)列)
為什么 Queue(隊(duì)列)數(shù)量為 8 個(gè)?因?yàn)椴渴鸬?8 個(gè)服務(wù),各自發(fā)布和接受的 Queue 是同一個(gè),一個(gè)服務(wù)對(duì)應(yīng)一個(gè) Queue。
參考資料:
- RabbitMQ 消息隊(duì)列 - RabbitMQ 消息隊(duì)列架構(gòu)與基本概念(推薦)
- RabbitMQ 使用參考(推薦)
- RabbitMq(一)走進(jìn) RabbitMq
- RabbitMQ 基本概念和使用(推薦)
- 分布式消息隊(duì)列 RabbitMQ 之一:基本概念理解
- RabbitMQ 基礎(chǔ)概念詳細(xì)介紹
- RabbitMQ 基本概念
總結(jié)
以上是生活随笔為你收集整理的通过实例理解 RabbitMQ 的基本概念的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: python-psutil
- 下一篇: 基于 Docker 的微服务架构实践