日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

rabbitmq 拉取消息太慢_面试官:消息队列这些我都要问

發(fā)布時(shí)間:2023/12/10 编程问答 38 豆豆
生活随笔 收集整理的這篇文章主要介紹了 rabbitmq 拉取消息太慢_面试官:消息队列这些我都要问 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

作者:mousycoder

segmentfault.com/a/1190000021054802

消息隊(duì)列連環(huán)炮

  • 項(xiàng)目里怎么樣使用 MQ 的?

  • 為什么要使用消息隊(duì)列?

  • 消息隊(duì)列有什么優(yōu)點(diǎn)和缺點(diǎn)?

  • kafka,activemq,rabbitmq,rocketmq 都有什么去唄?

  • 如何保證消息隊(duì)列高可用?

  • 如何保證消息不被重復(fù)消費(fèi)?

  • 如何保證消息的可靠性傳輸?

  • 如何保證消息的順序性?

  • 寫一個(gè)消息隊(duì)列架構(gòu)設(shè)計(jì)?

  • 消息隊(duì)列技術(shù)選型

    解決的問(wèn)題:

    • 解耦

    • 異步

    • 削峰

    不用 MQ 系統(tǒng)耦合場(chǎng)景

    A 系統(tǒng)產(chǎn)生了一個(gè)比較關(guān)鍵的數(shù)據(jù),很多系統(tǒng)需要 A 系統(tǒng)將數(shù)據(jù)發(fā)過(guò)來(lái),強(qiáng)耦合(B,C,D,E 系統(tǒng)可能參數(shù)不一樣、一會(huì)需要一會(huì)不需要數(shù)據(jù),A 系統(tǒng)要不斷修改代碼維護(hù))

    A 系統(tǒng)還要考慮 B、C、D、E 系統(tǒng)是否掛了,是否訪問(wèn)超時(shí)?是否重試?

    使用 MQ 系統(tǒng)解耦場(chǎng)景

  • 維護(hù)這個(gè)代碼,不需要考慮人家是否調(diào)用成功,失敗超時(shí)

  • 如果新系統(tǒng)需要數(shù)據(jù),直接從 MQ 里消費(fèi)即可,如果某個(gè)系統(tǒng)不需要這條數(shù)據(jù)就取消對(duì) MQ 消息的消費(fèi)即可。

  • 總結(jié):通過(guò)一個(gè) MQ 的發(fā)布訂閱消息模型(Pub/Sub), 系統(tǒng) A 跟其他系統(tǒng)就徹底解耦了。

    不用 MQ 同步高延遲請(qǐng)求場(chǎng)景

    一般互聯(lián)網(wǎng)類的企業(yè),對(duì)用戶的直接操作,一般要求每個(gè)請(qǐng)求都必須在 200ms以內(nèi),對(duì)用戶幾乎是無(wú)感知的。

    使用 MQ 進(jìn)行異步化之后的接口性能優(yōu)化

    提高高延時(shí)接口

    沒(méi)有用 MQ 時(shí)高峰期系統(tǒng)被打死的場(chǎng)景

    高峰期每秒 5000 個(gè)請(qǐng)求,每秒對(duì) MySQL 執(zhí)行 5000 條 SQL(一般MySQL每秒 2000 個(gè)請(qǐng)求差不多了),如果MySQL被打死,然后整個(gè)系統(tǒng)就崩潰,用戶就沒(méi)辦法使用系統(tǒng)了。但是高峰期過(guò)了之后,每秒鐘可能就 50 個(gè)請(qǐng)求,對(duì)整個(gè)系統(tǒng)沒(méi)有任何壓力。

    使用 MQ 進(jìn)行削峰的場(chǎng)景

    5000 個(gè)請(qǐng)求寫入到 MQ 里面,系統(tǒng) A 每秒鐘最多只能處理 2000 個(gè)請(qǐng)求(MySQL 每秒鐘最多處理 2000 個(gè)請(qǐng)求),系統(tǒng) A 從 MQ 里慢慢拉取請(qǐng)求,每秒鐘拉取 2000 個(gè)請(qǐng)求。MQ,每秒鐘 5000 個(gè)請(qǐng)求進(jìn)來(lái),結(jié)果只有 2000 個(gè)請(qǐng)求出去,結(jié)果導(dǎo)致在高峰期(21小時(shí)),可能有幾十萬(wàn)甚至幾百萬(wàn)的請(qǐng)求積壓在 MQ 中,這個(gè)是正常的,因?yàn)檫^(guò)了高峰期之后,每秒鐘就 50 個(gè)請(qǐng)求,但是系統(tǒng) A 還是會(huì)按照每秒 2000 個(gè)該請(qǐng)求的速度去處理。只要高峰期一過(guò),系統(tǒng) A 就會(huì)快速的將積壓的消息給解決掉。

    算一筆賬,每秒積壓在 MQ 里消息有 3000 條,一分鐘就會(huì)積壓 18W 條消息,一個(gè)小時(shí)就會(huì)積壓 1000 萬(wàn)條消息。等高峰期一過(guò),差不多需要 1 個(gè)多小時(shí)就可以把 1000W 條積壓的消息給處理掉

    架構(gòu)中引入 MQ 后存在的問(wèn)題

    • 系統(tǒng)可用性降低

    MQ 可能掛掉,導(dǎo)致整個(gè)系統(tǒng)崩潰

    • 系統(tǒng)復(fù)雜性變高

    可能發(fā)重復(fù)消息,導(dǎo)致插入重復(fù)數(shù)據(jù);消息丟了;消息順序亂了;系統(tǒng) B,C,D 掛了,導(dǎo)致 MQ 消息積累,磁盤滿了;

    • 一致性問(wèn)題

    本來(lái)應(yīng)該A,B,C,D 都執(zhí)行成功了再返回,結(jié)果A,B,C 執(zhí)行成功 D 失敗

    Kafka、ActiveMQ、RabbitMQ、RocketMQ 有什么優(yōu)缺點(diǎn)

    建議:中小型公司 RabbitMQ 大公司:RocketMQ 大數(shù)據(jù)實(shí)時(shí)計(jì)算:Kafka

    消息隊(duì)列高可用

    RabbtitMQ 高可用

    RabbitMQ有三種模式:單機(jī)模式 、普通集群模式、鏡像集群模式

    • 單機(jī)模式

    demo級(jí)

    • 普通集群模式(非高可用)

    隊(duì)列的元數(shù)據(jù)存在于多個(gè)實(shí)例中,但是消息不存在多個(gè)實(shí)例中,每次多臺(tái)機(jī)器上啟動(dòng)多個(gè) rabbitmq 實(shí)例,每個(gè)機(jī)器啟動(dòng)一個(gè)。

    • 優(yōu)點(diǎn):可以多個(gè)機(jī)器消費(fèi)消息,可以提高消費(fèi)的吞吐量

    • 缺點(diǎn):可能會(huì)在 rabbitmq 內(nèi)部產(chǎn)生大量的數(shù)據(jù)傳輸 ;可用性基本沒(méi)保障,queue 所在機(jī)器宕機(jī),就沒(méi)辦法消費(fèi)了

    沒(méi)有高可用性可言

    • 鏡像集群模式(高可用,非分布式)

    隊(duì)列的元數(shù)據(jù)和消息都會(huì)存在于多個(gè)實(shí)例中,每次寫消息到 queue的時(shí)候,都會(huì)自動(dòng)把消息到多個(gè)實(shí)例的 queue 里進(jìn)行消息同步。也就 是每個(gè)節(jié)點(diǎn)上都有這個(gè) queue 的一個(gè)完整鏡像(這個(gè) queue的全部數(shù)據(jù))。任何一個(gè)節(jié)點(diǎn)宕機(jī)了,其他節(jié)點(diǎn)還包含這個(gè) queue的完整數(shù)據(jù),其他 consumer 都可以到其他活著的節(jié)點(diǎn)上去消費(fèi)數(shù)據(jù)都是 OK 的。

    缺點(diǎn):不是分布式的,如果這個(gè) queue的數(shù)據(jù)量很大,大到這個(gè)機(jī)器上的容量無(wú)法容納 。

    開(kāi)啟鏡像集群模式方法:管理控制臺(tái),Admin頁(yè)面下,新增一個(gè)鏡像集群模式的策略,指定的時(shí)候可以要求數(shù)據(jù)同步到所有節(jié)點(diǎn),也可以要求同步到指定數(shù)量的節(jié)點(diǎn),然后你再次創(chuàng)建 queue 的時(shí)候 ,應(yīng)用這個(gè)策略,就 會(huì)自動(dòng)將數(shù)據(jù)同步到其他的節(jié)點(diǎn)上去。

    • Kafka 高可用架構(gòu)

    broker進(jìn)程就是kafka在每臺(tái)機(jī)器上啟動(dòng)的自己的一個(gè)進(jìn)程。每臺(tái)機(jī)器+機(jī)器上的broker進(jìn)程,就可以認(rèn)為是 kafka集群中的一個(gè)節(jié)點(diǎn)。

    你創(chuàng)建一個(gè) topic,這個(gè)topic可以劃分為多個(gè) partition,每個(gè) partition 可以存在于不同的 broker 上,每個(gè) partition就存放一部分?jǐn)?shù)據(jù)。

    這就是天然的分布式消息隊(duì)列,也就是說(shuō)一個(gè) topic的數(shù)據(jù),是分散放在 多個(gè)機(jī)器上的,每個(gè)機(jī)器就放一部分?jǐn)?shù)據(jù)。

    分布式的真正含義是每個(gè)節(jié)點(diǎn)只放一部分?jǐn)?shù)據(jù),而不是完整數(shù)據(jù)(完整數(shù)據(jù)就是HA、集群機(jī)制)
    Kafka 0.8版本之前是沒(méi)有 HA 機(jī)制的,任何一個(gè) broker 宕機(jī)了,那么就缺失一部分?jǐn)?shù)據(jù)。

    Kafka 0.8以后,提供了 HA 機(jī)制,就是 replica 副本機(jī)制。

    每個(gè) partition的數(shù)據(jù)都會(huì)同步到其他機(jī)器上,形成自己的多個(gè) replica 副本。然后所有 replica 會(huì)選舉一個(gè) leader。那么生產(chǎn)者、消費(fèi)者都會(huì)和這個(gè) leader 打交道,然后其他 replica 就是 follow。寫的時(shí)候,leader 負(fù)責(zé)把數(shù)據(jù)同步到所有 follower上去,讀的時(shí)候就直接讀 leader 上的數(shù)據(jù)即可。

    如果某個(gè) broker宕機(jī)了,剛好也是 partition的leader,那么此時(shí)會(huì)選舉一個(gè)新的 leader出來(lái),大家繼續(xù)讀寫那個(gè)新的 leader即可,這個(gè)就 是所謂的高可用性。

    leader和follower的同步機(jī)制:

    寫數(shù)據(jù)的時(shí)候,生產(chǎn)者就寫 leader,然后 leader將數(shù)據(jù)落地寫本地磁盤,接著其他 follower 自己主動(dòng)從 leader來(lái)pull數(shù)據(jù)。一旦所有 follower同步好數(shù)據(jù)了,就會(huì)發(fā)送 ack給 leader,leader收到所有 follower的 ack之后,就會(huì)返回寫成功的消息給生產(chǎn)者。

    消費(fèi)的時(shí)候,只會(huì)從 leader去讀,但是只有一個(gè)消息已經(jīng)被所有 follower都同步成功返回 ack的時(shí)候,這個(gè)消息才會(huì)被消費(fèi)者讀到。

    消息隊(duì)列重復(fù)數(shù)據(jù)

    MQ 只能保證消息不丟,不能保證重復(fù)發(fā)送

    Kafka 消費(fèi)端可能出現(xiàn)的重復(fù)消費(fèi)問(wèn)題

    每條消息都有一個(gè) offset 代表 了這個(gè)消息的順序的序號(hào),按照數(shù)據(jù)進(jìn)入 kafka的順序,kafka會(huì)給每條數(shù)據(jù)分配一個(gè) offset,代表了這個(gè)是數(shù)據(jù)的序號(hào),消費(fèi)者從 kafka去消費(fèi)的時(shí)候,按照這個(gè)順序去消費(fèi),消費(fèi)者會(huì)去提交 offset,就是告訴 kafka已經(jīng)消費(fèi)到 offset=153這條數(shù)據(jù)了 ;zk里面就記錄了消費(fèi)者當(dāng)前消費(fèi)到了 offset =幾的那條消息;假如此時(shí)消費(fèi)者系統(tǒng)被重啟,重啟之后,消費(fèi)者會(huì)找kafka,讓kafka把上次我消費(fèi)到的那個(gè)地方后面的數(shù)據(jù)繼續(xù)給我傳遞過(guò)來(lái)。

    重復(fù)消息原因:(主要發(fā)生在消費(fèi)者重啟后)

    消費(fèi)者不是說(shuō)消費(fèi)完一條數(shù)據(jù)就立馬提交 offset的,而是定時(shí)定期提交一次 offset。消費(fèi)者如果再準(zhǔn)備提交 offset,但是還沒(méi)提交 offset的時(shí)候,消費(fèi)者進(jìn)程重啟了,那么此時(shí)已經(jīng)消費(fèi)過(guò)的消息的 offset并沒(méi)有提交,kafka也就不知道你已經(jīng)消費(fèi)了 offset= 153那條數(shù)據(jù),這個(gè)時(shí)候kafka會(huì)給你發(fā)offset=152,153,154的數(shù)據(jù),此時(shí) offset = 152,153的消息重復(fù)消費(fèi)了

    保證 MQ 重復(fù)消費(fèi)冪等性

    冪等:一個(gè)數(shù)據(jù)或者一個(gè)請(qǐng)求,給你重復(fù)來(lái)多次,你得確保對(duì)應(yīng)的數(shù)據(jù)是不會(huì)改變的,不能出錯(cuò)。
    思路:

    • 拿數(shù)據(jù)要寫庫(kù),首先檢查下主鍵,如果有數(shù)據(jù),則不插入,進(jìn)行一次update

    • 如果是寫 redis,就沒(méi)問(wèn)題,反正每次都是 set ,天然冪等性

    • 生產(chǎn)者發(fā)送消息的時(shí)候帶上一個(gè)全局唯一的id,消費(fèi)者拿到消息后,先根據(jù)這個(gè)id去 redis里查一下,之前有沒(méi)消費(fèi)過(guò),沒(méi)有消費(fèi)過(guò)就處理,并且寫入這個(gè) id 到 redis,如果消費(fèi)過(guò)了,則不處理。

    • 基于數(shù)據(jù)庫(kù)的唯一鍵

    保證 MQ 消息不丟

    MQ 傳遞非常核心的消息,比如:廣告計(jì)費(fèi)系統(tǒng),用戶點(diǎn)擊一次廣告,扣費(fèi)一塊錢,如果扣費(fèi)的時(shí)候消息丟了,則會(huì)不斷少錢,積少成多,對(duì)公司是一個(gè)很大的損失。

    RabbitMQ可能存在的數(shù)據(jù)丟失問(wèn)題

    • 生產(chǎn)者寫消息的過(guò)程中,消息都沒(méi)有到 rabbitmq,在網(wǎng)絡(luò)傳輸過(guò)程中就丟了。或者消息到了 rabbitmq,但是人家內(nèi)部出錯(cuò)了沒(méi)保存下來(lái)。

    • RabbitMQ 接收到消息之后先暫存在主機(jī)的內(nèi)存里,結(jié)果消費(fèi)者還沒(méi)來(lái)得及消費(fèi),RabbitMQ自己掛掉了,就導(dǎo)致暫存在內(nèi)存里的數(shù)據(jù)給搞丟了。

    • 消費(fèi)者消費(fèi)到了這個(gè)消費(fèi),但是還沒(méi)來(lái)得及處理,自己就掛掉了,RabbitMQ 以為這個(gè)消費(fèi)者已經(jīng)處理完了。

    問(wèn)題 1解決方案:

    事務(wù)機(jī)制:(一般不采用,同步的,生產(chǎn)者發(fā)送消息會(huì)同步阻塞卡住等待你是成功還是失敗。會(huì)導(dǎo)致生產(chǎn)者發(fā)送消息的吞吐量降下來(lái))

    ????channel.txSelect
    try?{
    ????//發(fā)送消息
    }?catch(Exception?e){
    ????channel.txRollback;
    ????//再次重試發(fā)送這條消息
    }?
    ????channel.txCommit;

    confirm機(jī)制:(一般采用這種機(jī)制,異步的模式,不會(huì)阻塞,吞吐量會(huì)比較高)

    • 先把 channel 設(shè)置成 confirm 模式

    • 發(fā)送一個(gè)消息到 rabbitmq

    • 發(fā)送完消息后就不用管了

    • rabbitmq 如果接收到了這條消息,就會(huì)回調(diào)你生產(chǎn)者本地的一個(gè)接口,通知你說(shuō)這條消息我已經(jīng)收到了

    • rabbitmq 如果在接收消息的時(shí)候報(bào)錯(cuò)了,就會(huì)回調(diào)你的接口,告訴你這個(gè)消息接收失敗了,你可以再次重發(fā)。

    public?void?ack(String?messageId){

    }

    public?void?nack(String?messageId){
    ????//再次重發(fā)一次這個(gè)消息
    }

    問(wèn)題 2 解決方案:

    持久化到磁盤

    • 創(chuàng)建queue的時(shí)候?qū)⑵湓O(shè)置為持久化的,這樣就可以保證 rabbitmq持久化queue的元數(shù)據(jù),但是不會(huì)持久化queue里的數(shù)據(jù)

    • 發(fā)送消息的時(shí)候?qū)?deliveryMode 設(shè)置為 2,將消息設(shè)置為持久化的,此時(shí) rabbitmq就會(huì)將消息持久化到磁盤上去。必須同時(shí)設(shè)置 2 個(gè)持久化才行。

    • 持久化可以跟生產(chǎn)者那邊的 confirm機(jī)制配合起來(lái),只有消息被持久化到磁盤之后,才會(huì)通知生產(chǎn)者 ack了 ,所以哪怕是在持久化到磁盤之前 ,rabbitmq掛了,數(shù)據(jù)丟了,生產(chǎn)者收不到 ack,你也可以自己重發(fā)。

    缺點(diǎn):可能會(huì)有一點(diǎn)點(diǎn)丟失數(shù)據(jù)的可能,消息剛好寫到了 rabbitmq中,但是還沒(méi)來(lái)得及持久化到磁盤上,結(jié)果不巧, rabbitmq掛了,會(huì)導(dǎo)致內(nèi)存里的一點(diǎn)點(diǎn)數(shù)據(jù)會(huì)丟失。

    問(wèn)題 3 解決方案:

    原因:消費(fèi)者打開(kāi)了 autoAck機(jī)制(消費(fèi)到一條消息,還在處理中,還沒(méi)處理完,此時(shí)消費(fèi)者自動(dòng) autoAck了,通知 rabbitmq說(shuō)這條消息已經(jīng)消費(fèi)了,此時(shí)不巧,消費(fèi)者系統(tǒng)宕機(jī)了,那條消息丟失了,還沒(méi)處理完,而且 rabbitmq還以為這個(gè)消息已經(jīng)處理掉了)

    解決方案:關(guān)閉 autoAck,自己處理完了一條消息后,再發(fā)送 ack給 rabbitmq,如果此時(shí)還沒(méi)處理完就宕機(jī)了,此時(shí)rabbitmq沒(méi)收到你發(fā)的ack消息,然后 rabbitmq 就會(huì)將這條消息重新分配給其他的消費(fèi)者去處理。

    Kafka 可能存在的數(shù)據(jù)丟失問(wèn)題

    消費(fèi)端弄丟數(shù)據(jù)

    原因:消費(fèi)者消費(fèi)到那條消息后,自動(dòng)提交了 offset,kafka以為你已經(jīng)消費(fèi)好了這條消息,結(jié)果消費(fèi)者掛了,這條消息就丟了。

    例子:消費(fèi)者消費(fèi)到數(shù)據(jù)后寫到一個(gè)內(nèi)存 queue里緩存下,消息自動(dòng)提交 offset,重啟了系統(tǒng),結(jié)果會(huì)導(dǎo)致內(nèi)存 queue 里還沒(méi)來(lái)得及處理的數(shù)據(jù)丟失。

    解決方法:kafka會(huì)自動(dòng)提交 offset,那么只要關(guān)閉自動(dòng)提交 offset,在處理完之后自己手動(dòng)提交,可以保證數(shù)據(jù)不會(huì)丟。但是此時(shí)確實(shí)還是會(huì)重復(fù)消費(fèi),比如剛好處理完,還沒(méi)提交 offset,結(jié)果自己掛了,此時(shí)肯定會(huì)重復(fù)消費(fèi)一次 ,做好冪等即可。

    Kafka 丟掉消息

    原因:kafka 某個(gè) broker 宕機(jī),然后重新選舉 partition 的 leader時(shí),此時(shí)其他的 follower 剛好還有一些數(shù)據(jù)沒(méi)有同步,結(jié)果此時(shí) leader掛了,然后選舉某個(gè) follower成 leader之后,就丟掉了之前l(fā)eader里未同步的數(shù)據(jù)。

    例子:kafka的leader機(jī)器宕機(jī),將 follower 切換為 leader之后,發(fā)現(xiàn)數(shù)據(jù)丟了
    解決方案:(保證 kafka broker端在 leader發(fā)生故障,或者leader切換時(shí),數(shù)據(jù)不會(huì)丟)

    • 給 topic設(shè)置 replication.factor ,這個(gè)值必須大于 1,保證每個(gè) partition 必須至少有 2 個(gè)副本

    • 在 kafka 服務(wù)端設(shè)置 min.insync.replicas 參數(shù),這個(gè)值必須大于 1,這個(gè)是要求一個(gè)leader至少感知到有至少一個(gè)follower還跟自己保持聯(lián)系,沒(méi)掉隊(duì),這樣才能確保 leader掛了還有一個(gè)follower,保證至少一個(gè) follower能和leader保持正常的數(shù)據(jù)同步。

    • 在 producer 端設(shè)置 acks =all,這個(gè)是要求每條數(shù)據(jù),必須是寫入所有 replica 之后,才能認(rèn)為是寫成功了。否則會(huì)生產(chǎn)者會(huì)一直重試,此時(shí)設(shè)置 retries = MAX(很大的重試的值),要求一旦寫入失敗,就卡在這里(避免消息丟失)

    • kafka 生產(chǎn)者丟消息

    按 2 的方案設(shè)置了 ack =all,一定不會(huì)丟。它會(huì)要求 leader 接收到消息,所有的 follower 都同步 到了消息之后,才認(rèn)為本次寫成功。如果沒(méi)滿足這個(gè)條件,生產(chǎn)者會(huì)無(wú)限次重試 。

    消息隊(duì)列順序性

    背景:mysql binlog 同步的系統(tǒng),在mysql里增刪改一條數(shù)據(jù),對(duì)應(yīng)出來(lái)了增刪改 3 條binlog,接著這 3 條binlog發(fā)送到 MQ 里面,到消費(fèi)出來(lái)依次執(zhí)行,起碼是要保證順序的吧,不然順序變成了 刪除、修改、增加。日同步數(shù)據(jù)達(dá)到上億,mysql->mysql,比如大數(shù)據(jù) team,需要同步一個(gè)mysql庫(kù),來(lái)對(duì)公司的業(yè)務(wù)系統(tǒng)的數(shù)據(jù)做各種復(fù)雜的操作。

    場(chǎng)景:

    • rabbitmq,一個(gè)queue,多個(gè)consumer,這不明顯亂了

    • kafka,一個(gè)topic,一個(gè)partition,一個(gè)consumer,內(nèi)部多線程,這不也亂了

    RabbitMQ 消息順序錯(cuò)亂

    RabbitMQ 如何保證消息順序性

    需要保證順序的數(shù)據(jù)放到同一個(gè)queue里

    Kafka 消息順序錯(cuò)亂

    寫入一個(gè) partition中的數(shù)據(jù)一定是有順序的。

    生產(chǎn)者在寫的時(shí)候,可以指定一個(gè) key,比如訂單id作為key,那么訂單相關(guān)的數(shù)據(jù),一定會(huì)被分發(fā)到一個(gè) partition中區(qū),此時(shí)這個(gè) partition中的數(shù)據(jù)一定是有順序的。Kafka 中一個(gè) partition 只能被一個(gè)消費(fèi)者消費(fèi)。消費(fèi)者從partition中取出數(shù)據(jù)的時(shí)候 ,一定是有順序的。

    Kafka 保證消息順序性

    如果消費(fèi)者單線程消費(fèi)+處理,如果處理比較耗時(shí),處理一條消息是幾十ms,一秒鐘只能處理幾十條數(shù)據(jù),這個(gè)吞吐量太低了。肯定要用多線程去并發(fā)處理,壓測(cè)消費(fèi)者4 核 8G 單機(jī),32 條線程,最高每秒可以處理上千條消息

    消息隊(duì)列延遲以及過(guò)期失效

    消費(fèi)端出了問(wèn)題,不消費(fèi)了或者消費(fèi)極其慢。接著坑爹了,你的消息隊(duì)列集群的磁盤都快寫滿了 ,都沒(méi)人消費(fèi),怎么辦?積壓了幾個(gè)小時(shí),rabbitmq設(shè)置了消息過(guò)期時(shí)間后就沒(méi)了,怎么辦?

    例如:

    • 每次消費(fèi)之后都要寫 mysql,結(jié)果mysql掛了,消費(fèi)端 hang 不動(dòng)了。

    • 消費(fèi)者本地依賴的一個(gè)東西掛了,導(dǎo)致消費(fèi)者掛了。

    • 長(zhǎng)時(shí)間沒(méi)處理消費(fèi),導(dǎo)致 mq 寫滿了。

    場(chǎng)景:幾千萬(wàn)條數(shù)據(jù)再 MQ 里積壓了七八個(gè)小時(shí)

    快速處理積壓的消息

    一個(gè)消費(fèi)者一秒是 1000 條,一秒 3 個(gè)消費(fèi)者是 3000 條,一分鐘是 18W 條,1000 多 W 條需要一個(gè)小時(shí)恢復(fù)。

    步驟:

    • 先修復(fù) consumer 的問(wèn)題,確保其恢復(fù)消費(fèi)速度,然后將現(xiàn)有的 consumer 都停掉

    • 新建一個(gè)topic,partition是原來(lái)的 10 倍,臨時(shí)建立好原先 10 倍或者 20 倍的 queue 數(shù)量

    • 然后寫一個(gè)臨時(shí)的分發(fā)數(shù)據(jù)的 consumer 程序,這個(gè)程序部署上去消費(fèi)積壓的數(shù)據(jù),消費(fèi)之后不做耗時(shí)的處理,直接均勻輪詢寫入臨時(shí)建立好的 10 倍數(shù)量的 queue

    • 接著臨時(shí)征用 10 倍的機(jī)器來(lái)部署 consumer,每一批 consumer 消費(fèi)一個(gè)臨時(shí) queue 的數(shù)據(jù)

    • 這種做法相當(dāng) 于是臨時(shí)將 queue 資源和 consumer 資源擴(kuò)大 10 倍,以正常 10 倍速度

    • 等快速消費(fèi)完積壓數(shù)據(jù)之后,恢復(fù)原先部署架構(gòu) ,重新用原先的 consumer機(jī)器消費(fèi)消息

    原來(lái) 3 個(gè)消費(fèi)者需要 1 個(gè)小時(shí)可以搞定,現(xiàn)在 30 個(gè)臨時(shí)消費(fèi)者需要 10 分鐘就可以搞定。

    如果用的 rabbitmq,并且設(shè)置了過(guò)期時(shí)間,如果此消費(fèi)在 queue里積壓超過(guò)一定的時(shí)間會(huì)被 rabbitmq清理掉,數(shù)據(jù)直接搞丟。
    這個(gè)時(shí)候開(kāi)始寫程序,將丟失的那批 數(shù)據(jù)查出來(lái),然后重新灌入mq里面,把白天丟的數(shù)據(jù)補(bǔ)回來(lái)。

    如果消息積壓mq,長(zhǎng)時(shí)間沒(méi)被處理掉,導(dǎo)致mq快寫完滿了,你臨時(shí)寫一個(gè)程序,接入數(shù)據(jù)來(lái)消費(fèi),寫到一個(gè)臨時(shí)的mq里,再讓其他消費(fèi)者慢慢消費(fèi) 或者消費(fèi)一個(gè)丟棄一個(gè),都不要了,快速消費(fèi)掉所有的消息,然后晚上補(bǔ)數(shù)據(jù)。

    如何設(shè)計(jì)消息隊(duì)列中間件架構(gòu)

    • mq要支持可伸縮性,快速擴(kuò)容。設(shè)計(jì)一個(gè)分布式的 MQ,broker->topic->partition,每個(gè) partition 放一個(gè)機(jī)器,就存一部分?jǐn)?shù)據(jù)。如果現(xiàn)在資源不夠,給 topic 增加 partition ,然后做數(shù)據(jù)遷移,增加機(jī)器。

    • mq數(shù)據(jù)落磁盤,避免進(jìn)程掛了數(shù)據(jù)丟了,順序?qū)?#xff0c;這樣就沒(méi)有磁盤隨機(jī)讀寫的尋址開(kāi)銷,磁盤順序讀寫的性能是很高的,這個(gè)就是 kafka的思路。

    • mq高可用性。多副本->leader & follower-> broker 掛了重新選舉 leader 對(duì)外提供服務(wù)

    • 支持?jǐn)?shù)據(jù) 0 丟失。

    【面試題專欄】

    2020年Java多線程與并發(fā)系列22道高頻面試題解析

    從阿里、騰訊的面試真題中總結(jié)了這11個(gè)Redis高頻面試題

    2020年Java基礎(chǔ)高頻面試題匯總(1.4W字詳細(xì)解析)

    全網(wǎng)最全Spring系列面試題129道(附答案解析)

    2萬(wàn)字Java并發(fā)編程面試題整理(含答案,建議收藏)

    85道Java微服務(wù)面試題整理(助力2020面試)

    【2020年大廠面試必備】JVM與性能調(diào)優(yōu)知識(shí)點(diǎn)整理

    2019年面試官最喜歡問(wèn)的28道ZooKeeper面試題

    2020面試還搞不懂MyBatis?看看這27道面試題!(含答案和思維導(dǎo)圖)

    Spring Cloud+Spring Boot高頻面試題解析

    2019年常見(jiàn)的Linux面試題及答案解析,哪些你還不會(huì)?

    2019年常見(jiàn)Elasticsearch面試題答案解析

    18道kafka高頻面試題哪些你還不會(huì)?(含答案和思維導(dǎo)圖)

    2019年12道RabbitMQ高頻面試題你都會(huì)了嗎?(含答案解析)

    2019年Dubbo你掌握的如何?快看看這30道高頻面試題!

    ?你在看嗎?

    總結(jié)

    以上是生活随笔為你收集整理的rabbitmq 拉取消息太慢_面试官:消息队列这些我都要问的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

    如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。