日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Ruby使用RabbitMQ(基础)

發布時間:2023/12/8 编程问答 27 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Ruby使用RabbitMQ(基础) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

Ruby使用RabbitMQ(基礎)

RabbitMQ documentation

rabbitmq-tutorials

rabbitmq-configure

bunny

前提

最近剛剛接觸到mq, 就在極客時間上買了一門關于mq的課程.
學習了一些基礎加上在 RabbitMQ 官網上的例子.

總結了一下.

為什么要使用mq

消息隊列(mq)可以幫助我們去處理系統之間的消息傳遞.
幫助我們去解決消息在傳遞過程中可能出現的數據丟失問題.

同時,消息隊列還可以起到緩存的作用.
消息隊列可以暫存一些消息. 平衡消息上下游之間速度不平衡.

mq是必須的嗎

mq 不是必須的, 很多系統沒有mq 也可以正常運行.
但是, 有mq 可以幫助我們很好地處理系統之間的交互.

mq的應用場景

  • 異步處理
  • 流量控制
  • 日志
  • 服務解耦

不適合mq的場景

  • 數據一致性要求很高
  • 第三方支付
  • 銀行轉賬等

術語 jargon

RabbitMQ 就像一個郵局一樣, 有發送信件的人, 也有接受人.
人們不去關心到底通過什么樣子的方式, 把信交到收信人那里的.
人們只要把信交給郵局就好.

RabbitMQ 在系統間就起到郵局的作用.
幫助系統之間進行通信. 并且要確保信息不丟失.

Producer(生產者)

Producer 就是消息的生產者.

也就是消息的上游, 一個系統A 產生了一些信息.
可能需要另外一個系統B 去處理.
那么, 系統A 就是一個 Producer

Consumer(消費者)

一個系統B, 需要處理另一個系統A 的消息.
那么 系統B就是一個 Consumer

注意: 一個系統可以是Producer,也可以是Consumer.

Broker

Broker 就是mq, 消息隊列中間件.
它去協調在兩個系統之間的消息. 盡可能地讓消息不丟失,不重復.

安裝

  • deepin操作系統
  • 使用docker安裝
# 安裝docker sudo apt-get install docker-ce # 使用docker運行 sudo docker run -it --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management # 關閉 docker stop rabbitmq # 啟動 docker start -a rabbitmq

docker 運行完成后,本地會啟動一個server,
也有一個web端的控制臺

http://localhost:15672/#/

使用bunny

剛剛安裝了RabbitMQ的server.

那么,現在就應該安裝RabbitMQ的client了;
RabbitMQ 支持的語言有很多中.
這里因為工作原因, 選擇使用了Ruby的bunny

安裝 bunny 用于和rabbitmq 進行交互

# 通過gem來安裝 gem install bunny # 直接require 就可以使用了 require 'bunny'

簡單使用

Producer

代碼中 hostname 是我自己在hosts設置的;
如果 Producer 和 Consumer 在同一個主機上.
就不用設置 hostname

# 引入 bunny require 'bunny' # hostname 是我自己在hosts設置的; connection = Bunny.new(hostname: 'dev.local') connection.startchannel = connection.create_channel# 獲取queue是冪等的 queue = channel.queue('simple')100.times do |n|message = "第#{n}條消息"queue.publish(message) end # 關閉連接 connection.close

Consumer

require 'bunny'connection = Bunny.new(hostname: 'dev.local') connection.startchannel = connection.create_channel# 獲取queue是冪等的 queue = channel.queue('simple')# block: true 表明會一直監聽mq, 等待消息的傳入 # 真實的環境不要使用 queue.subscribe(block: true) do |_delivery_info, _properties, body|# 模擬延時任務, 延時1ssleep 1puts body end

消息隊列可以幫助我們處理消息;
無論是先啟動 Consumer 還是先啟動 Producer
消息是不會丟失的;

我們分別先執行Consumer,Producer;
我們收到的信息都是一樣的;

Consumer 可擴展性

當一個Consumer的處理速度已經滿足不了 Producer的生產速度時,
我們可以同時運行多個Consumer.

我們可以執行多個上面 Consumer 的代碼;
我們可以看到消息被mq一個個地均分給了一個個的Consumer.

但是, 注意: 這里需要先啟動多個 Consumer, 再啟動 Producer.
不然, 只能有一個Consumer收到消息.

消息確認

在上面的 Consumer 代碼中, 如果一個Consumer 掛了;
或者在 Consumer處理問題發生了問題;
導致消息沒有正常處理.

那么這條消息就是丟失了;

RabbitMQ 中如何解決這個問題;
在client上加入 manual_ack: true 就可以了;

queue.subscribe(block: true, manual_ack: true) do |_delivery_info, _properties, body|# 模擬延時任務, 延時1ssleep 1puts body# 發送ack, 通知mq 消息已處理完成;# 必須發送ack, 不然mq 會一直requeuechannel.ack(_delivery_info.delivery_tag) end

修改完了, 我們再看一下結果;
就可以發現, 如果一個Consumer掛掉了;

消息會轉移到其它存活的Consumer上.

注意: 如果使用了manual_ack, 一定要發送 ack. 不然RabbitMQ會不停占用內存, 最后導致系統崩了;

消息持久化(Message durability)

通過上面的代碼, 我們解決了當Consumer掛掉的時候, mq 會幫助我們把消息發送給別的Consumer;
確保了, 在Consumer端處理得當;

但是, 我們也要考慮到當 RabbitMQ server 掛了的時候.
我們如何處理呢?

  • 確保 queue 是可持久化的
  • 確保 messages 是可持久化的
  • 我們在代碼中可以添加

    同時在Consumer和Producer的代碼中修改
    注意: 我們使用了不同的queue

    # 加入 durable: true queue = channel.queue('simple-durable', durable: true)

    在Producer 中加入

    queue.publish(message, persistent: true)

    執行docker kill 可以在試試看效果

    不過要注意:

    這樣并不會完全保證消息不丟失; 盡管RabbitMQ知道了要把數據保存到硬盤中.
    但是仍然有一個短暫的窗口時間: 當mq 接受到消息, 但是還未將消息保存到硬盤.

    同樣, MQ也不會收到一條消息就保存到內存中去;
    這個持久化并不strong. 當然,一般情況下夠用.

    如果需要一個更strong的保證. 可以使用 publisher confirms

    公平分配(Fair dispatch)

    我們運行上面的Consumer 可以發現.
    消息其實是公平分配給每個Consumer的.

    那么,這可能會出現問題;
    假如某個節點處理消息的速度快, 但是因為公平分配.
    這個節點的利用率就比較低了;

    為了解決這個問題:
    我們可以使用 channel.prefetch(n)

    n = 1 channel.prefetch(n)

    這將告訴RabbitMQ 不要一次發布超過 n 條消息給Consumer;
    直到Consumer處理完消息, 并返回ack.

    注意: 如果所有的Consumer都處于繁忙狀態, 消息可能會堆積.

    總結消息丟失

    確認消息丟失

    首先, 我們需要確定消息丟失.

    我們可以利用消息隊列的有序性來驗證是否有消息丟失

    一般MQ會有 攔截器機制, 讓我們在Producer和Consumer代碼之前,
    進行消息丟失的驗證;

    圖中的三個系統 Produce, Broker, Consumer

    三個階段 生產階段, 存儲階段, 消費階段

    四個流程 send, pull, ack_to_broker, ack_to_producer

    都有可能因為系統,網絡等原因導致消息的丟失;

    Producer

    如何在生產者這一階段確保消息不丟失呢?

    生產者發送給MQ消息后,
    MQ會給生產者一個ack. 如果沒有接受到ack.
    說明MQ 掛掉了,或者網絡沒能達到等等.

    這時候, 需要生產者這一端進行數據重發.

    只要 Producer 收到了 Broker 的確認響應,
    那說明在生產者這一階段是確保了消息不丟失;

    但是, 這也有可能導致消息重復;

    Broker

    消息傳遞到了MQ這里. 一般情況下,消息可以正常發送給Consumer.
    但是, 如果MQ掛了; 怎么處理?

    我們在之前的代碼中提到了消息的持久化;
    消息的持久化, 可以確保消息在MQ掛了的情況下, 還能夠保存下來;

    等到下次重啟就可以繼續把消息發送給Consumer.

    Consumer

    在Consumer階段. 很有可能在處理消息時, 系統重啟或者掛掉;
    那么, 處理消息的業務邏輯還沒有完成.

    這條消息就不能算是被成功處理了;
    所以需要MQ 再次重發消息;

    不要在收到消息后就立即發送消費確認,而是應該在執行完所有消費業務邏輯之后,再發送消費確認.

    消息重復

    消息在網絡傳輸過程中發送錯誤,由于發送方收不到確認,會通過重發來保證消息不丟失。
    但是,如果確認響應在網絡傳輸時丟失,也會導致重發消息。

    也就是說,無論是 Broker 還是 Consumer 都是有可能收到重復消息的

    質量標準

    傳遞消息時能夠提供的服務質量標準

    • at more once 消息最多發送一次, 不管不顧. 發完就完事了;

    • at least once 至少一次. 消息會有確認機制. 保證消息被消費者消費了; 但是肯定會有重復;

    • exactly once 僅僅一次; 這個MQ沒有實現; 也很難實現; 因為系統層面上的問題, 不能全讓MQ 處理;

    exactly once 的實現需要 消息隊列 + 系統處理

    從業務邏輯設計上入手,將消費的業務邏輯設計成具備冪等性的操作。
    但是, 很多代碼不是冪等的;

    需要我們處理成冪等;

    消費冪等

    消費者一段的代碼, 如果是冪等的;
    那么, 遇到多余的消息. 直接執行邏輯也無妨;

    例如:

    給Tony 性別設置為 男;

    這個邏輯執行1次, 和執行 n次效果一樣;

    所以,如果業務處理邏輯本身就是冪等的,
    那么,就不用考慮消息重復的問題;

    冪等: 多次執行和執行一次結果相同;

    消息查重

    更新的數據設置前置條件

    消息先去重,根據業務ID(或者其它能夠標識消息唯一性的就行),
    去查詢是否消費過此消息了,
    消費了,則拋棄,否則就消費.

    如果大家看完了上面的內容, 可以看看鏈接中, RabbitMQ更為常用的介紹.

    Ruby使用RabbitMQ(進階)

    總結

    以上是生活随笔為你收集整理的Ruby使用RabbitMQ(基础)的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。