RabbiqMQ快速入门
生活随笔
收集整理的這篇文章主要介紹了
RabbiqMQ快速入门
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
RabbitMQ
官網地址: https://www.rabbitmq.com/
一個遵循AMQP協議,開源面向消息的中間件,支持多種編程語言。
Rabbitmq 能做什么?
- 邏輯解耦,異步的消息任務
- 消息持久化,重啟不影響
- 削峰,大規模的消息處理
主要的特點
可靠性:持久化,傳輸確認,發布確認 可擴展性:多個節點可以組成一個集群,可動態更改 多語言:支持多數編程語言 管理界面:有常見的用戶界面,便于管理和監控常見的應用場景:
并發請求的壓力高可用設計(電商秒殺場景), 異步任務處理結果的回調設計(日志訂單異步處理), 系統集成與分布式系統設計(各種子系統的消息同步)。工作原理
簡單介紹生產者和消費者會和服務器建立tcp鏈接,在tcp鏈接之上會建立多個信道channel,通過信道來發送消息,生產者生產消息后不直接直接發到隊列中,而是發到一個交換空間:Exchange, Exchange會根據Exchange類型和Routing Key來決定發到哪個隊列中,消費者在從隊列中拿到消息具體工作模式
名詞解釋
ExChange :消息交換機,決定消息按照什么規則路由到那個對列中去 Queue :消息載體,每個消息都會被投到一個或多個隊列 Binding:綁定,把exchange 和 queue按照路由規則綁定起來 Routing Key: 路由關鍵字,exchage根據這關鍵字來投遞消息 Channel :消息通道,客戶端的每個連接建立多個channel Producer :消息生產者,用戶投遞消息的程序 Consumer :消息消費者,用于就是接收消息的程序Exchage工作模式
Fanout: 類似廣播,轉發到所有綁定交換機的Queue Direct: 類似單播,RoutingKey 和 BindingKey完全匹配 Topic : 類似組播,轉發到符合通配符的Queue headers:請求頭與消息頭匹配,才能接收到消息環境配置
通過docker環境配置
# /www/rabbitmq目錄可自定義,主要用于目錄掛載 mkdir -p /www/rabbitmq # 創建容器 docker run -d --hostname rabbit-node1 --name rabbit-node1 -p 5672:5672 -p15672:15672 -v /www/rabbitmq:/var/lib/rabbitmq rabbitmq:management # 查看容器狀態 docker ps | grep rabbit瀏覽器打開登錄rabbitmq, 入口:http://localhost:15672 默認用戶名: guest 密碼: guestgolang實戰
簡單基本玩法
//下載類庫 go get "github.com/streadway/amqp"前期準備代碼
//連接信息 const MQURL = "amqp://imoocuser:imoocuser@127.0.0.1:5672/imooc"//rabbitMQ結構體 type RabbitMQ struct {conn *amqp.Connectionchannel *amqp.Channel//隊列名稱QueueName string//交換機名稱Exchange string//bind Key 名稱Key string//連接信息Mqurl string }//創建結構體實例 func NewRabbitMQ(queueName string,exchange string ,key string) *RabbitMQ {return &RabbitMQ{QueueName:queueName,Exchange:exchange,Key:key,Mqurl:MQURL} }//斷開channel 和 connection func (r *RabbitMQ) Destory() {r.channel.Close()r.conn.Close() } //錯誤處理函數 func (r *RabbitMQ) failOnErr(err error, message string) {if err != nil {log.Fatalf("%s:%s", message, err)panic(fmt.Sprintf("%s:%s", message, err))} }簡單模式
簡單模式下 Exchange 和 key是為空的,不需要設置
//創建簡單模式下RabbitMQ實例 func NewRabbitMQSimple(queueName string) *RabbitMQ {//創建RabbitMQ實例rabbitmq := NewRabbitMQ(queueName,"","")var err error//獲取connectionrabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)rabbitmq.failOnErr(err, "failed to connect rabb"+"itmq!")//獲取channelrabbitmq.channel, err = rabbitmq.conn.Channel()rabbitmq.failOnErr(err, "failed to open a channel")return rabbitmq }//簡單模式隊列生產 func (r *RabbitMQ) PublishSimple(message string) {//1.申請隊列,如果隊列不存在會自動創建,存在則跳過創建_, err := r.channel.QueueDeclare(r.QueueName,//是否持久化false,//是否自動刪除false,//是否具有排他性false,//是否阻塞處理false,//額外的屬性nil,)if err != nil {fmt.Println(err)}//調用channel 發送消息到隊列中r.channel.Publish(r.Exchange,r.QueueName,//如果為true,根據自身exchange類型和routekey規則無法找到符合條件的隊列會把消息返還給發送者false,//如果為true,當exchange發送消息到隊列后發現隊列上沒有消費者,則會把消息返還給發送者false,amqp.Publishing{ContentType: "text/plain",Body: []byte(message),}) }//simple 模式下消費者 func (r *RabbitMQ) ConsumeSimple() {//1.申請隊列,如果隊列不存在會自動創建,存在則跳過創建q, err := r.channel.QueueDeclare(r.QueueName,//是否持久化false,//是否自動刪除false,//是否具有排他性false,//是否阻塞處理false,//額外的屬性nil,)if err != nil {fmt.Println(err)}//接收消息msgs, err :=r.channel.Consume(q.Name, // queue//用來區分多個消費者"", // consumer//是否自動應答true, // auto-ack//是否獨有false, // exclusive//設置為true,表示 不能將同一個Conenction中生產者發送的消息傳遞給這個Connection中 的消費者false, // no-local//列是否阻塞false, // no-waitnil, // args)if err != nil {fmt.Println(err)}forever := make(chan bool)//啟用協程處理消息go func() {for d := range msgs {//消息邏輯處理,可以自行設計邏輯log.Printf("Received a message: %s", d.Body)}}()log.Printf(" [*] Waiting for messages. To exit press CTRL+C")<-forever}工作模式
一個消息只能被一個消費者獲取(場景:生產消息大于消費消息的時候),更簡單模式代碼一樣,只是同事開啟了多個消費端,起到負載均衡的作用
訂閱模式
該模式下,隊列為空,key為空;只需設置交換空間即可;消息被投遞到多個隊列中,一個消息被多個消費者消費
//訂閱模式創建RabbitMQ實例 func NewRabbitMQPubSub(exchangeName string) *RabbitMQ {//創建RabbitMQ實例rabbitmq := NewRabbitMQ("",exchangeName,"")var err error//獲取connectionrabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)rabbitmq.failOnErr(err,"failed to connect rabbitmq!")//獲取channelrabbitmq.channel, err = rabbitmq.conn.Channel()rabbitmq.failOnErr(err, "failed to open a channel")return rabbitmq }//訂閱模式生產 func (r *RabbitMQ) PublishPub(message string) {//1.嘗試創建交換機err := r.channel.ExchangeDeclare(r.Exchange,"fanout",true,false,//true表示這個exchange不可以被client用來推送消息,僅用來進行exchange和exchange之間的綁定false,false,nil,)r.failOnErr(err, "Failed to declare an excha"+"nge")//2.發送消息err = r.channel.Publish(r.Exchange,"",false,false,amqp.Publishing{ContentType: "text/plain",Body: []byte(message),}) }//訂閱模式消費端代碼 func (r *RabbitMQ) RecieveSub() {//1.試探性創建交換機err := r.channel.ExchangeDeclare(r.Exchange,//交換機類型"fanout",true,false,//YES表示這個exchange不可以被client用來推送消息,僅用來進行exchange和exchange之間的綁定false,false,nil,)r.failOnErr(err, "Failed to declare an exch"+"ange")//2.試探性創建隊列,這里注意隊列名稱不要寫q, err := r.channel.QueueDeclare("", //隨機生產隊列名稱false,false,true,false,nil,)r.failOnErr(err, "Failed to declare a queue")//綁定隊列到 exchange 中err = r.channel.QueueBind(q.Name,//在pub/sub模式下,這里的key要為空"",r.Exchange,false,nil)//消費消息messges, err := r.channel.Consume(q.Name,"",true,false,false,false,nil,)forever := make(chan bool)go func() {for d := range messges {log.Printf("Received a message: %s", d.Body)}}()fmt.Println("退出請按 CTRL+C\n")<-forever }路由模式
在路由模式下,一個消息可以被多個消費者獲取,該模式生產端可以指定消費端;交換機的類型需要設置為direct,并且需要設置bind key。
/路由模式 //創建RabbitMQ實例 func NewRabbitMQRouting(exchangeName string,routingKey string) *RabbitMQ {//創建RabbitMQ實例rabbitmq := NewRabbitMQ("",exchangeName,routingKey)var err error//獲取connectionrabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)rabbitmq.failOnErr(err,"failed to connect rabbitmq!")//獲取channelrabbitmq.channel, err = rabbitmq.conn.Channel()rabbitmq.failOnErr(err, "failed to open a channel")return rabbitmq }//路由模式發送消息 func (r *RabbitMQ) PublishRouting(message string ) {//1.嘗試創建交換機err := r.channel.ExchangeDeclare(r.Exchange,//要改成direct"direct",true,false,false,false,nil,)r.failOnErr(err, "Failed to declare an excha"+"nge")//2.發送消息err = r.channel.Publish(r.Exchange,//要設置r.Key,false,false,amqp.Publishing{ContentType: "text/plain",Body: []byte(message),}) } //路由模式接受消息 func (r *RabbitMQ) RecieveRouting() {//1.試探性創建交換機err := r.channel.ExchangeDeclare(r.Exchange,//交換機類型"direct",true,false,false,false,nil,)r.failOnErr(err, "Failed to declare an exch"+"ange")//2.試探性創建隊列,這里注意隊列名稱不要寫q, err := r.channel.QueueDeclare("", //隨機生產隊列名稱false,false,true,false,nil,)r.failOnErr(err, "Failed to declare a queue")//綁定隊列到 exchange 中err = r.channel.QueueBind(q.Name,//需要綁定keyr.Key,r.Exchange,false,nil)//消費消息messges, err := r.channel.Consume(q.Name,"",true,false,false,false,nil,)forever := make(chan bool)go func() {for d := range messges {log.Printf("Received a message: %s", d.Body)}}()fmt.Println("退出請按 CTRL+C\n")<-forever }Topic模式,話題模式
一個消息可以被多個消費者獲取,消息的目標queue可用BindingKey以通配符,的方式指定。
交換的類型設置為 topic,在接受端通過匹配規則匹配(例如:hello.*.world)
參考: https://www.cnblogs.com/luotianshuai/p/7469365.html#4199652
轉載于:https://www.cnblogs.com/nirao/p/11176137.html
總結
以上是生活随笔為你收集整理的RabbiqMQ快速入门的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: MySQL中字符串函数详细介绍
- 下一篇: pthread vs openMP之我见