日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

RabbiqMQ快速入门

發布時間:2023/12/2 编程问答 26 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RabbiqMQ快速入门 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

RabbitMQ

官網地址: https://www.rabbitmq.com/

一個遵循AMQP協議,開源面向消息的中間件,支持多種編程語言。

Rabbitmq 能做什么?

  • 邏輯解耦,異步的消息任務
  • 消息持久化,重啟不影響
  • 削峰,大規模的消息處理

主要的特點

可靠性:持久化,傳輸確認,發布確認 可擴展性:多個節點可以組成一個集群,可動態更改 多語言:支持多數編程語言 管理界面:有常見的用戶界面,便于管理和監控

常見的應用場景:

并發請求的壓力高可用設計(電商秒殺場景), 異步任務處理結果的回調設計(日志訂單異步處理), 系統集成與分布式系統設計(各種子系統的消息同步)。

工作原理

簡單介紹生產者和消費者會和服務器建立tcp鏈接,在tcp鏈接之上會建立多個信道channel,通過信道來發送消息,生產者生產消息后不直接直接發到隊列中,而是發到一個交換空間:Exchange, Exchange會根據Exchange類型和Routing Key來決定發到哪個隊列中,消費者在從隊列中拿到消息

具體工作模式

名詞解釋

ExChange :消息交換機,決定消息按照什么規則路由到那個對列中去 Queue :消息載體,每個消息都會被投到一個或多個隊列 Binding:綁定,把exchange 和 queue按照路由規則綁定起來 Routing Key: 路由關鍵字,exchage根據這關鍵字來投遞消息 Channel :消息通道,客戶端的每個連接建立多個channel Producer :消息生產者,用戶投遞消息的程序 Consumer :消息消費者,用于就是接收消息的程序

Exchage工作模式

Fanout: 類似廣播,轉發到所有綁定交換機的Queue Direct: 類似單播,RoutingKey 和 BindingKey完全匹配 Topic : 類似組播,轉發到符合通配符的Queue headers:請求頭與消息頭匹配,才能接收到消息

環境配置

通過docker環境配置

# /www/rabbitmq目錄可自定義,主要用于目錄掛載 mkdir -p /www/rabbitmq # 創建容器 docker run -d --hostname rabbit-node1 --name rabbit-node1 -p 5672:5672 -p15672:15672 -v /www/rabbitmq:/var/lib/rabbitmq rabbitmq:management # 查看容器狀態 docker ps | grep rabbit瀏覽器打開登錄rabbitmq, 入口:http://localhost:15672 默認用戶名: guest 密碼: guest

golang實戰

簡單基本玩法

//下載類庫 go get "github.com/streadway/amqp"

前期準備代碼

//連接信息 const MQURL = "amqp://imoocuser:imoocuser@127.0.0.1:5672/imooc"//rabbitMQ結構體 type RabbitMQ struct {conn *amqp.Connectionchannel *amqp.Channel//隊列名稱QueueName string//交換機名稱Exchange string//bind Key 名稱Key string//連接信息Mqurl string }//創建結構體實例 func NewRabbitMQ(queueName string,exchange string ,key string) *RabbitMQ {return &RabbitMQ{QueueName:queueName,Exchange:exchange,Key:key,Mqurl:MQURL} }//斷開channel 和 connection func (r *RabbitMQ) Destory() {r.channel.Close()r.conn.Close() } //錯誤處理函數 func (r *RabbitMQ) failOnErr(err error, message string) {if err != nil {log.Fatalf("%s:%s", message, err)panic(fmt.Sprintf("%s:%s", message, err))} }

簡單模式

簡單模式下 Exchange 和 key是為空的,不需要設置

//創建簡單模式下RabbitMQ實例 func NewRabbitMQSimple(queueName string) *RabbitMQ {//創建RabbitMQ實例rabbitmq := NewRabbitMQ(queueName,"","")var err error//獲取connectionrabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)rabbitmq.failOnErr(err, "failed to connect rabb"+"itmq!")//獲取channelrabbitmq.channel, err = rabbitmq.conn.Channel()rabbitmq.failOnErr(err, "failed to open a channel")return rabbitmq }//簡單模式隊列生產 func (r *RabbitMQ) PublishSimple(message string) {//1.申請隊列,如果隊列不存在會自動創建,存在則跳過創建_, err := r.channel.QueueDeclare(r.QueueName,//是否持久化false,//是否自動刪除false,//是否具有排他性false,//是否阻塞處理false,//額外的屬性nil,)if err != nil {fmt.Println(err)}//調用channel 發送消息到隊列中r.channel.Publish(r.Exchange,r.QueueName,//如果為true,根據自身exchange類型和routekey規則無法找到符合條件的隊列會把消息返還給發送者false,//如果為true,當exchange發送消息到隊列后發現隊列上沒有消費者,則會把消息返還給發送者false,amqp.Publishing{ContentType: "text/plain",Body: []byte(message),}) }//simple 模式下消費者 func (r *RabbitMQ) ConsumeSimple() {//1.申請隊列,如果隊列不存在會自動創建,存在則跳過創建q, err := r.channel.QueueDeclare(r.QueueName,//是否持久化false,//是否自動刪除false,//是否具有排他性false,//是否阻塞處理false,//額外的屬性nil,)if err != nil {fmt.Println(err)}//接收消息msgs, err :=r.channel.Consume(q.Name, // queue//用來區分多個消費者"", // consumer//是否自動應答true, // auto-ack//是否獨有false, // exclusive//設置為true,表示 不能將同一個Conenction中生產者發送的消息傳遞給這個Connection中 的消費者false, // no-local//列是否阻塞false, // no-waitnil, // args)if err != nil {fmt.Println(err)}forever := make(chan bool)//啟用協程處理消息go func() {for d := range msgs {//消息邏輯處理,可以自行設計邏輯log.Printf("Received a message: %s", d.Body)}}()log.Printf(" [*] Waiting for messages. To exit press CTRL+C")<-forever}

工作模式

一個消息只能被一個消費者獲取(場景:生產消息大于消費消息的時候),更簡單模式代碼一樣,只是同事開啟了多個消費端,起到負載均衡的作用

訂閱模式

該模式下,隊列為空,key為空;只需設置交換空間即可;消息被投遞到多個隊列中,一個消息被多個消費者消費

//訂閱模式創建RabbitMQ實例 func NewRabbitMQPubSub(exchangeName string) *RabbitMQ {//創建RabbitMQ實例rabbitmq := NewRabbitMQ("",exchangeName,"")var err error//獲取connectionrabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)rabbitmq.failOnErr(err,"failed to connect rabbitmq!")//獲取channelrabbitmq.channel, err = rabbitmq.conn.Channel()rabbitmq.failOnErr(err, "failed to open a channel")return rabbitmq }//訂閱模式生產 func (r *RabbitMQ) PublishPub(message string) {//1.嘗試創建交換機err := r.channel.ExchangeDeclare(r.Exchange,"fanout",true,false,//true表示這個exchange不可以被client用來推送消息,僅用來進行exchange和exchange之間的綁定false,false,nil,)r.failOnErr(err, "Failed to declare an excha"+"nge")//2.發送消息err = r.channel.Publish(r.Exchange,"",false,false,amqp.Publishing{ContentType: "text/plain",Body: []byte(message),}) }//訂閱模式消費端代碼 func (r *RabbitMQ) RecieveSub() {//1.試探性創建交換機err := r.channel.ExchangeDeclare(r.Exchange,//交換機類型"fanout",true,false,//YES表示這個exchange不可以被client用來推送消息,僅用來進行exchange和exchange之間的綁定false,false,nil,)r.failOnErr(err, "Failed to declare an exch"+"ange")//2.試探性創建隊列,這里注意隊列名稱不要寫q, err := r.channel.QueueDeclare("", //隨機生產隊列名稱false,false,true,false,nil,)r.failOnErr(err, "Failed to declare a queue")//綁定隊列到 exchange 中err = r.channel.QueueBind(q.Name,//在pub/sub模式下,這里的key要為空"",r.Exchange,false,nil)//消費消息messges, err := r.channel.Consume(q.Name,"",true,false,false,false,nil,)forever := make(chan bool)go func() {for d := range messges {log.Printf("Received a message: %s", d.Body)}}()fmt.Println("退出請按 CTRL+C\n")<-forever }

路由模式

在路由模式下,一個消息可以被多個消費者獲取,該模式生產端可以指定消費端;交換機的類型需要設置為direct,并且需要設置bind key。

/路由模式 //創建RabbitMQ實例 func NewRabbitMQRouting(exchangeName string,routingKey string) *RabbitMQ {//創建RabbitMQ實例rabbitmq := NewRabbitMQ("",exchangeName,routingKey)var err error//獲取connectionrabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)rabbitmq.failOnErr(err,"failed to connect rabbitmq!")//獲取channelrabbitmq.channel, err = rabbitmq.conn.Channel()rabbitmq.failOnErr(err, "failed to open a channel")return rabbitmq }//路由模式發送消息 func (r *RabbitMQ) PublishRouting(message string ) {//1.嘗試創建交換機err := r.channel.ExchangeDeclare(r.Exchange,//要改成direct"direct",true,false,false,false,nil,)r.failOnErr(err, "Failed to declare an excha"+"nge")//2.發送消息err = r.channel.Publish(r.Exchange,//要設置r.Key,false,false,amqp.Publishing{ContentType: "text/plain",Body: []byte(message),}) } //路由模式接受消息 func (r *RabbitMQ) RecieveRouting() {//1.試探性創建交換機err := r.channel.ExchangeDeclare(r.Exchange,//交換機類型"direct",true,false,false,false,nil,)r.failOnErr(err, "Failed to declare an exch"+"ange")//2.試探性創建隊列,這里注意隊列名稱不要寫q, err := r.channel.QueueDeclare("", //隨機生產隊列名稱false,false,true,false,nil,)r.failOnErr(err, "Failed to declare a queue")//綁定隊列到 exchange 中err = r.channel.QueueBind(q.Name,//需要綁定keyr.Key,r.Exchange,false,nil)//消費消息messges, err := r.channel.Consume(q.Name,"",true,false,false,false,nil,)forever := make(chan bool)go func() {for d := range messges {log.Printf("Received a message: %s", d.Body)}}()fmt.Println("退出請按 CTRL+C\n")<-forever }

Topic模式,話題模式

一個消息可以被多個消費者獲取,消息的目標queue可用BindingKey以通配符,的方式指定。
交換的類型設置為 topic,在接受端通過匹配規則匹配(例如:hello.*.world)

//話題模式 //創建RabbitMQ實例 func NewRabbitMQTopic(exchangeName string,routingKey string) *RabbitMQ {//創建RabbitMQ實例rabbitmq := NewRabbitMQ("",exchangeName,routingKey)var err error//獲取connectionrabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)rabbitmq.failOnErr(err,"failed to connect rabbitmq!")//獲取channelrabbitmq.channel, err = rabbitmq.conn.Channel()rabbitmq.failOnErr(err, "failed to open a channel")return rabbitmq } //話題模式發送消息 func (r *RabbitMQ) PublishTopic(message string ) {//1.嘗試創建交換機err := r.channel.ExchangeDeclare(r.Exchange,//要改成topic"topic",true,false,false,false,nil,)r.failOnErr(err, "Failed to declare an excha"+"nge")//2.發送消息err = r.channel.Publish(r.Exchange,//要設置r.Key,false,false,amqp.Publishing{ContentType: "text/plain",Body: []byte(message),}) } //話題模式接受消息 //要注意key,規則 //其中“*”用于匹配一個單詞,“#”用于匹配多個單詞(可以是零個) //匹配 imooc.* 表示匹配 imooc.hello, 但是imooc.hello.one需要用imooc.#才能匹配到 func (r *RabbitMQ) RecieveTopic() {//1.試探性創建交換機err := r.channel.ExchangeDeclare(r.Exchange,//交換機類型"topic",true,false,false,false,nil,)r.failOnErr(err, "Failed to declare an exch"+"ange")//2.試探性創建隊列,這里注意隊列名稱不要寫q, err := r.channel.QueueDeclare("", //隨機生產隊列名稱false,false,true,false,nil,)r.failOnErr(err, "Failed to declare a queue")//綁定隊列到 exchange 中err = r.channel.QueueBind(q.Name,//在pub/sub模式下,這里的key要為空r.Key,r.Exchange,false,nil)//消費消息messges, err := r.channel.Consume(q.Name,"",true,false,false,false,nil,)forever := make(chan bool)go func() {for d := range messges {log.Printf("Received a message: %s", d.Body)}}()fmt.Println("退出請按 CTRL+C\n")<-forever }

參考: https://www.cnblogs.com/luotianshuai/p/7469365.html#4199652

轉載于:https://www.cnblogs.com/nirao/p/11176137.html

總結

以上是生活随笔為你收集整理的RabbiqMQ快速入门的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。