
Reading the Kafka Producer Message-Sending Source Code

Published: 2025/3/17 · 豆豆

Today I read through the message-sending part of the Kafka source code (version 0.8.2.1). For message sending, Kafka's partitioning strategy is as follows:

1 Kafka's partitioning strategy

1.1 If a partition is specified, the message is sent to that partition.

1.2 If no partition is specified but a key is, a partition is chosen based on the hash of the key.

If the key is fixed, the messages will only ever go to one and the same partition, so do not set the key to a fixed value; if you must, consider modifying Kafka's source so the data is spread evenly across partitions.

1.3 If neither a key nor a partition is specified, messages are distributed across the partitions in round-robin fashion.
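The three rules above can be sketched as follows. This is a simplified illustration, not Kafka's actual source; the class and method names are hypothetical:

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the partition-selection rules described above.
public class PartitionChooser {
    private final AtomicInteger counter = new AtomicInteger(0);

    // partition: explicit partition or null; key: message key or null
    public int choose(Integer partition, byte[] key, int numPartitions) {
        if (partition != null)
            return partition;                            // 1.1 explicit partition wins
        if (key != null)                                 // 1.2 hash of the key
            return toPositive(Arrays.hashCode(key)) % numPartitions;
        // 1.3 neither given: round-robin over partitions
        return toPositive(counter.getAndIncrement()) % numPartitions;
    }

    private static int toPositive(int n) {
        return n & 0x7fffffff;                           // avoid negative modulo results
    }
}
```

Note the consequence stated in 1.2: because the key hash is deterministic, a constant key always maps to the same partition.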

2 Message sending is asynchronous; the process is as follows

Three objects are involved:

2.1 RecordAccumulator

Maintains a ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches object.

Each partition maps to an ArrayDeque of RecordBatches.

Calling KafkaProducer.send ultimately appends the message into the RecordAccumulator.


If the RecordBatch is already full, or a new RecordBatch was created, the sending object (Sender) is woken up.
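A minimal sketch of that append-then-wake decision (the class and names here are hypothetical; the real RecordAccumulator additionally manages buffer memory, record serialization, and futures):

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Simplified model of RecordAccumulator's per-partition batching.
public class MiniAccumulator {
    private final int batchSize;
    private final ConcurrentMap<String, Deque<StringBuilder>> batches = new ConcurrentHashMap<>();

    public MiniAccumulator(int batchSize) { this.batchSize = batchSize; }

    // Returns true when the Sender should be woken up: either the
    // current batch just became full, or a new batch had to be created.
    public boolean append(String topicPartition, String record) {
        Deque<StringBuilder> deque =
            batches.computeIfAbsent(topicPartition, tp -> new ArrayDeque<>());
        synchronized (deque) {
            StringBuilder last = deque.peekLast();
            if (last != null && last.length() + record.length() <= batchSize) {
                last.append(record);
                return last.length() == batchSize;   // full -> wake the Sender
            }
            deque.addLast(new StringBuilder(record)); // new batch -> wake the Sender
            return true;
        }
    }
}
```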


2.2 Sender

The background thread that handles the sending of produce requests to the Kafka cluster.

The Sender uses the KafkaClient to write batches of data from the RecordAccumulator to the server.

Sender's run method loops and delegates to run(long now).


The logic inside run(long now) is as follows:

2.2.1 First, determine the nodes that have data ready to be sent

2.2.2 Remove any Kafka nodes that cannot currently accept sends


2.2.3 Collect the list of data to send

For each node, loop over the partitions whose leader is that node

For each such partition, take its RecordBatch and add it to the node's List<RecordBatch>
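Step 2.2.3 boils down to regrouping per-partition queues into per-node batch lists. A sketch under simplified types (hypothetical helper; the real code also caps the total request size per node):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: drain the oldest batch of each partition into
// a list keyed by the partition's leader node.
public class MiniDrain {
    // leaderOf: partition -> leader node id; batches: partition -> queued batches
    public static Map<Integer, List<String>> drain(Map<String, Integer> leaderOf,
                                                   Map<String, Deque<String>> batches) {
        Map<Integer, List<String>> perNode = new HashMap<>();
        for (Map.Entry<String, Deque<String>> e : batches.entrySet()) {
            Integer node = leaderOf.get(e.getKey()); // this partition's leader
            String batch = e.getValue().pollFirst(); // oldest batch, if any
            if (node == null || batch == null)
                continue;
            perNode.computeIfAbsent(node, n -> new ArrayList<>()).add(batch);
        }
        return perNode;
    }
}
```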


2.2.4 Assemble request objects and send them to the different Kafka nodes

Compute pollTimeout, then send the request objects to the different Kafka nodes:

    // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
    // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
    // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
    // with sendable data that aren't ready to send since they would cause busy looping.
    long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
    if (result.readyNodes.size() > 0) {
        log.trace("Nodes with data ready to send: {}", result.readyNodes);
        log.trace("Created {} produce requests: {}", requests.size(), requests);
        pollTimeout = 0;
    }
    // if some partitions are already ready to be sent, the select time would be 0;
    // otherwise if some partition already has some data accumulated but not ready yet,
    // the select time will be the time difference between now and its linger expiry time;
    // otherwise the select time will be the time difference between now and the metadata expiry time;
    List<ClientResponse> responses = this.client.poll(requests, pollTimeout, now);

2.2.5 Process the returned responses

    List<ClientResponse> responses = this.client.poll(requests, pollTimeout, now);
    for (ClientResponse response : responses) {
        if (response.wasDisconnected())
            handleDisconnect(response, now);
        else
            handleResponse(response, now);
    }

2.3 KafkaClient

Its implementation class is NetworkClient, which exchanges data with the server over sockets.

3 Kafka parameter configuration

batch.size (16384): the buffer size used to store a batch of records (backing class: MemoryRecords)

BUFFER_MEMORY (32 * 1024 * 1024L, i.e. 32 MB): the total memory the client uses to buffer all records waiting to be sent (backing class: BufferPool)

LINGER_MS: the send-delay setting. If set to 1 second, records first go into the client buffer and are sent after waiting up to 1 second; the default is 0, meaning send immediately

compression.type: the data compression codec; supports none, gzip, snappy, and lz4; the default is none

In theory, increasing LINGER_MS raises message throughput.
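Put together, the settings above map onto the standard producer config keys like this (a sketch of the configuration only; building an actual KafkaProducer from these properties would require the kafka-clients jar):

```java
import java.util.Properties;

// The producer settings discussed above, under their standard config keys.
public class ProducerConfigSketch {
    public static Properties tunedProps() {
        Properties props = new Properties();
        props.put("batch.size", "16384");                              // per-partition batch buffer (MemoryRecords)
        props.put("buffer.memory", String.valueOf(32 * 1024 * 1024L)); // total client buffer (BufferPool), 32 MB
        props.put("linger.ms", "1000");                                // wait up to 1 s to batch more records; default 0
        props.put("compression.type", "none");                         // none, gzip, snappy or lz4
        return props;
    }
}
```

Raising linger.ms trades per-record latency for larger batches, which is where the throughput gain comes from.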

Reposted from: https://my.oschina.net/cloudcoder/blog/917309
