日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 人文社科 > 生活经验 >内容正文

生活经验

Go 学习笔记(57)— Go 第三方库之 amqp (RabbitMQ 生产者、消费者整个流程)

發布時間:2023/11/28 生活经验 28 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Go 学习笔记(57)— Go 第三方库之 amqp (RabbitMQ 生产者、消费者整个流程) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

1. 安裝 rabbitmq 的 golang 包

golang 可使用庫 github.com/streadway/amqp 操作 rabbitmq 。使用下面命令安裝 RabbitMQ

go get -v github.com/streadway/amqp

2. 生產者流程

Golang 中創建 rabbitmq 生產者基本步驟是:

  1. 連接 Connection
  2. 創建 Channel
  3. 創建或連接一個交換器
  4. 創建或連接一個隊列
  5. 交換器綁定隊列
  6. 投遞消息
  7. 關閉 Channel
  8. 關閉 Connection

2.1 創建連接

// connection
connection, err := amqp.Dial("amqp://guest:guest@localhost:5672/")

2.2 創建通道

// channel
channel, err := connection.Channel()

2.3 創建交換器

err = channel.ExchangeDeclare("e1", "direct", true, false, false, true, nil)

參數依次說明:

  • name 交換機名稱
  • kind 交換機類型
  • durable 持久化標識
  • autoDelete 是否自動刪除
  • internal 是否是內置交換機
  • noWait 是否等待服務器確認
  • args 其它配置

參數說明要點:

  • autoDelete :

自動刪除功能必須要在交換器曾經綁定過隊列或者交換器的情況下,處于不再使用的時候才會自動刪除,如果是剛剛創建的尚未綁定隊列或者交換器的交換器或者早已創建只是未進行隊列或者交換器綁定的交換器是不會自動刪除的。

  • internal :

內置交換器是一種特殊的交換器,這種交換器不能直接接收生產者發送的消息,只能作為類似于隊列的方式綁定到另一個交換器,來接收這個交換器中路由的消息,內置交換器同樣可以綁定隊列和路由消息,只是其接收消息的來源與普通交換器不同。

  • noWait

noWaittrue 時,聲明時無需等待服務器的確認。

該通道可能由于錯誤而關閉。 添加一個 NotifyClose 偵聽器應對任何異常。創建交換器還有一個差不多的方法( ExchangeDeclarePassive ),他主要是假定交換已存在,并嘗試連接到不存在的交換將導致 RabbitMQ 引發異常,可用于檢測交換器的存在。

2.4 創建隊列

q, err := channel.QueueDeclare("q1", true, false, false, true, nil)

參數說明:

  • name 隊列名稱
  • durable 持久化
  • autoDelete 自動刪除
  • exclusive 排他
  • noWait 是否等待服務器確認
  • args Table

參數說明要點:

  • exclusive 排他

排他隊列只對首次創建它的連接可見,排他隊列是基于連接( Connection )可見的,并且該連接內的所有信道( Channel)都可以訪問這個排他隊列,在這個連接斷開之后,該隊列自動刪除,由此可見這個隊列可以說是綁到連接上的,對同一服務器的其他連接不可見。

同一連接中不允許建立同名的排他隊列的這種排他優先于持久化,即使設置了隊列持久化,在連接斷開后,該隊列也會自動刪除。

非排他隊列不依附于連接而存在,同一服務器上的多個連接都可以訪問這個隊列。

  • autoDelete 設置是否自動刪除。

true 則設置隊列為自動刪除。

自動刪除的前提是:至少有一個消費者連接到這個隊列,之后所有與這個隊列連接的消費者都斷開時,才會自動刪除。

不能把這個參數錯誤地理解為:“當連接到此隊列的所有客戶端斷開時,這個隊列自動刪除”,因為生產者客戶端創建這個隊列,或者沒有消費者客戶端與這個隊列連接時,都不會自動刪除這個隊列。

創建隊列還有一個差不多的方法( QueueDeclarePassive ),他主要是假定隊列已存在,并嘗試連接到不存在的隊列將導致 RabbitMQ 引發異常,可用于檢測隊列的存在。

2.5 綁定交換器和隊列

err = channel.QueueBind("q1", "q1Key", "e1", true, nil)

參數解析:

  • name 隊列名稱
  • key BindingKey 根據交換機類型來設定
  • exchange 交換機名稱
  • noWait 是否等待服務器確認
  • args Table

2.6 綁定交換器(可選)

err = channel.ExchangeBind("dest", "q1Key", "src", false, nil)

參數解析:

  • destination 目的交換器
  • key RoutingKey 路由鍵
  • source 源交換器
  • noWait 是否等待服務器確認
  • args Table  其它參數

生產者發送消息至交換器 source 中,交換器 source 根據路由鍵找到與其匹配的另一個交換器 destination ,井把消息轉發到 destination 中,進而存儲在 destination 綁定的隊列 queue 中,某種程度上來說 destination 交換器可以看作一個隊列。如圖:

2.7 投遞消息

err = channel.Publish("e1", "q1Key", true, false, amqp.Publishing{Timestamp:   time.Now(),DeliveryMode: amqp.Persistent, //Msg set as persistentContentType: "text/plain",Body:        []byte("Hello Golang and AMQP(Rabbitmq)!"),
})

參數解析:

  • exchange 交換器名稱
  • key RouterKey
  • mandatory 是否為無法路由的消息進行返回處理
  • immediate 是否對路由到無消費者隊列的消息進行返回處理 RabbitMQ 3.0 廢棄
  • msg 消息體

參數說明要點:

  • mandatory

消息發布的時候設置消息的 mandatory 屬性用于設置消息在發送到交換器之后無法路由到隊列的情況對消息的處理方式,設置為 true 表示將消息返回到生產者,否則直接丟棄消息。

  • immediate

參數告訴服務器至少將該消息路由到一個隊列中,否則將消息返回給生產者。 imrnediate 參數告訴服務器,如果該消息關聯的隊列上有消費者,則立刻投遞:如果所有匹配的隊列上都沒有消費者,則直接將消息返還給生產者,不用將消息存入隊列而等待消費者了。

RabbitMQ 3.0版本開始去掉了對 imrnediate 參數的支持。

其中 amqp.PublishingDeliveryMode 如果設為 amqp.Persistent 則消息會持久化。需要注意的是如果需要消息持久化 Queue 也是需要設定為持久化才有效。

3. 消費者流程

消費者的步驟和生產者流程基本類似,只是將生產者流程中的投遞消息變為消費消息。

Rabbitmq 消費方式共有 2 種,分別是推模式和拉模式。

3.1 推模式

推模式是通過持續訂閱的方式來消費信息, Consume 將信道( Channel )設置為接收模式,直到取消隊列的訂閱為止。在接收模式期間, RabbitMQ 會不斷地推送消息給消費者。推送消息的個數還是會受到 channel.Qos 的限制。

deliveries, err := channel.Consume("q1", "any", false, false, false, true, nil)

參數說明:

  • queue 隊列名稱
  • consumer 消息者名稱
  • autoAck 是否確認消費
  • exclusive 排他
  • noLocal
  • noWait bool
  • args Table

參數說明要點:

  • noLocal

設置為 true 則表示不能將同一個 Connection 中生產者發送的消息傳送給這個 Connection 中的消費者

其中 autoAck 可以設置為 true 或者 false

  • 如果設為 true 則消費者一接收到就從 queue 中去除了,如果消費者處理消息中發生意外該消息就丟失了。
  • 如果設為 false 則消費者在處理完消息后,調用 msg.Ack(false) 后消息才從 queue 中去除。即便當前消費者處理該消息發生意外,只要沒有執行 msg.Ack(false) 那該消息就仍然在 queue 中,不會丟失。

如果autoAck設置為 false 則表示需要手動進行 ack 消費

v, ok := <-deliveries
if ok {// 手動ack確認// 注意: 這里只要調用了ack就是手動確認模式,// v.Ack的參數 multiple 表示的是在此channel中先前所有未確認的deliveries都將被確認// 并不是表示設置為false就不進行當前ack確認if err := v.Ack(true); err != nil {fmt.Println(err.Error())}
} else {fmt.Println("Channel close")
}

3.2 拉模式

相對來說比較簡單,是由消費者主動拉取信息來消費,每次只消費一條消息,同樣也需要進行 ack 確認消費。

channel.Get(queue string, autoAck bool)

參考:
https://studygolang.com/articles/25406
https://studygolang.com/articles/24699?fr=sidebar
https://github.com/rabbitmq/rabbitmq-tutorials/tree/master/go

總結

以上是生活随笔為你收集整理的Go 学习笔记(57)— Go 第三方库之 amqp (RabbitMQ 生产者、消费者整个流程)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。