RabbiqMQ快速入门
RabbitMQ
官網(wǎng)地址: https://www.rabbitmq.com/
一個遵循AMQP協(xié)議,開源面向消息的中間件,支持多種編程語言。
Rabbitmq 能做什么?
- 邏輯解耦,異步的消息任務
- 消息持久化,重啟不影響
- 削峰,大規(guī)模的消息處理
主要的特點
可靠性:持久化,傳輸確認,發(fā)布確認 可擴展性:多個節(jié)點可以組成一個集群,可動態(tài)更改 多語言:支持多數(shù)編程語言 管理界面:有常見的用戶界面,便于管理和監(jiān)控常見的應用場景:
并發(fā)請求的壓力高可用設計(電商秒殺場景), 異步任務處理結(jié)果的回調(diào)設計(日志訂單異步處理), 系統(tǒng)集成與分布式系統(tǒng)設計(各種子系統(tǒng)的消息同步)。工作原理
簡單介紹生產(chǎn)者和消費者會和服務器建立tcp鏈接,在tcp鏈接之上會建立多個信道channel,通過信道來發(fā)送消息,生產(chǎn)者生產(chǎn)消息后不直接直接發(fā)到隊列中,而是發(fā)到一個交換空間:Exchange, Exchange會根據(jù)Exchange類型和Routing Key來決定發(fā)到哪個隊列中,消費者在從隊列中拿到消息具體工作模式
名詞解釋
ExChange :消息交換機,決定消息按照什么規(guī)則路由到那個對列中去 Queue :消息載體,每個消息都會被投到一個或多個隊列 Binding:綁定,把exchange 和 queue按照路由規(guī)則綁定起來 Routing Key: 路由關(guān)鍵字,exchage根據(jù)這關(guān)鍵字來投遞消息 Channel :消息通道,客戶端的每個連接建立多個channel Producer :消息生產(chǎn)者,用戶投遞消息的程序 Consumer :消息消費者,用于就是接收消息的程序Exchage工作模式
Fanout: 類似廣播,轉(zhuǎn)發(fā)到所有綁定交換機的Queue Direct: 類似單播,RoutingKey 和 BindingKey完全匹配 Topic : 類似組播,轉(zhuǎn)發(fā)到符合通配符的Queue headers:請求頭與消息頭匹配,才能接收到消息環(huán)境配置
通過docker環(huán)境配置
# /www/rabbitmq目錄可自定義,主要用于目錄掛載 mkdir -p /www/rabbitmq # 創(chuàng)建容器 docker run -d --hostname rabbit-node1 --name rabbit-node1 -p 5672:5672 -p15672:15672 -v /www/rabbitmq:/var/lib/rabbitmq rabbitmq:management # 查看容器狀態(tài) docker ps | grep rabbit瀏覽器打開登錄rabbitmq, 入口:http://localhost:15672 默認用戶名: guest 密碼: guestgolang實戰(zhàn)
簡單基本玩法
//下載類庫 go get "github.com/streadway/amqp"前期準備代碼
//連接信息 const MQURL = "amqp://imoocuser:imoocuser@127.0.0.1:5672/imooc"//rabbitMQ結(jié)構(gòu)體 type RabbitMQ struct {conn *amqp.Connectionchannel *amqp.Channel//隊列名稱QueueName string//交換機名稱Exchange string//bind Key 名稱Key string//連接信息Mqurl string }//創(chuàng)建結(jié)構(gòu)體實例 func NewRabbitMQ(queueName string,exchange string ,key string) *RabbitMQ {return &RabbitMQ{QueueName:queueName,Exchange:exchange,Key:key,Mqurl:MQURL} }//斷開channel 和 connection func (r *RabbitMQ) Destory() {r.channel.Close()r.conn.Close() } //錯誤處理函數(shù) func (r *RabbitMQ) failOnErr(err error, message string) {if err != nil {log.Fatalf("%s:%s", message, err)panic(fmt.Sprintf("%s:%s", message, err))} }簡單模式
簡單模式下 Exchange 和 key是為空的,不需要設置
//創(chuàng)建簡單模式下RabbitMQ實例 func NewRabbitMQSimple(queueName string) *RabbitMQ {//創(chuàng)建RabbitMQ實例rabbitmq := NewRabbitMQ(queueName,"","")var err error//獲取connectionrabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)rabbitmq.failOnErr(err, "failed to connect rabb"+"itmq!")//獲取channelrabbitmq.channel, err = rabbitmq.conn.Channel()rabbitmq.failOnErr(err, "failed to open a channel")return rabbitmq }//簡單模式隊列生產(chǎn) func (r *RabbitMQ) PublishSimple(message string) {//1.申請隊列,如果隊列不存在會自動創(chuàng)建,存在則跳過創(chuàng)建_, err := r.channel.QueueDeclare(r.QueueName,//是否持久化false,//是否自動刪除false,//是否具有排他性false,//是否阻塞處理false,//額外的屬性nil,)if err != nil {fmt.Println(err)}//調(diào)用channel 發(fā)送消息到隊列中r.channel.Publish(r.Exchange,r.QueueName,//如果為true,根據(jù)自身exchange類型和routekey規(guī)則無法找到符合條件的隊列會把消息返還給發(fā)送者false,//如果為true,當exchange發(fā)送消息到隊列后發(fā)現(xiàn)隊列上沒有消費者,則會把消息返還給發(fā)送者false,amqp.Publishing{ContentType: "text/plain",Body: []byte(message),}) }//simple 模式下消費者 func (r *RabbitMQ) ConsumeSimple() {//1.申請隊列,如果隊列不存在會自動創(chuàng)建,存在則跳過創(chuàng)建q, err := r.channel.QueueDeclare(r.QueueName,//是否持久化false,//是否自動刪除false,//是否具有排他性false,//是否阻塞處理false,//額外的屬性nil,)if err != nil {fmt.Println(err)}//接收消息msgs, err :=r.channel.Consume(q.Name, // queue//用來區(qū)分多個消費者"", // consumer//是否自動應答true, // auto-ack//是否獨有false, // exclusive//設置為true,表示 不能將同一個Conenction中生產(chǎn)者發(fā)送的消息傳遞給這個Connection中 的消費者false, // no-local//列是否阻塞false, // no-waitnil, // args)if err != nil {fmt.Println(err)}forever := make(chan bool)//啟用協(xié)程處理消息go func() {for d := range msgs {//消息邏輯處理,可以自行設計邏輯log.Printf("Received a message: %s", d.Body)}}()log.Printf(" [*] Waiting for messages. To exit press CTRL+C")<-forever}工作模式
一個消息只能被一個消費者獲取(場景:生產(chǎn)消息大于消費消息的時候),更簡單模式代碼一樣,只是同事開啟了多個消費端,起到負載均衡的作用
訂閱模式
該模式下,隊列為空,key為空;只需設置交換空間即可;消息被投遞到多個隊列中,一個消息被多個消費者消費
//訂閱模式創(chuàng)建RabbitMQ實例 func NewRabbitMQPubSub(exchangeName string) *RabbitMQ {//創(chuàng)建RabbitMQ實例rabbitmq := NewRabbitMQ("",exchangeName,"")var err error//獲取connectionrabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)rabbitmq.failOnErr(err,"failed to connect rabbitmq!")//獲取channelrabbitmq.channel, err = rabbitmq.conn.Channel()rabbitmq.failOnErr(err, "failed to open a channel")return rabbitmq }//訂閱模式生產(chǎn) func (r *RabbitMQ) PublishPub(message string) {//1.嘗試創(chuàng)建交換機err := r.channel.ExchangeDeclare(r.Exchange,"fanout",true,false,//true表示這個exchange不可以被client用來推送消息,僅用來進行exchange和exchange之間的綁定false,false,nil,)r.failOnErr(err, "Failed to declare an excha"+"nge")//2.發(fā)送消息err = r.channel.Publish(r.Exchange,"",false,false,amqp.Publishing{ContentType: "text/plain",Body: []byte(message),}) }//訂閱模式消費端代碼 func (r *RabbitMQ) RecieveSub() {//1.試探性創(chuàng)建交換機err := r.channel.ExchangeDeclare(r.Exchange,//交換機類型"fanout",true,false,//YES表示這個exchange不可以被client用來推送消息,僅用來進行exchange和exchange之間的綁定false,false,nil,)r.failOnErr(err, "Failed to declare an exch"+"ange")//2.試探性創(chuàng)建隊列,這里注意隊列名稱不要寫q, err := r.channel.QueueDeclare("", //隨機生產(chǎn)隊列名稱false,false,true,false,nil,)r.failOnErr(err, "Failed to declare a queue")//綁定隊列到 exchange 中err = r.channel.QueueBind(q.Name,//在pub/sub模式下,這里的key要為空"",r.Exchange,false,nil)//消費消息messges, err := r.channel.Consume(q.Name,"",true,false,false,false,nil,)forever := make(chan bool)go func() {for d := range messges {log.Printf("Received a message: %s", d.Body)}}()fmt.Println("退出請按 CTRL+C\n")<-forever }路由模式
在路由模式下,一個消息可以被多個消費者獲取,該模式生產(chǎn)端可以指定消費端;交換機的類型需要設置為direct,并且需要設置bind key。
/路由模式 //創(chuàng)建RabbitMQ實例 func NewRabbitMQRouting(exchangeName string,routingKey string) *RabbitMQ {//創(chuàng)建RabbitMQ實例rabbitmq := NewRabbitMQ("",exchangeName,routingKey)var err error//獲取connectionrabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)rabbitmq.failOnErr(err,"failed to connect rabbitmq!")//獲取channelrabbitmq.channel, err = rabbitmq.conn.Channel()rabbitmq.failOnErr(err, "failed to open a channel")return rabbitmq }//路由模式發(fā)送消息 func (r *RabbitMQ) PublishRouting(message string ) {//1.嘗試創(chuàng)建交換機err := r.channel.ExchangeDeclare(r.Exchange,//要改成direct"direct",true,false,false,false,nil,)r.failOnErr(err, "Failed to declare an excha"+"nge")//2.發(fā)送消息err = r.channel.Publish(r.Exchange,//要設置r.Key,false,false,amqp.Publishing{ContentType: "text/plain",Body: []byte(message),}) } //路由模式接受消息 func (r *RabbitMQ) RecieveRouting() {//1.試探性創(chuàng)建交換機err := r.channel.ExchangeDeclare(r.Exchange,//交換機類型"direct",true,false,false,false,nil,)r.failOnErr(err, "Failed to declare an exch"+"ange")//2.試探性創(chuàng)建隊列,這里注意隊列名稱不要寫q, err := r.channel.QueueDeclare("", //隨機生產(chǎn)隊列名稱false,false,true,false,nil,)r.failOnErr(err, "Failed to declare a queue")//綁定隊列到 exchange 中err = r.channel.QueueBind(q.Name,//需要綁定keyr.Key,r.Exchange,false,nil)//消費消息messges, err := r.channel.Consume(q.Name,"",true,false,false,false,nil,)forever := make(chan bool)go func() {for d := range messges {log.Printf("Received a message: %s", d.Body)}}()fmt.Println("退出請按 CTRL+C\n")<-forever }Topic模式,話題模式
一個消息可以被多個消費者獲取,消息的目標queue可用BindingKey以通配符,的方式指定。
交換的類型設置為 topic,在接受端通過匹配規(guī)則匹配(例如:hello.*.world)
參考: https://www.cnblogs.com/luotianshuai/p/7469365.html#4199652
轉(zhuǎn)載于:https://www.cnblogs.com/nirao/p/11176137.html
總結(jié)
以上是生活随笔為你收集整理的RabbiqMQ快速入门的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: MySQL中字符串函数详细介绍
- 下一篇: pthread vs openMP之我见