日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 人文社科 > 生活经验 >内容正文

生活经验

websphere mq 查看队列中是否有数据_全网最全的 “消息队列”

發(fā)布時間:2023/11/27 生活经验 27 豆豆
生活随笔 收集整理的這篇文章主要介紹了 websphere mq 查看队列中是否有数据_全网最全的 “消息队列” 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

消息隊列的使用場景


以下介紹消息隊列在實(shí)際應(yīng)用常用的使用場景。異步處理、應(yīng)用解耦、流量削鋒消息通訊四個場景。

1】異步處理場景說明:用戶注冊后,需要發(fā)注冊郵件和注冊短信。

引入消息隊列后架構(gòu)如下:用戶的響應(yīng)時間=注冊信息寫入數(shù)據(jù)庫的時間,例如50毫秒。發(fā)注冊郵箱、發(fā)注冊短信寫入消息隊列后,直接返回客戶端,因?qū)懭胂㈥犃械乃俣群芸?#xff0c;基本可以忽略,因此用戶的響應(yīng)時間可能是50毫秒。按照傳統(tǒng)的做法:
? ①、串行方式,將注冊信息寫入數(shù)據(jù)庫成功后,發(fā)注冊郵件,再發(fā)送注冊短信,以上三個成功后,返回客戶端??赡苄枰?50毫秒,這樣使用消息隊列提高了3倍。
? ②、并行方式,將注冊信息寫入數(shù)據(jù)庫成功后,發(fā)送注冊郵件,同時發(fā)送注冊短信。也可能需要100毫秒,這樣使用消息隊列提高了2倍。
2】應(yīng)用解耦:場景說明:用戶下單后,訂單系統(tǒng)需要通知庫存系統(tǒng)。如下圖:

傳統(tǒng)模式的缺點(diǎn)①、庫存系統(tǒng)無法訪問時,則訂單減庫存業(yè)務(wù)將會失敗,從而導(dǎo)致訂單失敗;②、訂單系統(tǒng)與庫存系統(tǒng)耦合;

引入消息隊列①、用戶下單后,訂單系統(tǒng)完成持久化處理,將消息寫入消息隊列,返回用戶訂單下單成功。②、庫存系統(tǒng):訂閱下單的消息,采用拉/推的方式,獲取下單信息,庫存系統(tǒng)根據(jù)下單信息,進(jìn)行庫存操作。

?? ?當(dāng)庫存系統(tǒng)不能正常使用時,也不會影響正常下單,因?yàn)橄聠魏?#xff0c;訂單系統(tǒng)寫入消息隊列就不再關(guān)心其他的后續(xù)操作了。實(shí)現(xiàn)訂單系統(tǒng)與庫存系統(tǒng)的解耦。
3】流量削鋒:場景說明:秒殺或團(tuán)搶活動中使用廣泛。秒殺活動,一般會因?yàn)榱髁窟^大,導(dǎo)致流量暴增,應(yīng)用掛掉。一般需要在應(yīng)用前端加入消息隊列。用戶請求:服務(wù)器接受后,首先寫入消息隊列。當(dāng)消息隊列長度超出最大數(shù)量,則直接拋棄用戶請求或跳轉(zhuǎn)至錯誤頁面。秒殺業(yè)務(wù)處理:根據(jù)消息隊列中的請求信息,再做后續(xù)處理。
??▁▂▃ 這樣可以有效的控制活動人數(shù)和有效緩解短時間內(nèi)的高流量沖擊,防止壓垮應(yīng)用系統(tǒng)。
4】日志處理:指將消息隊列用在日志處理中,比如 Kafka 的應(yīng)用,解決大量日志傳輸?shù)膯栴}。

? ?? 日志采集客戶端:負(fù)責(zé)日志數(shù)據(jù)采集,定時寫入 Kafka隊列。

? ?? kafka消息隊列:負(fù)責(zé)日志數(shù)據(jù)的接收,存儲和轉(zhuǎn)發(fā)。

? ?? 日志處理應(yīng)用:訂閱并消費(fèi) kafka 隊列中的日志數(shù)據(jù)。

5】消息通信:消息隊列一般都內(nèi)置了高效的通信機(jī)制,因此也可以用純消息通信。比如實(shí)現(xiàn)點(diǎn)對點(diǎn)消息隊列,或者聊天室。

? ①、點(diǎn)對點(diǎn)通訊:客戶端A和客戶端B使用同一隊列,進(jìn)行消息通訊
? ②、聊天室通訊(發(fā)布訂閱模式):客戶端A,客戶端B,客戶端N訂閱同一主題,進(jìn)行消息發(fā)布和接收。實(shí)現(xiàn)類似聊天室效果。

消息中間件的工作流程

?1、發(fā)送端 MQ-Product (消息生產(chǎn)者)將消息發(fā)送給 MQ-server;

?2、MQ-server 將消息落地,持久化到數(shù)據(jù)庫等;

?3、MQ-server 回 ACK 給 MQ-Producer;

?4、MQ-server 將消息發(fā)送給消息接收端 MQ-Consumer (消息消費(fèi)者);

?5、MQ-Consumer 消費(fèi)接收到消息后發(fā)送 ACK 給 MQ-server;

?6、MQ-server 將落地消息刪除;

消息的重發(fā),補(bǔ)發(fā)策略
為了保證消息必達(dá),MQ使用了消息超時、重傳、確認(rèn)機(jī)制。使得消息可能被重復(fù)發(fā)送,當(dāng)消息生產(chǎn)者收不到 MQ-server 的ACK,重復(fù)向 MQ-server發(fā)送消息。MQ-server 收不到消息消費(fèi)者的 ACK,重復(fù)向消息消費(fèi)者發(fā)消息。

消息重發(fā)【1】如果消息接收者在處理消息過程中沒有對MOM(消息中間鍵)進(jìn)行應(yīng)答,則消息將由 MOM重發(fā)。

【2】如果隊列中設(shè)置了預(yù)讀參數(shù)(consumer.perfetchSize),如果消息接收者在處理第一條消息時(沒有向MOM進(jìn)行確認(rèn))就宕機(jī)了,則預(yù)讀數(shù)量的所有消息將被重發(fā)。

【3】如果 Session 是事務(wù)的,則只要消息接收者有一條消息沒有確認(rèn),或消息發(fā)送期間 MOM 或客戶端某一方突然宕機(jī)了,則該事務(wù)范圍中的所有消息 MOM 都將重發(fā)。

?? ActiveMQ 消息服務(wù)器怎么知道客戶端到底是消息正在處理中還是已處理完成沒應(yīng)答MOM或者宕機(jī)等等情況?其實(shí)是所有的客戶端機(jī)器,都運(yùn)行著一套客戶端的 ActiveMQ 環(huán)境,該環(huán)境緩存發(fā)來的消息,維持著和 ActiveMQ服務(wù)器的消息通訊,負(fù)責(zé)失效轉(zhuǎn)移(fail-over)等,所有的判斷和處理都是由這套客戶端環(huán)境來完成的。

補(bǔ)發(fā)策略前提,Broker 根據(jù)自己的規(guī)則,通過 BrokerInfo 命令包和客戶端建立連接,向客戶端傳送缺省發(fā)送策略(發(fā)送:同步和異步,策略:持久化消息和非持久化消息)。但是客戶端可以使用 ActiveMQConnect.getRedeliveryPolicy() 方法覆蓋該策略設(shè)置。

RedeliveryPolicy policy = connection.getRedeliveryPolicy();  policy.setInitialRedeliveryDelay(500);  policy.setBackOffMultiplier(2);  policy.setUseExponentialBackOff(true);  policy.setMaximumRedeliveries(2);

★? 一旦消息重發(fā)嘗試超過重發(fā)策略中配置的 maximumRedeliveries(默認(rèn)=6)會給 Broker 發(fā)送一個“Poison ack”通知它,這個消息被認(rèn)為是 a poison pill,接著 Broker會將這個消息發(fā)送給 DLQ(Dead Letter Queue),以便后續(xù)處理。

策略【1】 缺省死信隊列(Dead Letter Queue)叫做Active.DLQ;所有的未送達(dá)消息將發(fā)送到這個隊列,導(dǎo)致非常難于管理。此時就可以通過設(shè)置 activemq.xml 文件中的 destination policy map 的 “individualDeadLetterStrategy” 屬性來修改。

<broker...>    <destinationPolicy>    <policyMap>      <policyEntries>            <policyEntry queue=">">        <deadLetterStrategy>                <individualDeadLetterStrategy          queuePrefix="DLQ." useQueueForQueueMessages="true" />        deadLetterStrategy>      policyEntry>      policyEntries>    policyMap>    destinationPolicy>    ...  broker>??

【2】自動丟棄過期消息(Expired Messages):一些應(yīng)用可能只是簡單的丟棄過期消息,而不是將它們放到 DLQ。在dead ?letter strategy死信策略上配置 processExpired 屬性為 false,可以實(shí)現(xiàn)這個功能。

<broker...>    <destinationPolicy>     <policyMap>     <policyEntries>              <policyEntry queue=">">              <deadLetterStrategy>         <sharedDeadLetterStrategy processExpired="false" />       deadLetterStrategy>       policyEntry>     policyEntries>     policyMap>    destinationPolicy>  ...  broker>  

【3】將非持久信息(non-persistent messages)放入死信隊列 ActiveMQ 缺省不會將未發(fā)送到的非持久信息放入死信隊列。如果一個應(yīng)用程序并不想將消息 message 設(shè)置為持久的,那么記錄下來的那些未發(fā)送到的消息對它來說往往也就沒有價值。不過如果想實(shí)現(xiàn)這個功能,可以在 dead-letter-strategy 死信策略上設(shè)置 processNonPersistent="true"。

<broker...>    <destinationPolicy>     <policyMap>     <policyEntries>              <policyEntry queue=">">              <deadLetterStrategy>         <sharedDeadLetterStrategy processNonPersistent="true" />       deadLetterStrategy>       policyEntry>     policyEntries>     policyMap>    destinationPolicy>  ...  broker>  

消息重復(fù)發(fā)送產(chǎn)生的后果


對于非冪等性的服務(wù)而言,如果重復(fù)發(fā)送消息就會產(chǎn)生嚴(yán)重的問題。譬如:銀行取錢,上游支付系統(tǒng)負(fù)責(zé)給用戶扣款,下游系統(tǒng)負(fù)責(zé)給用戶發(fā)錢,通過MQ異步通知。不管是上游的ACK丟失,導(dǎo)致 MQ收到重復(fù)的消息,還是下半場 ACK丟失,導(dǎo)致系統(tǒng)收到重復(fù)的出錢通知,都可能出現(xiàn),上游扣了一次錢,下游發(fā)了多次錢。消息隊列的異步操作,通常用于冪等性的服務(wù),非冪等性的服務(wù)時不適用中間件進(jìn)行通信的。更多的是建立長連接 Socket 進(jìn)行通信的?;蛘咄ㄟ^如下方式改造。

MQ內(nèi)部如何做到冪等性的

對于每條消息,MQ內(nèi)部生成一個全局唯一、與業(yè)務(wù)無關(guān)的消息ID:inner-msg-id。當(dāng) MQ-server 接收到消息時,先根據(jù) inner-msg-id 判斷消息是否重復(fù)發(fā)送,再決定是否將消息落地到 DB中。這樣,有了這個 inner-msg-id 作為去重的依據(jù)就能保證一條消息只能一次落地到 DB。

消息消費(fèi)者應(yīng)當(dāng)如何做到冪等性

【1】對于非冪等性業(yè)務(wù)且要求實(shí)現(xiàn)冪等性業(yè)務(wù):生成一個唯一ID標(biāo)記每一條消息,將消息處理成功和去重日志通過事物的形式寫入去重表。
【2】對于非冪等性業(yè)務(wù)可不實(shí)現(xiàn)冪等性的業(yè)務(wù):權(quán)衡去重所花的代價決定是否需要實(shí)現(xiàn)冪等性,如:購物會員卡成功,向用戶發(fā)送通知短信,發(fā)送一次或者多次影響不大。不做冪等性可以省掉寫去重日志的操作。

如何保證消息的有序性



【Active 中有兩種方式保證消息消費(fèi)的順序性】:【1】通過高級特性 consumer 獨(dú)有的消費(fèi)者(exclusive consumer)。如果一個 queue 設(shè)置為 exclusive,broker 會挑選一個 consumer,并且將所有的消息都發(fā)給這個 consumer。如果這個 consumer掛了,broker 會自動挑選另外一個 consumer。

queue = new ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true"); consumer = session.createConsumer(queue);

【2】利用 Activemq 的高級特性:MessageGroups。Message Groups 特性是一種負(fù)載均衡的機(jī)制。在一個消息被分發(fā)到consumer 之前,broker 首先檢查消息 JMSXGroupID 屬性。如果存在,那么 broker 會檢查是否有某個 consumer 擁有這個message group。如果沒有,那么 broker 會選擇一個 consumer,并將它關(guān)聯(lián)到這個 message group。此后,這個 consumer 會接收這個 message group 的所有消息,直到:
? ①、Consumer 被關(guān)閉。
? ②、Message group 被關(guān)閉,通過發(fā)送一個消息,并設(shè)置這個消息的 JMSXGroupSeq 為 -1。

消費(fèi)者實(shí)際上根據(jù)兩個維度排序了,一個是消費(fèi)者的 Priority,即消費(fèi)者的優(yōu)先級。還有一個是消費(fèi)者的指定的消息組的個數(shù) AssignedGroupCount。這個順序直接影響到下一條消息是誰來接收。

protected boolean assignMessageGroup(Subscription subscription, QueueMessageReference node) throws Exception {  boolean result = true;  // 保持消息組在一起。  String groupId = node.getGroupID();  int sequence = node.getGroupSequence();  if (groupId != null) {    // 先查找該queue存儲的一個groupId,和consumerId的一個map    MessageGroupMap messageGroupOwners = getMessageGroupOwners();    // 如果是該組的第一條消息。則指定該consumer消費(fèi)該消息組    if (sequence == 1) {      assignGroup(subscription, messageGroupOwners, node, groupId);    } else {      // 確保前一個所有者仍然有效,否則就生成新的主人。      ConsumerId groupOwner;      groupOwner = messageGroupOwners.get(groupId);      if (groupOwner == null) {        assignGroup(subscription, messageGroupOwners, node, groupId);      } else {        if (groupOwner.equals(subscription.getConsumerInfo().getConsumerId())) {          // 一個組中的 sequence < 1 表示改組消息已經(jīng)消費(fèi)完了          if (sequence < 0) {            messageGroupOwners.removeGroup(groupId);            subscription.getConsumerInfo().decrementAssignedGroupCount(destination);          }        } else {          // 說明該消費(fèi)者不能消費(fèi)該消息組          result = false;        }      }    }  }  return result;}

RabbitMQ 保證消息隊列的順序性造成順序錯亂的場景:RabbitMQ 中有一個 Queue,多個 Consumer。生產(chǎn)者向 RabbitMQ 里發(fā)送了三條數(shù)據(jù),順序依次是 data1、data2、data3,放入RabbitMQ 的一個內(nèi)存隊列。有三個消費(fèi)者分別從 MQ 中消費(fèi)這三條數(shù)據(jù)中的一條,可能消費(fèi)者2先執(zhí)行完操作,把 data2 存入數(shù)據(jù)庫,然后是 data1、data3。導(dǎo)致順序錯亂。

解決方案RabbitMQ 將上面的一個 Queue 拆分為三個 Queue,每個 Queue 對應(yīng)一個 Consumer,就是多一些 Queue 而已,確實(shí)是麻煩點(diǎn);然后這個 Consumer 內(nèi)部用內(nèi)存隊列做排隊,然后分發(fā)給底層不同的 worker 來處理。如下,將消息放入一個隊列,由一個消費(fèi)者消費(fèi)即可保證順序。

Kafka 保證消息隊列的順序性: 建了一個 Topic,有三個 Partition。生產(chǎn)者在寫的時候,其實(shí)可以指定一個 key,比如說指定了某個訂單 id 作為 key,那么這個訂單相關(guān)的數(shù)據(jù),一定會被分發(fā)到同一個 Partition 中去,而且這個 Partition 中的數(shù)據(jù)一定是有順序的。消費(fèi)者從 Partition 中取出來數(shù)據(jù)的時候,也一定是有順序的。接著,消費(fèi)者里可能會搞多個線程來并發(fā)處理消息。因?yàn)槿绻M(fèi)者用單線程時,處理比較耗時。而多線程并發(fā)處理時,順序可能就亂序。
解決方案①、一個 topic,一個 partition,一個 consumer,內(nèi)部單線程消費(fèi),單線程吞吐量太低,一般不會用這個。
②、寫 N 個內(nèi)存 queue,具有相同 key 的數(shù)據(jù)都到同一個內(nèi)存 queue;然后對于 N 個線程,每個線程分別消費(fèi)一個內(nèi)存 queue 即可,這樣就能保證順序性。

用過哪些MQ,和其他 MQ比較有什么優(yōu)缺點(diǎn)

【1】Kafka 是 LinkedIn 開發(fā)的一個高性能、分布式的消息系統(tǒng),廣泛用于日志收集、流式數(shù)據(jù)處理、在線和離線消息分發(fā)等場景。雖然不是作為傳統(tǒng)的 MQ來設(shè)計,但在大部分情況下,Kafka 也可以代替原有 ActiveMQ 等傳統(tǒng)的消息系統(tǒng)。
【2】Kafka 將消息流按 Topic 組織,保存消息的服務(wù)器稱為 Broker,消費(fèi)者可以訂閱一個或者多個 Topic。為了均衡負(fù)載,一個Topic 的消息又可以劃分到多個分區(qū)(Partition),分區(qū)越多,Kafka 并行能力和吞吐量越高。
【3】Kafka 集群需要 Zookeeper 支持來實(shí)現(xiàn)集群,Kafka 發(fā)行包中已經(jīng)包含了 Zookeeper,部署的時候可以在一臺服務(wù)器上同時啟動一個 Zookeeper Server 和 一個 Kafka Server,也可以使用已有的其他 Zookeeper 集群。
【4】和傳統(tǒng)的 MQ 不同,消費(fèi)者需要自己保留一個 offset,從 Kafka 獲取消息時,只拉取當(dāng)前 offset 以后的消息。Kafka 的scala/java 版的 Client 已經(jīng)實(shí)現(xiàn)了這部分的邏輯,將 offset 保存到 zookeeper 上。每個消費(fèi)者可以選擇一個 id,同樣 id 的消費(fèi)者對于同一條消息只會收到一次。一個 Topic 的消費(fèi)者如果都使用相同的id,就是傳統(tǒng)的 Queue;如果每個消費(fèi)者都使用不同的id,就是傳統(tǒng)的 pub-sub。

如果在 MQ 的場景下,將 Kafka 和 ActiveMQ 相比,Kafka 的優(yōu)點(diǎn)
【1】分布式、高可擴(kuò)展:Kafka 集群可以透明的擴(kuò)展,增加新的服務(wù)器進(jìn)集群。
【2】高性能:Kafka 的性能大大超過傳統(tǒng)的 ActiveMQ、RabbitMQ 等 MQ 實(shí)現(xiàn),尤其是 Kafka 還支持 batch 操作。
【3】容錯:Kafka 每個 Partition 的數(shù)據(jù)都會復(fù)制到幾臺服務(wù)器上。當(dāng)某個 Broker 故障失效時,ZooKeeper 服務(wù)將通知生產(chǎn)者和消費(fèi)者,生產(chǎn)者和消費(fèi)者轉(zhuǎn)而使用其它 Broker。
【4】高吞吐:在一臺普通的服務(wù)器上既可以達(dá)到 10W/s 的吞吐速率。
【5】完全的分布式系統(tǒng):Broker、Producer、Consumer都原生自動支持分布式,自動實(shí)現(xiàn)負(fù)載均衡。
【6】快速持久化:可以在 O(1) 的系統(tǒng)開銷下進(jìn)行消息持久化。
【7】游標(biāo)位置:ActiveMQ 游標(biāo)由 AMQ來管理,無法讀取歷史數(shù)據(jù)。Kafka 客戶端自己管理游標(biāo),可以重讀數(shù)據(jù)。

Kafka 的缺點(diǎn)
【1】重復(fù)消息:Kafka 只保證每個消息至少會送達(dá)一次,雖然幾率很小,但一條消息有可能會被送達(dá)多次。
【2】消息亂序:雖然一個 Partition 內(nèi)部的消息是保證有序的,但是如果一個Topic 有多個Partition,Partition 之間的消息送達(dá)不保證有序。
【3】復(fù)雜性:Kafka 需要 zookeeper 集群的支持,Topic 通常需要人工來創(chuàng)建,部署和維護(hù)較一般消息隊列成本更高。
? MQ 是非線程安全的【Kafka 架構(gòu)】:【1】Producers(生產(chǎn)者)生產(chǎn)者是發(fā)送一個或多個主題 Topic 的發(fā)布者。生產(chǎn)者向 Kafka 代理發(fā)送數(shù)據(jù)。每當(dāng)生產(chǎn)者將消息發(fā)布給代理時,代理只需要將消息附加到最后一個段文件。實(shí)際上,該消息將被附加到分區(qū)。生產(chǎn)者也可以向指定的分區(qū)發(fā)送消息。

【2】Brokers:代理(經(jīng)紀(jì)人)負(fù)責(zé)維護(hù)發(fā)布數(shù)據(jù)的簡單系統(tǒng)。
【3】Topic:主題屬于特定類別的信息流稱為主題。數(shù)組存儲在主題中。Topic 相當(dāng)于 Queue。主題被拆分成分區(qū)。分區(qū)被實(shí)現(xiàn)為具有大小相等的一組分段文件。
【4】Partition(分區(qū))每個 Partition 內(nèi)部消息有序,其中每個消息都有一個 offset 序號。一個 Partition 值對應(yīng)一個 Broker,一個 Broker 可以管理多個 Partition。
【5】Segment:Partition 物理上由多個 Segment組成。每個 Partion 目錄相當(dāng)于一個巨型文件被平均分配到多個大小相等segment 段數(shù)據(jù)文件中。但每個段 segment file消息數(shù)量不一定相等
【6】Partition offset(分區(qū)偏移):每個 Partition 都由一系列有序的、不可變的消息組成,這些消息被連續(xù)的追加到 Partition中。Partition 中的每個消息都有一個連續(xù)的序列號叫做 offset,用于 Partition唯一標(biāo)識一條消息。
【7】Replicas of partition(分區(qū)備份)副本只是一個分區(qū)備份:不讀取和寫入數(shù)據(jù),主要用于防止數(shù)據(jù)丟失。
【8】Kafka Cluster(Kafka 集群)Kafka 有多個代理被稱為 Kafka集群。可以擴(kuò)展 Kafka集群,無需停機(jī)。這些集群用于管理消息數(shù)據(jù)的持久性和復(fù)制。
【9】Consumers(消費(fèi)者)Consumers 從 MQ讀取數(shù)據(jù)。消費(fèi)者訂閱一個或多個主題,并通過從代理中提取數(shù)據(jù)來使用已發(fā)布的消息。Consumer 自己維護(hù)消費(fèi)到哪個 offset。
每個Consumer 都有對應(yīng)的 group【1】group 內(nèi)是 queue 消費(fèi)模型:各個 Consumer 消費(fèi)不同的 Partition,因此一個消息在 group 內(nèi)只消費(fèi)一次。
【2】group 間是 publish-subscribe 消費(fèi)模型:各個 group 各自獨(dú)立消費(fèi),互不影響,因此一個消息被每個 group 消費(fèi)一次。

MQ 系統(tǒng)的數(shù)據(jù)如何保證不丟失


Producer 數(shù)據(jù)丟失的原因【1】使用同步模式的時候,有 3種狀態(tài)保證消息被安全生產(chǎn),當(dāng)配置 ack=1時(只保證寫入Leader成功)的話,如果剛好 Leader partition 掛了,數(shù)據(jù)就會丟失。

ack 機(jī)制:broker 表示發(fā)來的數(shù)據(jù)已確認(rèn)接收無誤,表示數(shù)據(jù)已經(jīng)保存到磁盤。
?0:不等待 broker 返回確認(rèn)消息
?1:等待 topic 中某個 partition leader 保存成功的狀態(tài)反饋
-1/all:等待 topic 中某個 partition 所有副本都保存成功的狀態(tài)反饋

【2】使用異步模式時,當(dāng)緩沖區(qū)滿了,如果配置=0(還沒有收到確認(rèn)的數(shù)據(jù),數(shù)據(jù)就立即被丟棄掉)。
解決辦法只要能避免以上兩種情況就可以保證消息不會被丟失。如下:
【1】當(dāng)同步模式時,確認(rèn)機(jī)制設(shè)置為-1,就是讓消息寫入 Leader 和所有副本。
【2】當(dāng)異步模式時,消息發(fā)出,還沒收到確認(rèn)的時候,緩沖區(qū)也滿了。在配置文件中設(shè)置成不限制阻塞超時的時間,也就是說讓生產(chǎn)者一直阻塞,這樣就能保證數(shù)據(jù)不會丟失。

producer.type = async
request.required.acks=1
queue.buffering.max.ms=5000 #異步發(fā)送的時候 發(fā)送時間間隔 單位是毫秒
queue.buffering.max.messages=10000
queue.enqueue.timeout.ms = -1
batch.num.messages=200 #異步發(fā)送 每次批量發(fā)送的條目

Kafka弄丟了數(shù)據(jù)】:Kafka 的某個 Broker宕機(jī)了,然后重新選舉Broker 上的 Partition 的 Leader時。如果此時 Follower還沒來得及同步數(shù)據(jù),Leader就掛了,然后某個 Follower成為了 Leader,他就少了一部分?jǐn)?shù)據(jù)。
解決辦法一般要求設(shè)置 4個參數(shù)來保證消息不丟失:
【1】給 Topic設(shè)置 replication.factor 參數(shù)這個值必須大于1,表示要求每個 Partition必須至少有2個副本。
【2】在 Kafka服務(wù)端設(shè)置 min.isync.replicas參數(shù):這個值必須大于1,表示要求一個 Leader至少感知到有至少一個 Follower在跟自己保持聯(lián)系正常同步數(shù)據(jù),這樣才能保證 Leader掛了之后還有一個 Follower。
【3】在生產(chǎn)者端設(shè)置 acks= -1:要求每條數(shù)據(jù),必須是寫入所有 Replica 副本之后,才能認(rèn)為是寫入成功了。
【4】在生產(chǎn)者端設(shè)置? retries=MAX(很大的一個值,表示無限重試):表示消息一旦寫入事變,就無限重試。
【Consumer 數(shù)據(jù)丟失的原因】:
當(dāng)你消費(fèi)到了這個消息,然后消費(fèi)者那邊自動提交了offset,讓 kafka 以為你已經(jīng)消費(fèi)好了這個消息,其實(shí)你剛準(zhǔn)備處理這個消息,你還沒處理,你自己就掛了,此時這條消息就丟了。
解決辦法:【1Kafka 會自動提交 offset,使用 Kafka高級API,如果將自動提交 offset 改為手動提交(當(dāng)數(shù)據(jù)入庫之后進(jìn)行偏移量的更新),就可以保證數(shù)據(jù)不會丟。但是可能導(dǎo)致重復(fù)消費(fèi),比如你剛處理完,還沒有提交 offset,結(jié)果自己掛了,此時肯定會重復(fù)消費(fèi)一次,自己保證冪等性就好了。

RabbitMQ 如何實(shí)現(xiàn)集群高可用


鏡像模式隊列的數(shù)據(jù)都鏡像了一份到所有的節(jié)點(diǎn)上。這樣任何一個節(jié)點(diǎn)失效,不會影響到整個集群的使用。在實(shí)現(xiàn)上 mirror queue 內(nèi)部有一套選舉算法,會選出一個 master 和若干的 slaver。master 和 slaver 通過相互之間不斷的發(fā)送心跳來檢查是否連接斷開。可以通過指定 net_ticktime 來控制心跳檢查頻率。注意一個單位時間 net_ticktime 實(shí)際上做了4次交互,故當(dāng)超過net_ticktime (± 25%) 秒沒有響應(yīng)的話則認(rèn)為節(jié)點(diǎn)掛掉。另外注意修改 net_ticktime 時需要所有節(jié)點(diǎn)都一致。配置舉例:

{rabbit, [{tcp_listeners, [5672]}]},
{kernel, [{net_ticktime, 120}]}

Consumer任意連接一個節(jié)點(diǎn),若連上的不是 Master,請求會轉(zhuǎn)發(fā)給 Master,為了保證消息的可靠性,Consumer 回復(fù) Ack 給 Master 后,Master 刪除消息并廣播所有的 Slaver 去刪除;
Publisher任意連接一個節(jié)點(diǎn),若連上的不是 Master,則轉(zhuǎn)發(fā)給 Master,由 Master存儲并轉(zhuǎn)發(fā)給其他的 Slaver存儲;
如果 Slaver 掛掉則集群的節(jié)點(diǎn)狀態(tài)沒有任何變化。只要 Client 沒有連到這個節(jié)點(diǎn)上,也不會給 Client 發(fā)送失敗的通知。在檢測到 Slaver 掛掉的期間 Publish 消息會有延遲。如果配置了高可用策略是自動同步,當(dāng) Slaver 起來后,隊列中有大量的消息需要同步,將會整個集群阻塞長時間的不能讀寫直到同步結(jié)束;
RabbitMQ 實(shí)現(xiàn)了一種鏡像隊列(mirrored queue)的算法提供HA創(chuàng)建隊列時可以通過傳入“x-ha-policy”參數(shù)設(shè)置隊列為鏡像隊列,鏡像隊列會存儲在多個 Rabbit MQ 節(jié)點(diǎn)上,并配置成一主多從的結(jié)構(gòu),可以通過“x-ha-policy-params”參數(shù)來具體指定master 節(jié)點(diǎn)和 slave節(jié)點(diǎn)的列表。所有發(fā)送到鏡像隊列上的操作,比如消息的發(fā)送和刪除,都會先在 master節(jié)點(diǎn)上執(zhí)行,再通過一種叫 GM(Guaranteed Multicast)的原子廣播(atomic broadcast)算法同步到各 slave節(jié)點(diǎn)。GM算法通過兩階段的提交,可以保證 master節(jié)點(diǎn)發(fā)送到所有 slave節(jié)點(diǎn)上的消息要么全部執(zhí)行成功,要么全部失敗;通過環(huán)形的消息發(fā)送順序,即 master節(jié)點(diǎn)發(fā)送消息給一個 slave節(jié)點(diǎn),這個 slave節(jié)點(diǎn)依次發(fā)送給下一個 slave節(jié)點(diǎn),最終消息回到 master節(jié)點(diǎn),保證了主從節(jié)點(diǎn)上的負(fù)載差別不大。通過傳入“x-ha-policy”參數(shù)設(shè)置隊列為鏡像隊列(mirrored queue):定義一個policy:以“ha.”開頭的隊列都被鏡像到集群中的所有節(jié)點(diǎn)上:rabbitmqctl set_policy ha-all "^ha\." '{"ha-mode":"all"}'。定義一個policy:以“cinder”開頭的隊列被鏡像到集群中的任意兩個節(jié)點(diǎn)上,并且自動同步:rabbitmqctl set_policy ha-cinder-two "^cinder"或者設(shè)置'{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}';
? all:隊列將 mirrored 到所有集群中的節(jié)點(diǎn)中,當(dāng)新節(jié)點(diǎn)添加進(jìn)來時也會 mirrored 到新的節(jié)點(diǎn);
? exactly(需指定count)如果節(jié)點(diǎn)數(shù)小于 count 數(shù),則隊列將 mirrored 到所有的節(jié)點(diǎn)。如果節(jié)點(diǎn)數(shù)大于 count,新的節(jié)點(diǎn)將不再創(chuàng)建隊列的 mirror(即使原來已創(chuàng)建 mirror 的節(jié)點(diǎn)掛掉也不會創(chuàng)建);
? nodes:對指定的節(jié)點(diǎn)進(jìn)行 mirror。如果沒有一個指定的節(jié)點(diǎn)在運(yùn)行中,那么只有 client 連接的那個節(jié)點(diǎn)才會聲明 queue(這里有個遷移策略:假如 queue是在[A,B]上且A為 master,若給定的新的策略為nodes[C,D],那么為了防止數(shù)據(jù)丟失,在遷移中會同時存在[A,C,D]直到C,D已經(jīng)同步好以后,A才會關(guān)閉);

Kafka 吞吐量高的原因


【1】順序讀寫磁盤,充分利用了操作系統(tǒng)的預(yù)讀機(jī)制。
【2】Linux 中使用 sendfile 命令,減少一次數(shù)據(jù)拷貝:
?? ①、把數(shù)據(jù)從硬盤讀取到內(nèi)核中的頁緩存。
?? ②、把數(shù)據(jù)從內(nèi)核中讀取到用戶空間(sendfile 命令跳過此步驟)。
?? ③、把用戶空間的數(shù)據(jù)寫到 socket 緩存區(qū)中。
?? ④、操作系統(tǒng)將數(shù)據(jù)從 socket 緩沖區(qū)中復(fù)制到網(wǎng)卡緩沖區(qū),以便將數(shù)據(jù)經(jīng)網(wǎng)絡(luò)發(fā)出。
【3】生產(chǎn)者緩存消息批量發(fā)送,消費(fèi)者批量從 broker 獲取消息,減少 IO 次數(shù),充分利用磁盤順序讀寫的性能。
【4】通常情況下 Kafka 的瓶頸不是 CPU或者磁盤,而是網(wǎng)絡(luò)寬帶,所以生產(chǎn)者可以對數(shù)據(jù)進(jìn)行壓縮。

Kafka 和其他消息隊列的區(qū)別


【與 RabbitMQ 的區(qū)別】:? RabbitMQ:用在實(shí)時的對可靠性要求比較高的消息傳遞上。kafka:用于處理活躍的流式數(shù)據(jù),大數(shù)據(jù)量的數(shù)據(jù)處理上。
【1】在架構(gòu)模型方面:RabbitMQ 遵循 AMQP 協(xié)議,RabbitMQ 的 Broker由 Exchange、Binding、Queue 組成,其中 Exchange 和 Binding 組成了消息的路由鍵;Producer 通過連接 Channel 和 Server 進(jìn)行通信,Consumer 從 Queue 獲取消息進(jìn)行消費(fèi)(長連接,queue 有消息會推送到 consumer端,consumer 循環(huán)從輸入流讀取數(shù)據(jù))。rabbitMQ 以 Broker為中心;有消息的確認(rèn)機(jī)制。
? ? kafka 遵從一般的MQ結(jié)構(gòu),Producer,Broker,Consumer,以 Consumer為中心,消費(fèi)信息保存的客戶端 Consumer上,Consumer根據(jù)消費(fèi)的點(diǎn),從 Broker上批量 pull數(shù)據(jù),無消息確認(rèn)機(jī)制。
【2】在吞吐量方面:RabbitMQ在吞吐量方面稍遜于Kafka,他們的出發(fā)點(diǎn)不一樣,RabbitMQ支持對消息的可靠的傳遞,支持事務(wù),不支持批量的操作;基于存儲的可靠性的要求存儲可以采用內(nèi)存或者硬盤。
? ? kafka具有高的吞吐量,內(nèi)部采用消息的批量處理,zero-copy(sendfile 函數(shù)) 機(jī)制,數(shù)據(jù)的存儲和獲取是本地磁盤順序批量操作,具有O(1)的復(fù)雜度,消息處理的效率很高。
?【3】在可用性方面:RabbitMQ 支持 mirror 的 queue,主 queue失效,mirror queue接管。
? ? Kafka 的 Broker支持主備模式。
?【4】在集群負(fù)載均衡方面:RabbitMQ 的負(fù)載均衡需要單獨(dú)的 loadbalancer 進(jìn)行支持。
? ? Kafka 采用 Zookeeper對集群中的 Broker、Consumer進(jìn)行管理,可以注冊 Topic 到Zookeeper上;通過 Zookeeper的協(xié)調(diào)機(jī)制,Producer 保存對應(yīng) Topic的 Broker信息,可以隨機(jī)或者輪詢發(fā)送到 Broker上;并且 Producer可以基于語義指定分片,消息發(fā)送到 Broker的某分片上。

【與 ActiveMQ 的區(qū)別】ActiveMQ 和 Kafka,前者完全實(shí)現(xiàn)了 JMS 的規(guī)范,后者并沒有糾結(jié)于JMS規(guī)范,設(shè)計了另一套吞吐非常高的分布式發(fā)布-訂閱消息系統(tǒng),非常流行。目前歸屬于 Apache 定級項目。它只用文件系統(tǒng)來管理消息的生命周期。接下來我們結(jié)合三個點(diǎn)(消息安全性,服務(wù)器的穩(wěn)定容錯性以及吞吐量)來分別談?wù)勥@兩個消息中間件。
【1】消息的安全性:Kafka 集群中的 Leader 負(fù)責(zé)某一 Topic 的某一 Partition 的消息的讀寫,理論上 Consumer 和 Producer 只與該 Leader 節(jié)點(diǎn)打交道,一個集群里的某一 Broker 即是 Leader 的同時也可以擔(dān)當(dāng)某一 Partition 的 Follower,即 Replica。Kafka 分配 Replica 的算法如下:
(1)將所有 Broker(假設(shè)共n個Broker)和待分配的 Partition排序。
(2)將第i個 Partition分配到第(i mod n)個 Broker上。
(3)將第i個 Partition的第j個 Replica分配到第((i + j) mod n)個 Broker上。
同時,Kafka 與 Replica 既非同步也不是嚴(yán)格意義上的異步。一個典型的 Kafka 發(fā)送-消費(fèi)消息的過程如下:首先 Producer消息發(fā)送給某 Topic 的某 Partition 的 Leader,Leader 先是將消息寫入本地 Log,同時 follower(如果落后過多將會被踢出 Replica列表)從Leader上 pull 消息,并且在未寫入 log 的同時即向 Leader 發(fā)送 ACK 的反饋,所以對于某一條已經(jīng)算作 commit 的消息來講,在某一時刻,其存在于 Leader的 log中,以及 Replica的內(nèi)存中。這可以算作一個危險的情況(聽起來嚇人),因?yàn)槿绻藭r集群掛了這條消息就算丟失了,但結(jié)合 producer的屬性(request.required.acks=-1,當(dāng)所有follower都收到消息后返回ack)可以保證在絕大多數(shù)情況下消息的安全性。當(dāng)消息算作 commit的時候才會暴露給 consumer,并保證 at-least-once的投遞原則。
【2】服務(wù)的穩(wěn)定容錯性:前面提到過,Kafka天然支持HA,整個 leader/follower 機(jī)制通過 zookeeper調(diào)度,它在所有 Broker中選出一個 controller,所有 Partition的 Leader選舉都由 controller決定,同時 controller也負(fù)責(zé)增刪 Topic以及 Replica的重新分配。如果Leader掛了,集群將在ISR(in-sync replicas)中選出新的Leader,選舉基本原則是:新的 Leader必須擁有原來的 Leader commit 過的所有消息。假如所有的 follower都掛了,Kafka會選擇第一個“活”過來的 Replica(不一定是ISR中的)作為 Leader,因?yàn)槿绻藭r等待 ISR中的 Replica是有風(fēng)險的,假如所有的ISR都無法“活”,那此 Partition將會變成不可用。
【3】吞吐量:Leader 節(jié)點(diǎn)負(fù)責(zé)某一 Topic(可以分成多個 Partition)的某一 Partition的消息的讀寫,任何發(fā)布到此 Partition的消息都會被直接追加到 log文件的尾部,因?yàn)槊織l消息都被 append 到該 Partition中,是順序?qū)懘疟P,因此效率非常高(經(jīng)驗(yàn)證,順序?qū)懘疟P效率比隨機(jī)寫內(nèi)存還要高,這是 Kafka高吞吐率的一個很重要的保證),同時通過合理的 Partition,消息可以均勻的分布在不同的 Partition里面。Kafka基于時間或者 Partition的大小來刪除消息,同時 Broker是無狀態(tài)的,Consumer的消費(fèi)狀態(tài)(offset)是由Consumer 自己控制的(每一個 Consumer實(shí)例只會消費(fèi)某一個或多個特定 Partition的數(shù)據(jù),而某個 Partition的數(shù)據(jù)只會被某一個特定的 Consumer實(shí)例所消費(fèi)),也不需要 Broker通過鎖機(jī)制去控制消息的消費(fèi),所以吞吐量驚人,這也是 Kafka吸引人的地方。最后說下由于 zookeeper 引起的腦裂(Split Brain)問題:腦裂問題就是產(chǎn)生了兩個 Leader,導(dǎo)致集群行為不一致了。1個集群如果發(fā)生了網(wǎng)絡(luò)故障,很可能出現(xiàn)1個集群分成了兩部分,而這兩個部分都不知道對方是否存活,不知道到底是網(wǎng)絡(luò)問題還是直接機(jī)器down了,所以這兩部分都要選舉1個Leader,而一旦兩部分都選出了Leader, 并且網(wǎng)絡(luò)又恢復(fù)了,那么就會出現(xiàn)兩個 Brain的情況,整個集群的行為不一致了。解決:只有集群中超過半數(shù)節(jié)點(diǎn)投票才能選舉出 Leader。ZooKeeper默認(rèn)采用了這種方式。

Kafka 的設(shè)計目標(biāo)kafka在 設(shè)計之初就需要考慮以下5個方面的問題
【1】以時間復(fù)雜度為O(1)的方式提供消息持久化能力,即使對 TB級以上數(shù)據(jù)也能保證常數(shù)時間復(fù)雜度的訪問性能。
【2】高吞吐率,即使在非常廉價的商用機(jī)器上也能做到單機(jī)支持每秒100K條以上消息的傳輸。
【3】支持Kafka Server間的消息分區(qū),及分布式消費(fèi),同時保證每個Partition內(nèi)的消息順序傳輸。
【4】同時支持離線數(shù)據(jù)處理和實(shí)時數(shù)據(jù)處理。
【5】Scale out:支持在線水平擴(kuò)展。
所以,不像 AMQ,Kafka 從設(shè)計開始極為高可用為目的,天然 HA。Broker 支持集群,消息亦支持負(fù)載均衡,還有副本機(jī)制。同樣,Kafka 也是使用 Zookeeper 管理集群節(jié)點(diǎn)信息,包括 Consumer 的消費(fèi)信息也是保存在 zk 中,下面我們分話題來談:

【和傳統(tǒng)的MQ不同】:消費(fèi)者需要自己保留一個offset,從kafka 獲取消息時,只拉取當(dāng)前offset 以后的消息。將 offset 保存到 zookeeper 上。每個消費(fèi)者可以選擇一個id,同樣id 的消費(fèi)者對于同一條消息只會收到一次。一個Topic 的消費(fèi)者如果都使用相同的id,就是傳統(tǒng)的 Queue;如果每個消費(fèi)者都使用不同的id, 就是傳統(tǒng)的pub-sub。
kafka 主從同步怎么實(shí)現(xiàn)】:Kafka 的主從同步,主要是針對它的 Broker來說。在 Kafka 的 Broker 中,同一個 Topic 可以被分配成多個 Partition,每個 Partition的可以有一個或者多個 replicas(備份),即會有一個 Leader 以及 0到多個 Follower,在Consumer 讀取數(shù)據(jù)的時候,只會從 Leader上讀取數(shù)據(jù),Follower只是在 Leader宕機(jī)的時候來替代 Leader,主從同步有兩種方式:同步復(fù)制和異步復(fù)制,Kafka采用的是中間策略 ISR(In Sync Replicas)。
Kafka 的 ISR策略有數(shù)據(jù)寫 Leader的時候,Leader會查看 Follower組成的 ISR列表,并且符合以下兩點(diǎn)才算是屬于 ISR列表:【1】Broker 可以維護(hù)和 zookeeper的連接,zookeeper通過心跳機(jī)制檢查每個節(jié)點(diǎn)的連接?!?】如果節(jié)點(diǎn)是個 Follow它必須能及時同步 Leader的寫操作,不能延時太久。當(dāng)有寫消息的時候,我們可以根據(jù)配置做如下配置:request.required.acks 參數(shù)的設(shè)置來進(jìn)行調(diào)整:

?? 0 ,相當(dāng)于異步發(fā)送,消息發(fā)送完畢即 offset增加,繼續(xù)生產(chǎn);相當(dāng)于At most once;
?? 1,Leader 收到Leader Replica 對一個消息的接收 ack才增加 offset,然后繼續(xù)生產(chǎn);
?? -1,Leader 收到所有 Replica 對一個消息的接收 ack才增加 offset,然后繼續(xù)生產(chǎn);

MQ 的消息延遲了怎么處理


【1】延遲處理:可以通過設(shè)置延遲級別,控制消息延遲的時間。
【2】設(shè)置過期時間:

<broker>     ...         <plugins>                              <timeStampingBrokerPluginttlCeiling="30000" zeroExpirationOverride="30000" />          plugins>     ... broker>

?? 1)Message 過期則客戶端不能接收;
?? 2)ttlCeiling:表示過期時間上限(程序?qū)懙倪^期時間不能超過此時間);
?? 3)zeroExpirationOverride:表示過期時間(給未分配過期時間的消息分配過期時間);
【3】過期消息處理辦法消息過期后會進(jìn)入死信隊列,如不想拋棄死信隊列,默認(rèn)進(jìn)入 ACTIVEMQ.DLQ隊列,且不會自動清除;對于過期的消息進(jìn)入死信隊列還有一些可選的策略:放入各自的死信通道、保存在一個共享的隊列(默認(rèn)),且可以設(shè)置是否將過期消息放入隊列的開關(guān)以及死信隊列消息過期時間。
?? 1)直接拋棄死信隊列:AcitveMQ提供了一個便捷的插件:DiscardingDLQBrokerPlugin,來拋棄DeadLetter。如果開發(fā)者不需要關(guān)心DeadLetter,可以使用此策略。

<broker>...    <plugins>        <discardingDLQBrokerPlugindropAll="true"  dropTemporaryTopics="true" dropTemporaryQueues="true" />                    plugins>    ... broker>

? 2)定時拋棄死信隊列:默認(rèn)情況下,ActiveMQ永遠(yuǎn)不會過期發(fā)送到 DLQ的消息。但是,從 ActiveMQ5.12開始,deadLetterStrategy 支持 expiration屬性,其值以毫秒為單位。

<policyEntryqueue=">"…>   ...  <deadLetterStrategy>    <sharedDeadLetterStrategy processExpired="true" expiration="30000"/>  deadLetterStrategy>   ...policyEntry>

?? 3)慢消費(fèi)者策略設(shè)置:Broker將會啟動一個后臺線程用來檢測所有的慢速消費(fèi)者,并定期關(guān)閉它們;中斷慢速消費(fèi)者,慢速消費(fèi)將會被關(guān)閉。abortConnection是否關(guān)閉連接;如果慢速消費(fèi)者最后一個ACK距離現(xiàn)在的時間間隔超過閥 maxTimeSinceLastAck,則中斷慢速消費(fèi)者。

<policyEntryqueue=">"…>    …    <slowConsumerStrategy>        <abortSlowConsumerStrategyabortConnection="false"/>      slowConsumerStrategy>    …policyEntry>

利用 MQ 怎么實(shí)現(xiàn)最終一致性


RabbitMQ 遵循了 AMQP 規(guī)范,用消息確認(rèn)機(jī)制來保證:只要消息發(fā)送,就能確保被消費(fèi)者消費(fèi),來做到了消息最終一致性。Rabbitmq 的整個發(fā)送過程如下【1】生產(chǎn)者發(fā)送消息到消息服務(wù)。
【2】如果消息落地持久化完成,則返回一個標(biāo)志給生產(chǎn)者。生產(chǎn)者拿到這個確認(rèn)后,才能放心的說消息終于成功發(fā)到消息服務(wù)了。否則進(jìn)入異常處理流程。
【3】消息服務(wù)將消息發(fā)送給消費(fèi)者。
【4】消費(fèi)者接受并處理消息,如果處理成功則手動確認(rèn)。當(dāng)消息服務(wù)拿到這個確認(rèn)后,才放心的說終于消費(fèi)完成了。否則重發(fā),或者進(jìn)入異常處理。

使用 kafka 有沒有遇到什么問題,怎么解決的


問題兩臺設(shè)備上只有一個上存在 logs;
基本情況一個 Topic 配置了四個 Partition,一個 Consumer Group 消費(fèi)此Topic,但使用兩臺服務(wù)器,分別創(chuàng)建 Consumer 實(shí)例。都運(yùn)行日志收集程序。
問題Consumer Group 是將消費(fèi)到的日志寫入服務(wù)器磁盤文件中。有兩臺服務(wù)器都在運(yùn)行此日志收集程序,每個服務(wù)器上的程序都創(chuàng)建了一個 Group 的 Consumer實(shí)例,此 Consumer實(shí)例會分配到兩個 Partition進(jìn)行處理,因此每個服務(wù)器都只存儲了一部分日志文件。但是在測試時發(fā)現(xiàn),所有日志都寫入了 ServerA,ServerB上沒有日志,即便使用測試工具發(fā)送了大量數(shù)據(jù),ServerB仍然沒有日志。
原因查看 log發(fā)現(xiàn),ServerA 上的 Consumer實(shí)例分配的 Partition 為 Partition_0 / Partition_1,serverB 上的 Consumer實(shí)例分配的 Partition 為partition_3 / Partition_4,兩個 Server上的 Consumer實(shí)例都被分配了Partition,Partition分配正常,消費(fèi)應(yīng)該沒有問題。ServerB 上沒有日志數(shù)據(jù),說明沒有數(shù)據(jù)供其消費(fèi),也就是說,所有數(shù)據(jù)都被 Producer發(fā)送到了 Partition_1 或Partition_2 上,這是生產(chǎn)的問題,應(yīng)該是與生產(chǎn)者的分區(qū)路由有關(guān),因此有必要了解下生產(chǎn)者的分區(qū)路由策略。Kafka 中的每個Topic 分配了4個 Partition,生產(chǎn)者(Producer)在將消息記錄(ProducerRecord)發(fā)送到某個 Topic時是要選擇對應(yīng)的 Partition的,選擇 Partition的策略如下:
【1】消息中指定Partition:判斷 Partition字段是否有值,有值就直接將該消息發(fā)送到指定的 Partition就行;
【2】如果沒有指定分區(qū)(Partition),則使用分區(qū)器進(jìn)行分區(qū)路由,首先判斷消息中是否指定了key;
【3】如果指定了key,則使用該 key進(jìn)行 hash操作,并轉(zhuǎn)為正數(shù),然后將其對 Topic相應(yīng)的分區(qū)數(shù)進(jìn)行取余操作,得到一個分區(qū);
【4】如果沒有指定key,則在一個隨機(jī)數(shù)上以自增的方式產(chǎn)生一個數(shù)(第一次時生成隨機(jī)數(shù),之后在其基礎(chǔ)上進(jìn)行自增),轉(zhuǎn)為正數(shù)之后對分區(qū)數(shù)量進(jìn)行取余操作,得到一個分區(qū)。
由于在程序中 Producer發(fā)送記錄的時候指定了固定的 key,根據(jù)這個 key進(jìn)行分區(qū)路由總是會選擇同一個分區(qū),所有日志都被發(fā)送給了同一個分區(qū),因此只有關(guān)聯(lián)這個分區(qū)的 Consumer實(shí)例才能消費(fèi),只有此 Consumer實(shí)例所在的 Server上才有日志。

Kafka 中的 ISR、AR又代表什么


【1】ISR:In-Sync Replicas 副本同步隊列;
【2】AR:Assigned Replicas 所有副本;
ISR是由 Leader維護(hù),Follower 從 Leader同步數(shù)據(jù)有一些延遲(包括延遲時間 replica.lag.time.max.ms 和延遲條數(shù)replica.lag.max.messages 兩個維度, 當(dāng)前最新的版本0.10.x中只支持 replica.lag.time.max.ms這個維度),任意一個超過閾值都會把 Follower剔除出 ISR,存入OSR(Outof-Sync Replicas)列表,新加入的 Follower 也會先存放在OSR中。AR=ISR+OSR。

十七、Kafka 為什么不支持讀寫分離


在 Kafka 中這種功能完全可以支持,同時主寫從讀可以讓從節(jié)點(diǎn)去分擔(dān)主節(jié)點(diǎn)的負(fù)載壓力,預(yù)防主節(jié)點(diǎn)負(fù)載過重而從節(jié)點(diǎn)卻空閑的情況發(fā)生。但是主寫從讀也有 2 個很明顯的缺點(diǎn)
【1】數(shù)據(jù)一致性問題。數(shù)據(jù)從主節(jié)點(diǎn)轉(zhuǎn)到從節(jié)點(diǎn)必然會有一個延時的時間窗口,這個時間窗口會導(dǎo)致主從節(jié)點(diǎn)之間的數(shù)據(jù)不一致。某一時刻,在主節(jié)點(diǎn)和從節(jié)點(diǎn)中 A 數(shù)據(jù)的值都為 X, 之后將主節(jié)點(diǎn)中 A 的值修改為 Y,那么在這個變更通知到從節(jié)點(diǎn)之前,應(yīng)用讀取從節(jié)點(diǎn)中的 A 數(shù)據(jù)的值并不為最新的 Y,由此便產(chǎn)生了數(shù)據(jù)不一致的問題。
【2】延時問題。類似 Redis 這種組件,數(shù)據(jù)從寫入主節(jié)點(diǎn)到同步至從節(jié)點(diǎn)中的過程需要經(jīng)歷網(wǎng)絡(luò)→主節(jié)點(diǎn)內(nèi)存→網(wǎng)絡(luò)→從節(jié)點(diǎn)內(nèi)存這幾個階段,整個過程會耗費(fèi)一定的時間。而在 Kafka 中,主從同步會比 Redis 更加耗時,它需要經(jīng)歷網(wǎng)絡(luò)→主節(jié)點(diǎn)內(nèi)存→主節(jié)點(diǎn)磁盤→網(wǎng)絡(luò)→從節(jié) 點(diǎn)內(nèi)存→從節(jié)點(diǎn)磁盤這幾個階段。對延時敏感的應(yīng)用而言,主寫從讀的功能并不太適用。
Kafka 架構(gòu)導(dǎo)致我們沒有必要使用主從分離】在 Kafka 中 這種負(fù)載均衡是在主寫主讀的架構(gòu)上實(shí)現(xiàn)的。我們來看 一下 Kafka 的生產(chǎn)消費(fèi)模型,如下圖所示。
在 Kafka 集群中有 3 個分區(qū),每個分區(qū)有 3 個副本,正好均勻地分布在 3個 broker 上,灰色陰影的代表 Leader 副本,非灰色陰影的代表 Follower 副本,虛線表示 Follower 副本從 Leader 副本上拉取消息。當(dāng)生產(chǎn)者寫入消息的時候都寫入 Leader 副本,對于圖中的情形,每個 Broker 都有消息從生產(chǎn)者流入。當(dāng)消費(fèi)者讀取消息的時候也是從 Leader 副本中讀取 的,對于圖中的情形,每個 Broker 都有消息流出到消費(fèi)者。從而將壓力分配到每個服務(wù)器上,從而實(shí)現(xiàn)了負(fù)載均衡功能。

ZK 在 kafka 中的作用


【1】Broker 注冊:Broker 是分布式部署并且相互之間相互獨(dú)立,但是需要有一個注冊系統(tǒng)能夠?qū)⒄麄€集群中的 Broker管理起來,此時就使用到了 Zookeeper。在 Zookeeper上會有一個專門用來進(jìn)行 Broker服務(wù)器列表記錄的節(jié)點(diǎn):/brokers/ids 每個Broker在啟動時,都會到 Zookeeper上進(jìn)行注冊,即到 /brokers/ids下創(chuàng)建屬于自己的節(jié)點(diǎn),如/brokers/ids/[0...N]。Kafka 使用了全局唯一的數(shù)字來指代每個 Broker服務(wù)器,不同的 Broker必須使用不同的 Broker ID進(jìn)行注冊,創(chuàng)建完節(jié)點(diǎn)后,每個 Broker就會將自己的 IP地址和端口信息記錄到該節(jié)點(diǎn)中去。其中,Broker創(chuàng)建的節(jié)點(diǎn)類型是臨時節(jié)點(diǎn),一旦 Broker宕機(jī),則對應(yīng)的臨時節(jié)點(diǎn)也會被自動刪除。
【2】Topic 注冊:在 Kafka中,同一個Topic的消息會被分成多個分區(qū)并將其分布在多個 Broker上,這些分區(qū)信息及與Broker的對應(yīng)關(guān)系也都是由 Zookeeper在維護(hù),由專門的節(jié)點(diǎn)來記錄,如:/borkers/topics Kafka 中每個 Topic都會以 /brokers/topics/[topic] 的形式被記錄,如 /brokers/topics/login 和 /brokers/topics/search 等。Broker服務(wù)器啟動后,會到對應(yīng) Topic節(jié)點(diǎn)(/brokers/topics)上注冊自己的 Broker ID并寫入針對該 Topic的分區(qū)總數(shù),如 /brokers/topics/login/3->2,這個節(jié)點(diǎn)表示Broker ID為3的一個 Broker服務(wù)器,對于"login" 這個 Topic的消息,提供了2個分區(qū)進(jìn)行消息存儲,同樣,這個分區(qū)節(jié)點(diǎn)也是臨時節(jié)點(diǎn)。
【3】生產(chǎn)者負(fù)載均衡:由于同一個 Topic消息會被分區(qū)并將其分布在多個 Broker上,因此,生產(chǎn)者需要將消息合理地發(fā)送到這些分布式的Broker上,那么如何實(shí)現(xiàn)生產(chǎn)者的負(fù)載均衡,Kafka 支持傳統(tǒng)的四層負(fù)載均衡,也支持 Zookeeper方式實(shí)現(xiàn)負(fù)載均衡。
?? ■? 四層負(fù)載均衡,根據(jù)生產(chǎn)者的 IP地址和端口來為其確定一個相關(guān)聯(lián)的 Broker。通常,一個生產(chǎn)者只會對應(yīng)單個 Broker,然后該生產(chǎn)者產(chǎn)生的消息都發(fā)往該Broker。這種方式邏輯簡單,每個生產(chǎn)者不需要同其他系統(tǒng)建立額外的 TCP連接,只需要和 Broker維護(hù)單個 TCP連接即可。但是,其無法做到真正的負(fù)載均衡,因?yàn)閷?shí)際系統(tǒng)中的每個生產(chǎn)者產(chǎn)生的消息量及每個 Broker的消息存儲量都是不一樣的,如果有些生產(chǎn)者產(chǎn)生的消息遠(yuǎn)多于其他生產(chǎn)者的話,那么會導(dǎo)致不同的 Broker接收到的消息總數(shù)差異巨大,同時,生產(chǎn)者也無法實(shí)時感知到 Broker的新增和刪除。
?? ■? 使用 Zookeeper進(jìn)行負(fù)載均衡,由于每個Broker啟動時,都會完成 Broker注冊過程,生產(chǎn)者會通過該節(jié)點(diǎn)的變化來動態(tài)地感知到 Broker服務(wù)器列表的變更,這樣就可以實(shí)現(xiàn)動態(tài)的負(fù)載均衡機(jī)制。
【4】消費(fèi)者負(fù)載均衡:與生產(chǎn)者類似,Kafka 中的消費(fèi)者同樣需要進(jìn)行負(fù)載均衡來實(shí)現(xiàn)多個消費(fèi)者合理地從對應(yīng)的 Broker服務(wù)器上接收消息,每個消費(fèi)者分組包含若干消費(fèi)者,每條消息都只會發(fā)送給分組中的一個消費(fèi)者,不同的消費(fèi)者分組消費(fèi)自己特定的Topic下面的消息,互不干擾。
【5】分區(qū)與消費(fèi)者的關(guān)系:消費(fèi)組 (Consumer Group)下有多個 Consumer(消費(fèi)者)。對于每個消費(fèi)者組 (Consumer Group),Kafka 都會為其分配一個全局唯一的Group ID,Group 內(nèi)部的所有消費(fèi)者共享該 ID。訂閱的 Topic下的每個分區(qū)只能分配給某個 group 下的一個 Consumer(當(dāng)然該分區(qū)還可以被分配給其他 group)。同時,Kafka為每個消費(fèi)者分配一個Consumer ID,通常采用"Hostname:UUID"形式表示。在 Kafka中,規(guī)定了每個消息分區(qū)只能被同組的一個消費(fèi)者進(jìn)行消費(fèi),因此,需要在 Zk 上記錄消息分區(qū)與 Consumer 之間的關(guān)系,每個消費(fèi)者一旦確定了對一個消息分區(qū)的消費(fèi)權(quán)力,需要將其Consumer ID 寫入到 Zookeeper 對應(yīng)消息分區(qū)的臨時節(jié)點(diǎn)上,例如:/consumers/[group_id]/owners/[topic]/[broker_id-partition_id] 其中,[broker_id-partition_id]就是一個消息分區(qū)的標(biāo)識,節(jié)點(diǎn)內(nèi)容就是該消息分區(qū)上消費(fèi)者的 Consumer ID。
【6】消息消費(fèi)進(jìn)度Offset 記錄:在消費(fèi)者對指定消息分區(qū)進(jìn)行消息消費(fèi)的過程中,需要定時地將分區(qū)消息的消費(fèi)進(jìn)度 Offset記錄到Zookeeper上,以便在該消費(fèi)者進(jìn)行重啟或者其他消費(fèi)者重新接管該消息分區(qū)的消息消費(fèi)后,能夠從之前的進(jìn)度開始繼續(xù)進(jìn)行消息消費(fèi)。Offset 在 Zookeeper中由一個專門節(jié)點(diǎn)進(jìn)行記錄,其節(jié)點(diǎn)路徑為:/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id] 節(jié)點(diǎn)內(nèi)容就是 Offset的值。
【7】消費(fèi)者注冊:消費(fèi)者服務(wù)器在初始化啟動時加入消費(fèi)者分組的步驟如下:注冊到消費(fèi)者分組。每個消費(fèi)者服務(wù)器啟動時,都會到 Zookeeper的指定節(jié)點(diǎn)下創(chuàng)建一個屬于自己的消費(fèi)者節(jié)點(diǎn),例如/consumers/[group_id]/ids/[consumer_id],完成節(jié)點(diǎn)創(chuàng)建后,消費(fèi)者就會將自己訂閱的 Topic信息寫入該臨時節(jié)點(diǎn)。對消費(fèi)者分組中的消費(fèi)者的變化注冊監(jiān)聽。每個消費(fèi)者都需要關(guān)注所屬消費(fèi)者分組中其他消費(fèi)者服務(wù)器的變化情況,即對/consumers/[group_id]/ids節(jié)點(diǎn)注冊子節(jié)點(diǎn)變化的 Watcher監(jiān)聽,一旦發(fā)現(xiàn)消費(fèi)者新增或減少,就觸發(fā)消費(fèi)者的負(fù)載均衡。對Broker服務(wù)器變化注冊監(jiān)聽。消費(fèi)者需要對/broker/ids/[0-N]中的節(jié)點(diǎn)進(jìn)行監(jiān)聽,如果發(fā)現(xiàn) Broker服務(wù)器列表發(fā)生變化,那么就根據(jù)具體情況來決定是否需要進(jìn)行消費(fèi)者負(fù)載均衡。進(jìn)行消費(fèi)者負(fù)載均衡。為了讓同一個Topic下不同分區(qū)的消息盡量均衡地被多個消費(fèi)者消費(fèi)而進(jìn)行消費(fèi)者與消息分區(qū)分配的過程,通常,對于一個消費(fèi)者分組,如果組內(nèi)的消費(fèi)者服務(wù)器發(fā)生變更或 Broker服務(wù)器發(fā)生變更,會發(fā)出消費(fèi)者負(fù)載均衡。
ZK 的詳細(xì)存儲結(jié)構(gòu)圖】:

早期版本的 kafka 用 zk 做 meta 信息存儲,consumer 的消費(fèi)狀態(tài),group 的管理以及 offset 的值。考慮到 zk本身的一些因素以及整個架構(gòu)較大概率存在單點(diǎn)問題,新版本中確實(shí)逐漸弱化了 zookeeper的作用。新的 consumer使用了 kafka內(nèi)部的 group coordination 協(xié)議,也減少了對 zookeeper的依賴。

Kafka Follower 如何與 Leader同步數(shù)據(jù)


Kafka 使用 ISR的方式很好的均衡了確保數(shù)據(jù)不丟失以及吞吐率。Follower 可以批量的從 Leader復(fù)制數(shù)據(jù),而且Leader充分利用磁盤順序讀以及send file(zero copy)機(jī)制,這樣極大的提高復(fù)制性能,內(nèi)部批量寫磁盤,大幅減少了 Follower與 Leader的消息量差。所有的 Follower 都復(fù)制 Leader 的日志,日志中的消息和順序都和 Leader 中的一致。Follower 像普通的 Consumer 那樣從 Leader 那里拉取消息并保存在自己的日志文件中。ISR 中有f+1個節(jié)點(diǎn),就可以允許在f個節(jié)點(diǎn) Down掉的情況下不會丟失消息并正常提供服。ISR 的成員是動態(tài)的,如果一個節(jié)點(diǎn)被淘汰了,當(dāng)它重新達(dá)到“同步中”的狀態(tài)時,他可以重新加入ISR。因此如果 Leader宕了,直接從 ISR中選擇一個 Follower就行。只有當(dāng)消息被所有的副本加入到日志中時,才算是“committed”,只有 committed的消息才會發(fā)送給 consumer,這樣就不用擔(dān)心 Leader Down掉了消息會丟失。Kafka 選擇一個節(jié)點(diǎn)作為“controller”,當(dāng)發(fā)現(xiàn)有Leader 節(jié)點(diǎn) Down掉的時候它負(fù)責(zé)在LSR 分區(qū)的所有節(jié)點(diǎn)中選擇新的 Leader,這使得 Kafka可以批量的高效的管理所有分區(qū)節(jié)點(diǎn)的主從關(guān)系。如果 controller down掉了,活著的節(jié)點(diǎn)中的一個會被切換為新的 controller。

什么情況下 Follower 會從 ISR 中踢除


Leader 維護(hù)一個與其基本保持同步的 Replica列表,該列表稱為 ISR(in-sync Replica),每個 Partition都會有一個 ISR,而且是由Leader動態(tài)維護(hù) ,如果 Follower 比 Leader落后太多消息數(shù)量【replica.lag.max.messages】,或者超過一定時間未發(fā)起數(shù)據(jù)復(fù)制請求【replica.lag.time.max.ms】,則 Leader將其從 ISR中移除 。

Kafka 為什么那么快


Kafka 的消息是保存或緩存在磁盤上的,一般認(rèn)為在磁盤上讀寫數(shù)據(jù)是會降低性能的,因?yàn)閷ぶ窌容^消耗時間,但是實(shí)際上,Kafka 的特性之一就是高吞吐率。Kafka 之所以能這么快,是因?yàn)?strong>順序?qū)懘疟P、大量使用內(nèi)存頁 、零拷貝技術(shù)的使用
數(shù)據(jù)寫入Kafka 會把收到的消息都寫入到硬盤中,不會丟失數(shù)據(jù)。為了優(yōu)化寫入速度 Kafka 采用了兩個技術(shù), 順序?qū)懭?/strong>和 MMFile(Memory Mapped?File)
原因一:順序?qū)懭?/strong>】磁盤讀寫的快慢取決于你怎么使用它,也就是順序讀寫或者隨機(jī)讀寫。在順序讀寫的情況下,磁盤的順序讀寫速度和內(nèi)存持平。因?yàn)橛脖P是機(jī)械結(jié)構(gòu),每次讀寫都會尋址->寫入,其中尋址是一個“機(jī)械動作”,它是最耗時的。所以硬盤最討厭隨機(jī) I/O,最喜歡順序 I/O。為了提高讀寫硬盤的速度,Kafka 就是使用順序 I/O。如果在內(nèi)存做這些操作的時候,一個是 Java 對象的內(nèi)存開銷很大,另一個是隨著堆內(nèi)存數(shù)據(jù)的增多,Java 的 GC 時間會變得很長。
使用磁盤操作有以下幾個好處:①、磁盤順序讀寫速度超過內(nèi)存隨機(jī)讀寫。②、JVM 的 GC 效率低,內(nèi)存占用大。使用磁盤可以避免這一問題。③、系統(tǒng)冷啟動后,磁盤緩存依然可用。下圖就展示了 Kafka 是如何寫入數(shù)據(jù)的, 每一個 Partition 其實(shí)都是一個文件 ,收到消息后 Kafka 會把數(shù)據(jù)插入到文件末尾(虛框部分):

該方法的缺陷:沒有辦法刪除數(shù)據(jù) ,所以 Kafka 是不會刪除數(shù)據(jù)的,它會把所有的數(shù)據(jù)都保留下來,每個消費(fèi)者(Consumer)對每個 Topic 都有一個 Offset 用來表示讀取到了第幾條數(shù)據(jù) 。
如果不刪除硬盤肯定會被撐滿,所以 Kakfa 提供了兩種策略來刪除數(shù)據(jù):
【1】基于時間;
【2】基于 Partition 文件大小;
原因二:Memory Mapped Files即便是順序?qū)懭胗脖P,硬盤的訪問速度還是不可能追上內(nèi)存。所以 Kafka 的數(shù)據(jù)并不是實(shí)時的寫入硬盤 ,它充分利用了現(xiàn)代操作系統(tǒng)分頁存儲來利用內(nèi)存提高 I/O 效率。Memory Mapped Files(后面簡稱 mmap)也被翻譯成內(nèi)存映射文件 ,在 64 位操作系統(tǒng)中一般可以表示 20G 的數(shù)據(jù)文件,它的工作原理是直接利用操作系統(tǒng)的 Page 來實(shí)現(xiàn)文件到物理內(nèi)存的直接映射。完成映射之后你對物理內(nèi)存的操作會被同步到硬盤上(操作系統(tǒng)在適當(dāng)?shù)臅r候)。通過 mmap,進(jìn)程像讀寫硬盤一樣讀寫內(nèi)存(當(dāng)然是虛擬機(jī)內(nèi)存),也不必關(guān)心內(nèi)存的大小,有虛擬內(nèi)存為我們兜底。使用這種方式可以獲取很大的 I/O 提升,省去了用戶空間到內(nèi)核空間復(fù)制的開銷。(調(diào)用文件的 Read 會把數(shù)據(jù)先放到內(nèi)核空間的內(nèi)存中,然后再復(fù)制到用戶空間的內(nèi)存中)但也有一個很明顯的缺陷:不可靠,寫到 mmap 中的數(shù)據(jù)并沒有被真正的寫到硬盤,操作系統(tǒng)會在程序主動調(diào)用 Flush 的時候才把數(shù)據(jù)真正的寫到硬盤。Kafka 提供了一個參數(shù) producer.type 來控制是不是主動 Flush:如果 Kafka 寫入到 mmap 之后就立即 Flush,然后再返回 Producer 叫同步 (Sync)。如果 Kafka 寫入 mmap 之后立即返回 Producer 不調(diào)用 Flush 叫異步 (Async)。
原因三:Zero Copy傳統(tǒng)模式下,當(dāng)需要對一個文件進(jìn)行傳輸?shù)臅r候,其具體流程細(xì)節(jié)如下:調(diào)用 Read 函數(shù),文件數(shù)據(jù)被 Copy 到內(nèi)核緩沖區(qū)。Read 函數(shù)返回,文件數(shù)據(jù)從內(nèi)核緩沖區(qū) Copy 到用戶緩沖區(qū)。Write 函數(shù)調(diào)用,將文件數(shù)據(jù)從用戶緩沖區(qū) Copy 到內(nèi)核與 Socket 相關(guān)的緩沖區(qū)。數(shù)據(jù)從 Socket 緩沖區(qū) Copy 到相關(guān)協(xié)議引擎。以上細(xì)節(jié)是傳統(tǒng) Read/Write 方式進(jìn)行網(wǎng)絡(luò)文件傳輸?shù)姆绞?#xff0c;我們可以看到,在這個過程當(dāng)中,文件數(shù)據(jù)實(shí)際上是經(jīng)過了四次 Copy 操作:硬盤—>內(nèi)核 buf—>用戶 buf—>Socket 相關(guān)緩沖區(qū)—>協(xié)議引擎。而 Sendfile 系統(tǒng)調(diào)用則提供了一種減少以上多次 Copy,提升文件傳輸性能的方法。在內(nèi)核版本 2.1 中,引入了 Sendfile 系統(tǒng)調(diào)用,以簡化網(wǎng)絡(luò)上和兩個本地文件之間的數(shù)據(jù)傳輸。Sendfile 的引入不僅減少了數(shù)據(jù)復(fù)制,還減少了上下文切換。sendfile(socket, file, len);
運(yùn)行流程如下【1】Sendfile 系統(tǒng)調(diào)用,文件數(shù)據(jù)被 Copy 至內(nèi)核緩沖區(qū)。【2】再從內(nèi)核緩沖區(qū) Copy 至內(nèi)核中 Socket 相關(guān)的緩沖區(qū)?!?】最后再 Socket 相關(guān)的緩沖區(qū) Copy 到協(xié)議引擎。
相較傳統(tǒng) Read/Write 方式,2.1 版本內(nèi)核引進(jìn)的 Sendfile 已經(jīng)減少了內(nèi)核緩沖區(qū)到 User 緩沖區(qū),再由 User 緩沖區(qū)到 Socket 相關(guān)緩沖區(qū)的文件 Copy。而在內(nèi)核版本 2.4 之后,文件描述符結(jié)果被改變,Sendfile 實(shí)現(xiàn)了更簡單的方式,再次減少了一次 Copy 操作。在 Apache、Nginx、Lighttpd 等 Web 服務(wù)器當(dāng)中,都有一項 Sendfile 相關(guān)的配置,使用 Sendfile 可以大幅提升文件傳輸性能。Kafka 把所有的消息都存放在一個一個的文件中,當(dāng)消費(fèi)者需要數(shù)據(jù)的時候 Kafka 直接把文件發(fā)送給消費(fèi)者,配合 mmap 作為文件讀寫方式,直接把它傳給 Sendfile。
原因四:批量壓縮在很多情況下,系統(tǒng)的瓶頸不是 CPU 或磁盤,而是網(wǎng)絡(luò) IO,對于需要在廣域網(wǎng)上的數(shù)據(jù)中心之間發(fā)送消息的數(shù)據(jù)流水線尤其如此。進(jìn)行數(shù)據(jù)壓縮會消耗少量的 CPU 資源,不過對于 Kafka 而言,網(wǎng)絡(luò) IO 更應(yīng)該考慮:
? ■ ?因?yàn)槊總€消息都壓縮,但是壓縮率相對很低,所以 Kafka 使用了批量壓縮,即將多個消息一起壓縮而不是單個消息壓縮。
? ■ ?Kafka 允許使用遞歸的消息集合,批量的消息可以通過壓縮的形式傳輸并且在日志中也可以保持壓縮格式,直到被消費(fèi)者解壓縮。
? ■ ?Kafka 支持多種壓縮協(xié)議,包括 Gzip 和 Snappy 壓縮協(xié)議。
總結(jié)Kafka 速度的秘訣在于,它把所有的消息都變成一個批量的文件,并且進(jìn)行合理的批量壓縮,減少網(wǎng)絡(luò) IO 損耗,通過 mmap 提高 I/O 速度。寫入數(shù)據(jù)的時候由于單個 Partion 是末尾添加,所以速度最優(yōu);讀取數(shù)據(jù)的時候配合 Sendfile 直接暴力輸出。

kafka 使用過程中遇到的問題


基本情況兩臺設(shè)備上只有一個上存在 logs;
詳細(xì)情況一個topic,此topic配置了四個partition。兩個consumer group,這兩個consumer group用于消費(fèi)同一個topic,但做不同的處理任務(wù)。每個consumer group中都只有一個 consumer實(shí)例進(jìn)行消費(fèi)。兩臺服務(wù)器,都運(yùn)行此日志收集程序。
問題 兩個consumer group用于消費(fèi)同一個 topic并做不同的處理,其中一個 consumer group(稱作 group2)是將消費(fèi)到的日志寫入服務(wù)器磁盤文件中。有兩臺服務(wù)器都在運(yùn)行此日志收集程序,每個服務(wù)器上的程序都創(chuàng)建了一個group2的consumer實(shí)例,此consumer實(shí)例會分配到兩個 partition進(jìn)行處理,因此每個服務(wù)器都只存儲了一部分日志文件。但是在測試時發(fā)現(xiàn),所有日志都寫入了server1,server2上沒有日志,即便使用測試工具發(fā)送了大量數(shù)據(jù),server2仍然沒有日志。
原因查看 log發(fā)現(xiàn),server1上的 consumer實(shí)例分配的 partition為 partition_0 partition_1,server2上的 consumer實(shí)例分配的partition為partition_3、partition_4,兩個server上的consumer實(shí)例都被分配了partition,partition分配正常,消費(fèi)應(yīng)該沒有問題。server2上沒有日志數(shù)據(jù),說明沒有數(shù)據(jù)供其消費(fèi),也就是說,所有數(shù)據(jù)都被 producer發(fā)送到了 partition_1或 partition_2上,這是生產(chǎn)的問題,應(yīng)該是與生產(chǎn)者的分區(qū)路由有關(guān),因此有必要了解下生產(chǎn)者的分區(qū)路由策略。Kafka中的每個 Topic分配了4個Partition,生產(chǎn)者(Producer)在將消息記錄(ProducerRecord)發(fā)送到某個 Topic時是要選擇對應(yīng)的 Partition的,選擇 Partition的策略如下:
【1】判斷消息中的 partition字段是否有值,有值的話就是指定了partition,直接將該消息發(fā)送到指定的 partition就行。
【2】如果沒有指定分區(qū)(partition),則使用分區(qū)器進(jìn)行分區(qū)路由,首先判斷消息中是否指定了key。
【3】如果指定了key,則使用該key進(jìn)行hash操作,并轉(zhuǎn)為正數(shù),然后將其對topic相應(yīng)的分區(qū)數(shù)進(jìn)行取余操作,得到一個分區(qū)。
【4】如果沒有指定key,則在一個隨機(jī)數(shù)上以自增的方式產(chǎn)生一個數(shù)(第一次時生成隨機(jī)數(shù),之后在其基礎(chǔ)上進(jìn)行自增),轉(zhuǎn)為正數(shù)之后對分區(qū)數(shù)量進(jìn)行取余操作,得到一個分區(qū)。
由于在程序中Producer發(fā)送記錄的時候指定了固定的key,根據(jù)這個key進(jìn)行分區(qū)路由總是會選擇同一個分區(qū),所有日志都被發(fā)送給了同一個分區(qū),因此只有關(guān)聯(lián)這個分區(qū)的 consumer實(shí)例才能消費(fèi),只有此 consumer實(shí)例所在的 server上才有日志。

end

總結(jié)

以上是生活随笔為你收集整理的websphere mq 查看队列中是否有数据_全网最全的 “消息队列”的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。