kafka consumer配置拉取速度慢_Kafka消费者的使用和原理
這周我們學(xué)習(xí)下消費(fèi)者,仍然還是先從一個(gè)消費(fèi)者的Hello World學(xué)起:
public?class?Consumer?{????public?static?void?main(String[]?args)?{????????//?1.?配置參數(shù)????????Properties?properties?=?new?Properties();????????properties.put("key.deserializer",????????????????"org.apache.kafka.common.serialization.StringDeserializer");????????properties.put("value.deserializer",????????????????"org.apache.kafka.common.serialization.StringDeserializer");????????properties.put("bootstrap.servers",?"localhost:9092");????????properties.put("group.id",?"group.demo");????????//?2.?根據(jù)參數(shù)創(chuàng)建KafkaConsumer實(shí)例(消費(fèi)者)????????KafkaConsumer?consumer?=?new?KafkaConsumer<>(properties);????????//?3.?訂閱主題????????consumer.subscribe(Collections.singletonList("topic-demo"));????????try?{????????????// 4. 輪循消費(fèi)????????????while?(true)?{????????????????ConsumerRecords?records?=?consumer.poll(Duration.ofMillis(1000));????????????????for?(ConsumerRecord?record?:?records)?{????????????????????System.out.println(record.value());????????????????}????????????}????????}?finally?{????????????//?5.?關(guān)閉消費(fèi)者????????????consumer.close();????????}????}}前兩步和生產(chǎn)者類似,配置參數(shù)然后根據(jù)參數(shù)創(chuàng)建實(shí)例,區(qū)別在于消費(fèi)者使用的是反序列化器,以及多了一個(gè)必填參數(shù)group.id,用于指定消費(fèi)者所屬的消費(fèi)組。關(guān)于消費(fèi)組的概念在《圖解Kafka中的基本概念》中介紹過了,消費(fèi)組使得消費(fèi)者的消費(fèi)能力可橫向擴(kuò)展,這次再介紹一個(gè)新的概念“再均衡”,其意思是將分區(qū)的所屬權(quán)進(jìn)行重新分配,發(fā)生于消費(fèi)者中有新的消費(fèi)者加入或者有消費(fèi)者宕機(jī)的時(shí)候。我們先了解再均衡的概念,至于如何再均衡不在此深究。
我們繼續(xù)看上面的代碼,第3步,subscribe訂閱期望消費(fèi)的主題,然后進(jìn)入第4步,輪循調(diào)用poll方法從Kafka服務(wù)器拉取消息。給poll方法中傳遞了一個(gè)Duration對象,指定poll方法的超時(shí)時(shí)長,即當(dāng)緩存區(qū)中沒有可消費(fèi)數(shù)據(jù)時(shí)的阻塞時(shí)長,避免輪循過于頻繁。poll方法返回的是一個(gè)ConsumerRecords對象,其內(nèi)部對多個(gè)分區(qū)的ConsumerRecored進(jìn)行了封裝,其結(jié)構(gòu)如下:
public?class?ConsumerRecords?implements?Iterable>?{????????private?final?Map>>?records;????//?...????}而ConsumerRecord則類似ProducerRecord,封裝了消息的相關(guān)屬性:
public?class?ConsumerRecord?{????private?final?String?topic;??//?主題????private?final?int?partition;??//?分區(qū)號????private?final?long?offset;??//?偏移量????private?final?long?timestamp;??//?時(shí)間戳????private?final?TimestampType?timestampType;??//?時(shí)間戳類型????private?final?int?serializedKeySize;??//?key序列化后的大小????private?final?int?serializedValueSize;??//?value序列化后的大小????private?final?Headers?headers;??//?消息頭部????private?final?K?key;??//?鍵????private?final?V?value;??//?值????private?final?Optional?leaderEpoch;??//?leader的周期號相比ProdercerRecord的屬性更多,其中重點(diǎn)講下偏移量,偏移量是分區(qū)中一條消息的唯一標(biāo)識。消費(fèi)者在每次調(diào)用poll方法時(shí),則是根據(jù)偏移量去分區(qū)拉取相應(yīng)的消息。而當(dāng)一臺消費(fèi)者宕機(jī)時(shí),會發(fā)生再均衡,將其負(fù)責(zé)的分區(qū)交給其他消費(fèi)者處理,這時(shí)可以根據(jù)偏移量去繼續(xù)從宕機(jī)前消費(fèi)的位置開始。
而為了應(yīng)對消費(fèi)者宕機(jī)情況,偏移量被設(shè)計(jì)成不存儲在消費(fèi)者的內(nèi)存中,而是被持久化到一個(gè)Kafka的內(nèi)部主題__consumer_offsets中,在Kafka中,將偏移量存儲的操作稱作提交。而消費(fèi)者在每次消費(fèi)消息時(shí)都將會將偏移量進(jìn)行提交,提交的偏移量為下次消費(fèi)的位置,例如本次消費(fèi)的偏移量為x,則提交的是x+1。
在代碼中我們并沒有看到顯示的提交代碼,那么Kafka的默認(rèn)提交方式是什么?默認(rèn)情況下,消費(fèi)者會定期以auto_commit_interval_ms(5秒)的頻率進(jìn)行一次自動提交,而提交的動作發(fā)生于poll方法里,在進(jìn)行拉取操作前會先檢查是否可以進(jìn)行偏移量提交,如果可以,則會提交即將拉取的偏移量。
下面我們看下這樣一個(gè)場景,上次提交的偏移量為2,而當(dāng)前消費(fèi)者已經(jīng)處理了2、3、4號消息,正準(zhǔn)備提交5,但卻宕機(jī)了。當(dāng)發(fā)生再均衡時(shí),其他消費(fèi)者將繼續(xù)從已提交的2開始消費(fèi),于是發(fā)生了重復(fù)消費(fèi)的現(xiàn)象。
我們可以通過減小自動提交的時(shí)間間隔來減小重復(fù)消費(fèi)的窗口大小,但這樣仍然無法避免重復(fù)消費(fèi)的發(fā)生。
按照線性程序的思維,由于自動提交是延遲提交,即在處理完消息之后進(jìn)行提交,所以應(yīng)該不會出現(xiàn)消息丟失的現(xiàn)象,也就是已提交的偏移量會大于正在處理的偏移量。但放在多線程環(huán)境中,消息丟失的現(xiàn)象是可能發(fā)生的。例如線程A負(fù)責(zé)調(diào)用poll方法拉取消息并放入一個(gè)隊(duì)列中,由線程B負(fù)責(zé)處理消息。如果線程A已經(jīng)提交了偏移量5,而線程B還未處理完2、3、4號消息,這時(shí)候發(fā)生宕機(jī),則將丟失消息。
從上述場景的描述,我們可以知道自動提交是存在風(fēng)險(xiǎn)的。所以Kafka除了自動提交,還提供了手動提交的方式,可以細(xì)分為同步提交和異步提交,分別對應(yīng)了KafkaConsumer中的commitSync和commitAsync方法。我們先嘗試使用同步提交修改程序:
while?(true)?{????ConsumerRecords?records?=?consumer.poll(Duration.ofMillis(1000));????for?(ConsumerRecord?record?:?records)?{????????System.out.println(record.value());????}????consumer.commitSync();;}在處理完一批消息后,都會提交偏移量,這樣能減小重復(fù)消費(fèi)的窗口大小,但是由于是同步提交,所以程序會阻塞等待提交成功后再繼續(xù)處理下一條消息,這樣會限制程序的吞吐量。那我們改為使用異步提交:
while?(true)?{????ConsumerRecords?records?=?consumer.poll(Duration.ofMillis(1000));????for?(ConsumerRecord?record?:?records)?{????????System.out.println(record.value());????}????consumer.commitAsync();;}異步提交時(shí),程序?qū)⒉粫枞?#xff0c;但異步提交在提交失敗時(shí)也不會進(jìn)行重試,所以提交是否成功是無法保證的。因此我們可以組合使用兩種提交方式。在輪詢中使用異步提交,而當(dāng)關(guān)閉消費(fèi)者時(shí),再通過同步提交來保證提交成功。
try?{????while?(true)?{????????ConsumerRecords?records?=?consumer.poll(Duration.ofMillis(1000));????????for?(ConsumerRecord?record?:?records)?{????????????System.out.println(record.value());????????}????????consumer.commitAsync();????}}?finally?{????try?{????????consumer.commitSync();????}?finally?{????????consumer.close();????}}上述介紹的兩種無參的提交方式都是提交的poll返回的一個(gè)批次的數(shù)據(jù)。若未來得及提交,也會造成重復(fù)消費(fèi),如果還想更進(jìn)一步減少重復(fù)消費(fèi),可以在for循環(huán)中為commitAsync和commitSync傳入分區(qū)和偏移量,進(jìn)行更細(xì)粒度的提交,例如每1000條消息我們提交一次:
Map?currentOffsets?=?new?HashMap<>();int?count?=?0;while?(true)?{????ConsumerRecords?records?=?consumer.poll(Duration.ofMillis(1000));????for?(ConsumerRecord?record?:?records)?{????????System.out.println(record.value());????????//?偏移量加1????????currentOffsets.put(new?TopicPartition(record.topic(),?record.partition()),???????????????????????????new?OffsetAndMetadata(record.offset()?+?1));????????if?(count?%?1000?==?0)?{????????????consumer.commitAsync(currentOffsets,?null);????????}????????count++;????}}關(guān)于提交就介紹到這里。在使用消費(fèi)者的代理中,我們可以看到poll方法是其中最為核心的方法,能夠拉取到我們需要消費(fèi)的消息。所以接下來,我們一起深入到消費(fèi)者API的幕后,看看在poll方法中,都發(fā)生了什么,其實(shí)現(xiàn)如下:
public?ConsumerRecords?poll(final?Duration?timeout)?{????return?poll(time.timer(timeout),?true);}在我們使用設(shè)置超時(shí)時(shí)間的poll方法中,會調(diào)用重載方法,第二個(gè)參數(shù)includeMetadataInTimeout用于標(biāo)識是否把元數(shù)據(jù)的獲取算在超時(shí)時(shí)間內(nèi),這里傳值為true,也就是算入超時(shí)時(shí)間內(nèi)。下面再看重載的poll方法的實(shí)現(xiàn):
private?ConsumerRecords?poll(final?Timer?timer,?final?boolean?includeMetadataInTimeout)?{????//?1.?獲取鎖并確保消費(fèi)者沒有關(guān)閉????acquireAndEnsureOpen();????try?{????????//?2.記錄poll開始????????this.kafkaConsumerMetrics.recordPollStart(timer.currentTimeMs());????????//?3.檢查是否有訂閱主題????????if?(this.subscriptions.hasNoSubscriptionOrUserAssignment())?{????????????throw?new?IllegalStateException("Consumer?is?not?subscribed?to?any?topics?or?assigned?any?partitions");????????}????????do?{????????????//?4.安全的喚醒消費(fèi)者????????????client.maybeTriggerWakeup();????????????//?5.更新偏移量(如果需要的話)????????????if?(includeMetadataInTimeout)?{????????????????updateAssignmentMetadataIfNeeded(timer,?false);????????????}?else?{????????????????while?(!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE),?true))?{????????????????????log.warn("Still?waiting?for?metadata");????????????????}????????????}????????????// 6.拉取消息????????????final?Map>>?records?=?pollForFetches(timer);????????????if?(!records.isEmpty())?{????????????????//?7.如果拉取到了消息或者有未處理的請求,由于用戶還需要處理未處理的消息????????????????//?所以會再次發(fā)起拉取消息的請求(異步),提高效率????????????????if?(fetcher.sendFetches()?>?0?||?client.hasPendingRequests())?{????????????????????client.transmitSends();????????????????}????????????????//?8.調(diào)用消費(fèi)者攔截器處理????????????????return?this.interceptors.onConsume(new?ConsumerRecords<>(records));????????????}????????}?while?(timer.notExpired());????????return?ConsumerRecords.empty();????}?finally?{????????//?9.釋放鎖????????release();????????//?10.記錄poll結(jié)束????????this.kafkaConsumerMetrics.recordPollEnd(timer.currentTimeMs());????}}我們對上面的代碼逐步分析,首先是第1步acquireAndEnsureOpen方法,獲取鎖并確保消費(fèi)者沒有關(guān)閉,其實(shí)現(xiàn)如下:
private?void?acquireAndEnsureOpen()?{????acquire();????if?(this.closed)?{????????release();????????throw?new?IllegalStateException("This?consumer?has?already?been?closed.");????}}其中acquire方法用于獲取鎖,為什么這里會要上鎖。這是因?yàn)镵afkaConsumer是線程不安全的,所以需要上鎖,確保只有一個(gè)線程使用KafkaConsumer拉取消息,其實(shí)現(xiàn)如下:
private?static?final?long?NO_CURRENT_THREAD?=?-1L;private?final?AtomicLong?currentThread?=?new?AtomicLong(NO_CURRENT_THREAD);private?final?AtomicInteger?refcount?=?new?AtomicInteger(0);private?void?acquire()?{????long?threadId?=?Thread.currentThread().getId();????if?(threadId?!=?currentThread.get()?&&?!currentThread.compareAndSet(NO_CURRENT_THREAD,?threadId))????????throw?new?ConcurrentModificationException("KafkaConsumer?is?not?safe?for?multi-threaded?access");????refcount.incrementAndGet();}用一個(gè)原子變量currentThread作為鎖,通過cas操作獲取鎖,如果cas失敗,即獲取鎖失敗,表示發(fā)生了競爭,有多個(gè)線程在使用KafkaConsumer,則會拋出ConcurrentModificationException異常,如果cas成功,還會將refcount加一,用于重入。
再看第2、3步,記錄poll的開始以及檢查是否有訂閱主題。然后進(jìn)入do-while循環(huán),如果沒有拉取到消息,將在不超時(shí)的情況下一直輪循。
第4步,安全的喚醒消費(fèi)者,并不是喚醒,而是檢查是否有喚醒的風(fēng)險(xiǎn),如果程序在執(zhí)行不可中斷的方法或是收到中斷請求,會拋出異常,這里我還不是很明白,先放一下。
第5步,更新偏移量,就是我們在前文說的在進(jìn)行拉取操作前會先檢查是否可以進(jìn)行偏移量提交。
第6步,pollForFetches方法拉取消息,其實(shí)現(xiàn)如下:
private?Map>>?pollForFetches(Timer?timer)?{????long?pollTimeout?=?coordinator?==?null???timer.remainingMs()?:????Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()),?timer.remainingMs());????//?1.如果消息已經(jīng)有了,則立即返回????final?Map>>?records?=?fetcher.fetchedRecords();????if?(!records.isEmpty())?{????????return?records;????}????//?2.準(zhǔn)備拉取請求????fetcher.sendFetches();????if?(!cachedSubscriptionHashAllFetchPositions?&&?pollTimeout?>?retryBackoffMs)?{????????pollTimeout?=?retryBackoffMs;????}????Timer?pollTimer?=?time.timer(pollTimeout);????//?3.發(fā)送拉取請求????client.poll(pollTimer,?()?->?{????????return?!fetcher.hasAvailableFetches();????});????timer.update(pollTimer.currentTimeMs());????//?3.返回消息????return?fetcher.fetchedRecords();}如果fetcher已經(jīng)有消息了則立即返回,這里和下面將要講的第7步對應(yīng)。如果沒有消息則使用Fetcher準(zhǔn)備拉取請求然后再通過ConsumerNetworkClient發(fā)送請求,最后返回消息。
為啥消息會已經(jīng)有了呢,我們回到poll的第7步,如果拉取到了消息或者有未處理的請求,由于用戶還需要處理未處理的消息,這時(shí)候可以使用異步的方式發(fā)起下一次的拉取消息的請求,將數(shù)據(jù)提前拉取,減少網(wǎng)絡(luò)IO的等待時(shí)間,提高程序的效率。
第8步,調(diào)用消費(fèi)者攔截器處理,就像KafkaProducer中有ProducerInterceptor,在KafkaConsumer中也有ConsumerInterceptor,用于處理返回的消息,處理完后,再返回給用戶。
第9、10步,釋放鎖和記錄poll結(jié)束,對應(yīng)了第1、2步。
對KafkaConsumer的poll方法就分析到這里。最后用一個(gè)思維導(dǎo)圖回顧下文中較為重要的知識點(diǎn):
總結(jié)
以上是生活随笔為你收集整理的kafka consumer配置拉取速度慢_Kafka消费者的使用和原理的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 大泼猴剧情介绍
- 下一篇: jmeter 循环取值赋值给form_J