I've Been Reading the Kafka Source Lately, and Its Client-Side Buffer Pool Is Truly Elegant
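I've been reading the Kafka source code lately, and the buffer pool in its client really struck me as elegant. I couldn't resist writing an article to sing its praises (haha).
Note: the source code used in this article comes from Kafka 2.2.2.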
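Background
When our application calls the Kafka client producer to send a message, the client internally gathers the messages that belong to the same topic partition into a batch. What actually gets sent to the Kafka server is always a batch, as shown in the figure below:
The benefit is obvious. The client and the server talk over the network, so sending in batches reduces the per-request network overhead and raises throughput.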
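To make the batching idea concrete, here is a minimal sketch. It is not Kafka code: `PartitionBatcher` and its count-based limit are hypothetical and exist only for illustration (the real producer limits batches by size in bytes and by time, which this sketch ignores).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; this is not a Kafka class.
class PartitionBatcher {
    private final int maxRecordsPerBatch;
    // One open batch per topic partition, keyed by a name such as "orders-0".
    private final Map<String, List<String>> open = new HashMap<>();

    PartitionBatcher(int maxRecordsPerBatch) {
        this.maxRecordsPerBatch = maxRecordsPerBatch;
    }

    /**
     * Adds a record to the open batch of its topic partition.
     * Returns the completed batch once it is full, otherwise null.
     */
    List<String> append(String topicPartition, String record) {
        List<String> batch = open.computeIfAbsent(topicPartition, k -> new ArrayList<>());
        batch.add(record);
        if (batch.size() < maxRecordsPerBatch) {
            return null; // keep accumulating
        }
        open.remove(topicPartition); // the next record starts a fresh batch
        return batch;                // the caller would send this in one request
    }
}
```

Records for different partitions never share a batch, so one network request carries many records for a single partition.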
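How these batches are managed is well worth discussing. Some might say: isn't that simple? Allocate a block of memory when you need one and release it once the batch has been sent.
The Kafka producer client is written in Java (in newer versions most of Kafka is implemented in Java). With the approach above, you would `new` some space and assign it to a reference when you need it, then set the reference to null when you are done and let the JVM GC take care of it.
That looks harmless enough. Under high concurrency, however, it triggers GC frequently. We all know GC involves stop-the-world pauses; even though the latest collectors have made these pauses very short, they can still become a performance bottleneck in production.
The designers of Kafka naturally thought of this. Let's see how Kafka manages its batches.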
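How the Buffer Pool Works
The Kafka client uses the concept of a buffer pool: already-allocated blocks of real memory are kept in a pool.
Each batch is backed by one memory block from the buffer pool. Once the messages have been sent and the batch is no longer in use, the block is returned to the pool.
Sounds familiar, doesn't it? Right: database connection pools, thread pools, and other pooling techniques all work on much the same principle. Pooling cuts the cost of creating and destroying objects and improves efficiency.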
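The general pooling idea can be sketched in a few lines. This is a deliberately tiny example and not Kafka's `BufferPool`: the class name `SimpleBufferPool` and its methods are made up, and it assumes every buffer has the same size.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical minimal pool; not Kafka's BufferPool.
class SimpleBufferPool {
    private final int bufferSize;
    private final Deque<ByteBuffer> free = new ArrayDeque<>();

    SimpleBufferPool(int bufferSize) {
        this.bufferSize = bufferSize;
    }

    /** Reuses a pooled buffer when one exists, otherwise allocates a new one. */
    synchronized ByteBuffer acquire() {
        ByteBuffer pooled = free.pollFirst();
        return pooled != null ? pooled : ByteBuffer.allocate(bufferSize);
    }

    /** Resets the buffer and puts it back so the next caller can reuse it. */
    synchronized void release(ByteBuffer buffer) {
        buffer.clear();
        free.addLast(buffer);
    }

    synchronized int pooledCount() {
        return free.size();
    }
}
```

A caller that does `acquire()`, fills the buffer, and then calls `release(...)` creates no garbage in steady state, because the same `ByteBuffer` objects keep circulating.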
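Code is the best documentation, so let's dig into the source.
We will read it top-down: first see where the buffer pool is used, then go inside the pool itself for a closer look.
The code below has been trimmed; only the parts relevant to this article are kept to make the analysis easier.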

    public class KafkaProducer<K, V> implements Producer<K, V> {
        private final Logger log;
        private static final AtomicInteger PRODUCER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
        private static final String JMX_PREFIX = "kafka.producer";
        public static final String NETWORK_THREAD_PREFIX = "kafka-producer-network-thread";
        public static final String PRODUCER_METRIC_GROUP_NAME = "producer-metrics";

        @Override
        public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
            // intercept the record, which can be potentially modified; this method does not throw exceptions
            ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
            return doSend(interceptedRecord, callback);
        }

        private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                    serializedValue, headers, interceptCallback, remainingWaitMs);
            ...
        }

When we call send on the client, it calls doSend underneath, which uses a record accumulator, RecordAccumulator, to append the message. Let's keep going.

    public final class RecordAccumulator {
        private final Logger log;
        private volatile boolean closed;
        private final AtomicInteger flushesInProgress;
        private final AtomicInteger appendsInProgress;
        private final int batchSize;
        private final CompressionType compression;
        private final int lingerMs;
        private final long retryBackoffMs;
        private final int deliveryTimeoutMs;
        private final BufferPool free;
        private final Time time;
        private final ApiVersions apiVersions;
        private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
        private final IncompleteBatches incomplete;
        // The following variables are only accessed by the sender thread, so we don't need to protect them.
        private final Map<TopicPartition, Long> muted;
        private int drainIndex;
        private final TransactionManager transactionManager;
        private long nextBatchExpiryTimeMs = Long.MAX_VALUE; // the earliest time (absolute) a batch will expire.

        public RecordAppendResult append(TopicPartition tp,
                                         long timestamp,
                                         byte[] key,
                                         byte[] value,
                                         Header[] headers,
                                         Callback callback,
                                         long maxTimeToBlock) throws InterruptedException {
            // We keep track of the number of appending thread to make sure we do not miss batches in
            // abortIncompleteBatches().
            appendsInProgress.incrementAndGet();
            ByteBuffer buffer = null;
            buffer = free.allocate(size, maxTimeToBlock);
            synchronized (dq) {
                // Need to check if producer is closed again after grabbing the dequeue lock.
                if (closed)
                    throw new KafkaException("Producer closed while send in progress");

                RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
                if (appendResult != null) {
                    // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                    return appendResult;
                }

                MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
                ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
                FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));

                dq.addLast(batch);
                ...

RecordAccumulator essentially manages a queue of batches per partition. In append, it calls allocate on its BufferPool (the field named free) to request a block of memory (a ByteBuffer), wraps that block in a batch, and adds the batch to the end of the queue.
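When the messages have been sent and the batch is no longer in use, RecordAccumulator calls its deallocate method to give the memory back; internally this calls deallocate on the BufferPool.

    public void deallocate(ProducerBatch batch) {
        incomplete.remove(batch);
        // Only deallocate the batch if it is not a split batch because split batch are allocated outside the
        // buffer pool.
        if (!batch.isSplitBatch())
            free.deallocate(batch.buffer(), batch.initialCapacity());
    }

Clearly, BufferPool is the class that manages the buffer pool, and it is the focus of today's discussion. Let's first look at the method that allocates a memory block.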

    public class BufferPool {
        static final String WAIT_TIME_SENSOR_NAME = "bufferpool-wait-time";

        private final long totalMemory;
        private final int poolableSize;
        private final ReentrantLock lock;
        private final Deque<ByteBuffer> free;
        private final Deque<Condition> waiters;
        /** Total available memory is the sum of nonPooledAvailableMemory and the number of byte buffers in free * poolableSize.  */
        private long nonPooledAvailableMemory;
        private final Metrics metrics;
        private final Time time;
        private final Sensor waitTime;

        public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
            if (size > this.totalMemory)
                throw new IllegalArgumentException("Attempt to allocate " + size
                                                   + " bytes, but there is a hard limit of "
                                                   + this.totalMemory
                                                   + " on memory allocations.");

            ByteBuffer buffer = null;
            this.lock.lock();
            try {
                // check if we have a free buffer of the right size pooled
                if (size == poolableSize && !this.free.isEmpty())
                    return this.free.pollFirst();

                // now check if the request is immediately satisfiable with the
                // memory on hand or if we need to block
                int freeListSize = freeSize() * this.poolableSize;
                if (this.nonPooledAvailableMemory + freeListSize >= size) {
                    // we have enough unallocated or pooled memory to immediately
                    // satisfy the request, but need to allocate the buffer
                    freeUp(size);
                    this.nonPooledAvailableMemory -= size;
                } else {
                    // we are out of memory and will have to block
                    int accumulated = 0;
                    Condition moreMemory = this.lock.newCondition();
                    try {
                        long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
                        this.waiters.addLast(moreMemory);
                        // loop over and over until we have a buffer or have reserved
                        // enough memory to allocate one
                        while (accumulated < size) {
                            long startWaitNs = time.nanoseconds();
                            long timeNs;
                            boolean waitingTimeElapsed;
                            try {
                                waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
                            } finally {
                                long endWaitNs = time.nanoseconds();
                                timeNs = Math.max(0L, endWaitNs - startWaitNs);
                                recordWaitTime(timeNs);
                            }

                            if (waitingTimeElapsed) {
                                throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");
                            }

                            remainingTimeToBlockNs -= timeNs;

                            // check if we can satisfy this request from the free list,
                            // otherwise allocate memory
                            if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
                                // just grab a buffer from the free list
                                buffer = this.free.pollFirst();
                                accumulated = size;
                            } else {
                                // we'll need to allocate memory, but we may only get
                                // part of what we need on this iteration
                                freeUp(size - accumulated);
                                int got = (int) Math.min(size - accumulated, this.nonPooledAvailableMemory);
                                this.nonPooledAvailableMemory -= got;
                                accumulated += got;
                            }
                            ...

First of all, the whole method runs while holding the lock, so memory can safely be allocated from multiple threads at once.
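The logic goes like this. When the requested size equals poolableSize and the free list is not empty, the buffer is taken from the pool. You can think of poolableSize as the page size of the buffer pool, the basic unit it hands out. Taking a buffer from the pool simply means removing one element from the ByteBuffer queue and returning it.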
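As a rough sense of scale: as far as I can tell, the producer builds the pool from its `buffer.memory` setting (the total) and its `batch.size` setting (the poolable size), but that wiring is not part of the excerpts above, so treat it as an assumption. Under that assumption, the sketch below computes how many page-sized buffers fit in the total budget. `PoolSizing` is a made-up helper, not a Kafka class.

```java
import java.util.Properties;

// Hypothetical helper; assumes buffer.memory is the pool total and batch.size is the poolable size.
class PoolSizing {
    /** Upper bound on the number of poolableSize buffers that fit in the total budget. */
    static long maxPooledBuffers(Properties props) {
        long totalMemory = Long.parseLong(props.getProperty("buffer.memory"));
        int poolableSize = Integer.parseInt(props.getProperty("batch.size"));
        return totalMemory / poolableSize;
    }
}
```

With `buffer.memory=33554432` (32 MB) and `batch.size=16384` (16 KB), this gives 2048 page-sized buffers.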
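If the request cannot be served that way (the size differs from poolableSize, or the free list is empty), the memory comes from the non-pooled part, and when that part alone is not enough, some memory is taken out of the pool and merged into it. nonPooledAvailableMemory is the amount of memory still available in the non-pooled part. Allocating from the non-pooled part really means calling ByteBuffer.allocate to allocate actual JVM memory.
Memory in the pool is rarely reclaimed, whereas non-pooled memory is discarded after use and left for the GC to collect.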
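The accounting between the two parts can be sketched as follows. This is a simplified, non-blocking illustration and not the real class: `TwoTierPool` is a made-up name, it throws where the real pool would wait, and its `deallocate` exists only so the example can show reuse.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified sketch of the pooled / non-pooled accounting; not Kafka's BufferPool.
class TwoTierPool {
    private final int poolableSize;
    private final Deque<ByteBuffer> free = new ArrayDeque<>();
    private long nonPooledAvailableMemory;

    TwoTierPool(long totalMemory, int poolableSize) {
        this.poolableSize = poolableSize;
        this.nonPooledAvailableMemory = totalMemory; // nothing is pooled at the start
    }

    synchronized ByteBuffer allocate(int size) {
        // Page-sized request and a pooled buffer is available: reuse it.
        if (size == poolableSize && !free.isEmpty()) {
            return free.pollFirst();
        }
        // Otherwise move pooled buffers into the non-pooled budget until it covers the request.
        while (!free.isEmpty() && nonPooledAvailableMemory < size) {
            nonPooledAvailableMemory += free.pollLast().capacity();
        }
        if (nonPooledAvailableMemory < size) {
            throw new IllegalStateException("not enough memory; a blocking pool would wait here");
        }
        nonPooledAvailableMemory -= size;
        return ByteBuffer.allocate(size); // real JVM heap allocation
    }

    synchronized void deallocate(ByteBuffer buffer, int size) {
        if (size == poolableSize && size == buffer.capacity()) {
            buffer.clear();
            free.add(buffer);                 // page-sized buffers go back into the pool
        } else {
            nonPooledAvailableMemory += size; // other sizes only return their budget
        }
    }

    synchronized long availableMemory() {
        return nonPooledAvailableMemory + (long) free.size() * poolableSize;
    }
}
```

Note that a page-sized buffer allocated from the non-pooled budget ends up in the free list once it is returned, which is how the pool fills up over time.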
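Next, let's look at the code that releases a batch's memory:

    public void deallocate(ByteBuffer buffer, int size) {
        lock.lock();
        try {
            if (size == this.poolableSize && size == buffer.capacity()) {
                buffer.clear();
                this.free.add(buffer);
            } else {
                this.nonPooledAvailableMemory += size;
            }
            Condition moreMem = this.waiters.peekFirst();
            if (moreMem != null)
                moreMem.signal();
        } finally {
            lock.unlock();
        }
    }

Very simple, and again there are two cases: either the buffer goes straight back into the pool, or the available memory of the non-pooled part is increased. After that, the first waiter in the waiting queue is signaled.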
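The wait-and-signal handshake between allocate and deallocate can be isolated in a small sketch. It is an assumption-laden simplification: `BlockingMemoryBudget` is a made-up class that only counts bytes, and unlike the real allocate it does not reserve memory piece by piece while waiting.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical class that shows only the wait/signal protocol; it tracks bytes, not buffers.
class BlockingMemoryBudget {
    private final ReentrantLock lock = new ReentrantLock();
    private final Deque<Condition> waiters = new ArrayDeque<>();
    private long available;

    BlockingMemoryBudget(long totalBytes) {
        this.available = totalBytes;
    }

    /** Reserves size bytes, waiting up to maxWaitMs for other threads to release memory. */
    void reserve(long size, long maxWaitMs) throws InterruptedException, TimeoutException {
        lock.lock();
        try {
            if (available >= size) {
                available -= size;
                return;
            }
            Condition moreMemory = lock.newCondition();
            waiters.addLast(moreMemory); // FIFO: release() wakes the oldest waiter first
            long remainingNs = TimeUnit.MILLISECONDS.toNanos(maxWaitMs);
            try {
                while (available < size) {
                    if (remainingNs <= 0L) {
                        throw new TimeoutException("No memory within " + maxWaitMs + " ms");
                    }
                    remainingNs = moreMemory.awaitNanos(remainingNs);
                }
                available -= size;
            } finally {
                waiters.remove(moreMemory);
                // Pass any leftover memory on to the next waiter, if there is one.
                Condition next = waiters.peekFirst();
                if (available > 0 && next != null) {
                    next.signal();
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /** Returns size bytes and wakes the first waiter, mirroring deallocate above. */
    void release(long size) {
        lock.lock();
        try {
            available += size;
            Condition first = waiters.peekFirst();
            if (first != null) {
                first.signal();
            }
        } finally {
            lock.unlock();
        }
    }

    long available() {
        lock.lock();
        try {
            return available;
        } finally {
            lock.unlock();
        }
    }
}
```

Each waiter gets its own `Condition`, so `release` can wake exactly one thread in arrival order instead of waking every blocked thread at once.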