深入分析Kafka生产者和消费者
深入Kafka生產者和消費者
- Kafka生產者
- 消息發送的流程
- 發送方式
- 發送并忘記
- 同步發送
- 異步發送
- 生產者屬性配置
- 序列化器
- 分區器
- 自定義分區器
- Kafka消費者
- 消費者屬性配置
- 消費者基礎概念
- 消費者群組
- 訂閱主題
- 輪詢拉取
- 提交和偏移量
- 提交偏移量帶來的問題
- 自動提交
- 手動提交
- 異步提交
- 同步和異步提交
- 特定提交
- 消費者核心概念
- 群組協調
- 分區再均衡
- 再均衡監聽器
- 從特定偏移量處開始記錄
- 優雅退出
- 反序列化器
- 獨立消費者
Kafka生產者
消息發送的流程
生產者每發送一條消息都需要先創建一個ProducerRecord對象,并且需要指定目標主題、消息內容,當然還可以指定消息鍵和分區。之后就會調用send()方法發送該對象,由于生產者需要與Kafka Broker建立網絡傳輸,必然需要先通過序列化器對消息的鍵和值對象序列化成字節數組,才能進行傳輸。
之后,分區器就會接收到數據,然后先確認ProducerRecord對象中是否指定了分區,如果指定分區那么就直接把指定的分區返回,如果未指定分區,分區器就會根據消息鍵進行選擇分區。確認分區后,那么消息才能確認發送到哪個主題的哪個分區上。接下來,消息又會被添加到批次里,同一個批次的消息總是發送到同一個主題和分區上,最后生產者端會有一個獨立線程負責將批次發送到相應的Broker上。
Broker接收到消息后會進行響應,消息寫入成功,會返回生產者端一個RecordMetaData對象,這個對象記錄了消息在哪個主題的分區上,同時還記錄了消息在分區中的偏移量。消息如果寫入失敗,則會返回一個錯誤,而生產者會根據配置的重試次數進行重試,當超過重試次數還是失敗,就會將錯誤信息返回給生產者端。
發送方式
發送并忘記
producer.send(record);//忽略返回值同步發送
//接收返回值 Future<RecordMetadata> future = producer.send(record); //調用get方法進行阻塞,獲取結果 RecordMetadata recordMetadata = future.get();異步發送
//發送時,指定Callback回調 producer.send(record, new Callback() {public void onCompletion(RecordMetadata metadata,Exception exception) {if(null!=exception){//異常處理}if(null!=metadata){System.out.println("message offset:"+metadata.offset()+" "+"message partition:"+metadata.partition());}} );生產者屬性配置
創建KafkaProducer時都需要為其指定屬性,屬性的配置可以參考org.apache.kafka.clients.producer 包下的 ProducerConfig 類,大部分屬性都配置了合理的默認值,如果對內存使用、性能和可靠性方面有要求可以相應調整一些屬性,下面介紹一些常用的配置屬性:
acks=0:生產者在成功寫入消息之前是不會等待任何的來自服務器的響應。如果在此期間出現了異常,造成Broker沒能收到消息,而此時生產者又得不到反饋,消息也就丟失了。但是因為生產者不需要去等待服務器的響應,吞吐量相對更高;
acks=1:只要集群中分區的首領節點接收到消息,生產者就會收到來自服務器的成功響應。如果消息無法到達首領節點,生產者會收到一個錯誤響應,為了避免數據丟失,生產者會重發消息。不過,如果一個沒有收到消息的節點成為新首領,消息還是會丟失。缺省使用這個配置;
acks=all:只有當集群中所有的分區副本都接收到消息后,生產者才會受到一個來自服務器的成功響應。
序列化器
創建KafkaProducer對象時,必須指定鍵和值的序列化器,一些業務場景可能需要自定義序列化器,那么只需要實現org.apache.kafka.common.serialization.Serializer 接口,重寫serialize()方法定義序列化邏輯即可。但自定義序列化器可能更多會去結合特定業務場景使用,所以容易導致程序的脆弱性,如果需求做了調整相應的序列化器實現也可能需要調整。因此使用序列化器更推薦使用自帶格式描述以及語言無關的序列化框架,比如Kafka 官方推薦的 Apache Avro。
Avro在文件的讀寫是依據schema而進行的,而schema是通過一個JSON文件進行描述數據的,可以把這個schema 內嵌在數據文件中。這樣,不管數據格式如何變動,消費者都知道如何處理數據。但是內嵌的消息,自帶格式,會導致消息的大小不必要的增大,消耗了資源。我們可以使用 schema 注冊表機制,將所有寫入的數據用到的 schema 保存在注冊表中,然后在消息中引用 schema 的標識符,而讀取的數據的消費者程序使用這個標識符從注冊表中拉取 schema 來反序列化記錄。
分區器
生產者在發送消息時需要創建ProducerRecord對象,ProducerRecord對象可以指定一個消息鍵。指定了消息鍵,那么分區器就會將擁有相同鍵的消息指定給同一個主題的同一個分區。如果沒有指定消息鍵,那么會通過默認分區器,使用輪詢算法將消息均衡發布到主題下的各個分區。默認分區器會對消息鍵進行散列,然后根據散列值將消息映射到特定的分區上,這樣同一個消息鍵總是能夠被映射到同一個分區,但是只有不改變主題分區數量的情況下,鍵和分區之間的映射才能保持不變,一旦增加了新的分區,就無法保證了,所以如果要使用鍵來映射分區,那就要在創建主題的時候把分區規劃好,不要增加新分區。
自定義分區器
一些業務場景中數據可能會有側重,比如按地區進行劃分數據時,不同地區的消息量是不同的,那么這種情況下就可以根據消息值中的一些標識,去針對消息值進行做分區,會更適合對應的業務場景。自定義一個分區器只需要去實現org.apache.kafka.clients.producer.Partitioner該接口,重寫partition()方法完成相應的分區邏輯。
Kafka消費者
消費者屬性配置
消費者需要創建KafkaConsumer,創建該對象時也需要指定消費者相關屬性,可以參考org.apache.kafka.clients.consumer 包下 ConsumerConfig 類,大部分屬性都配置了合理的默認值,如果需要關注內存使用、性能和可靠性方面可以相應調整一些屬性,下面介紹一些常用的配置屬性:
消費者基礎概念
消費者群組
在一些高并發的情況下,當Kafka生產者發送消息的速度遠快于消費者消費速度時,如果只配置單個消費者,容易造成消息堆積,消息不能及時處理。這種情況下通常考慮的就是對消費者進行橫向伸縮,通過增加消費者個數對同一個主題多個分區的消息進行分流。而Kafka中多個消費者通常會構成一個消費者群組,往群組中增加消費者是進行橫向伸縮的主要方式。
在一個消費者群組中所有消費者都是訂閱同一個主題,主題下一個分區只能由一個消費者消費,而一個消費者可以消費多個分區。
訂閱主題
//消費者訂閱主題(可以多個),主題值允許使用正則表達式 consumer.subscribe(Collections.singletonList(BusiConst.HELLO_TOPIC));消費端創建KafkaConsumer對象后,會使用subscribe()方法進行訂閱主題,而一個消費者是可以訂閱多個主題的,該方法可以傳遞一個主題列表或者正則表達式作為參數。正則表達式也能夠匹配多個主題,比如,想訂閱所有order相關的主題,可以使用subscribe(“order.*”) 。
需要注意: 在通過正則表達式訂閱主題時,如果新建的一個主題正好與表達式匹配,那么會立即觸發一次再均衡,消費者就可以讀取新添加的主題了。
輪詢拉取
//輪詢獲取消息 while(true){//拉取ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));for(ConsumerRecord<String, String> record:records){System.out.println(String.format("topic:%s,分區:%d,偏移量:%d," + "key:%s,value:%s",record.topic(),record.partition(),record.offset(),record.key(),record.value()));} }Kafka消費端是通過拉取的方式獲取消息,消費者為了不斷獲取消息,只能在循環中不斷調用poll()方法進行拉取。其中poll()方法需要指定超時時間,它會讓消費者在指定的毫秒數內一直等待 broker 返回數據。poll()方法會返回一個ConsumerRecords列表對象,而其中每一個ConsumerRecord對象都包含了消息所屬的主題信息、所在分區信息、在分區里的偏移量,以及鍵值對。
提交和偏移量
消費者可以使用 Kafka來追蹤消息在分區里的位置,稱之為偏移量。消費者更新自己讀取到哪個消息的操作,稱之為提交。消費者提交偏移量本質上就是向一個_consumer_offset 的特殊主題發送一個消息,里面會包括每個分區的偏移量。
提交偏移量帶來的問題
如果提交的偏移量小于消費者實際處理的最后一個消息的偏移量,處于兩個偏移量之間的消息會被重復處理。
如果提交的偏移量大于客戶端處理的最后一個消息的偏移量,那么處于兩個偏移量之間的消息將會丟失。
自動提交
auto.commit. offset缺省情況下為true,消費者會自動提交偏移量,自動提交存在一個時間間隔由auto.commit.interval.ms進行控制,缺省為5s。自動提交是在輪詢拉取過程中觸發的,消費者每次輪詢時都會檢查是否提交偏移量,如果是,則會將poll()方法返回的最新偏移量進行提交。
注意:自動提交由于是基于時間間隔的提交,如果在未達到提交時間時觸發了分區再均衡,就容易造成在此之前一部分已經處理的消息被其它消費者重復處理了。并且自動提交總是將poll()方法返回的最新偏移量進行提交,它并不知道哪些消息處理成功了,所以再次調用之前最好確保所有當前調用poll()方法返回的消息都處理完成,否則可能造成消息丟失。
手動提交
將auto.commit. offset設置為false,然后調用commitsync()方法提交偏移量。這個方法會提交調用poll()方法返回的最新偏移量,只要沒有發生不可恢復的錯誤,該方法會一直阻塞,直到提交成功后返回,如果提交失敗就會拋出異常。
注意:手動提交由于也是提交poll()方法返回的最新偏移量,所以在處理完所有的消息后要確保調用了commitsync()方法,否則可能造成消息丟失。
異步提交
調用commitAsync()方法進行異步提交,相比與手動提交,它不會使應用程序阻塞,無需等待Broker響應。并且它支持回調,能夠在Broker響應時執行相應回調方法。
//異步提交偏移量 consumer.commitAsync(); //支持回調 consumer.commitAsync(new OffsetCommitCallback() {public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,Exception exception) {if(exception!=null){System.out.print("Commmit failed for offsets "+ offsets);}} });同步和異步提交
一般情況下,針對偶爾出現的提交失敗,不進行重試不會有太大問題,因為如果提交失敗是因為臨時問題導致的,那么后續的提交總會有成功的。但如果這是發生在關閉消費者或再均衡前的最后一次提交,就要確保能夠提交成功。這個時候就需要使用同步異步組合提交。
try {while(true){ConsumerRecords<String, String> records= consumer.poll(Duration.ofMillis(500));for(ConsumerRecord<String, String> record:records){System.out.println(String.format("topic= %s,partition= %d,offset= %d,key= %s,value= %s",record.topic(),record.partition(),record.offset(),record.key(),record.value()));}//每次輪詢進行異步提交consumer.commitAsync();} } finally {try {//同步提交下consumer.commitSync();} finally {consumer.close();} }特定提交
支持在批次中間進行提交偏移量,在調用 commitsync()和 commitAsync()方法時傳遞希望提交的分區和偏移量構成的一個Map參數。
Map<TopicPartition, OffsetAndMetadata> currOffsets= new HashMap<TopicPartition, OffsetAndMetadata>(); int count = 0; try {while(true){ConsumerRecords<String, String> records= consumer.poll(Duration.ofMillis(500));for(ConsumerRecord<String, String> record:records){System.out.println(String.format("topic=%s, partition=%d, offset=%d, key=%s, value=%s\n",record.topic(),record.partition(),record.offset(),record.key(),record.value()));currOffsets.put(new TopicPartition(record.topic(),record.partition()),new OffsetAndMetadata(record.offset()+1,"null")); if(count%10==0){//特定提交,指定一個記錄希望提交的分區和偏移量的mapconsumer.commitAsync(currOffsets,null);}count++;} } } finally {try {//同步提交consumer.commitSync();} finally {consumer.close();} }消費者核心概念
群組協調
消費者要加入群組時,會向群組協調器發送一個 JoinGroup 請求,第一個加入群主的消費者成為群主,群主會獲得群組的成員列表,并負責給每一個消費者分配分區。分配完畢后,群主把分配情況發送給群組協調器,協調器再把這些信息發送給所有的消費者,每個消費者只能看到自己的分配信息, 只有群主知道群組里所有消費者的分配信息。群組協調的工作會在消費者發生變化,主題中分區發生了變化時發生。
分區再均衡
在Kafka中,消費者群組中存在著消費者對分區的所有權關系,這樣在一個群組中如果新增一個消費者,那么新的消費者會分配到原先由其他消費者讀取的分區,而減少一個消費者,那原本由它負責的分區就會分配給其它消費者。除此之外,如果增加了分區,新增的分區也需劃分由哪個消費者讀取,這一系列的行為,都會導致分區所有權的變化,這種變化就稱為分區再均衡。
在消費者群組中我們介紹了它有一個群組協調器,而群組協調器它會接收群組中每個消費者發來的心跳,然后維持每個消費者和群組的從屬關系以及對分區所有權關系。如果長時間未收到消費者發送的心跳,群組協調器就會認為當前消費者已經死亡,就會觸發一次再均衡。
分區再均衡在Kafka中是非常重要的,這是消費者群組帶來高可用性和伸縮性的關鍵所在。但是發生分區再均衡的期間,消費者會無法接收到消息,會造成整個群組一段時間的不可用,因此都需要盡量減少發生分區再均衡。
再均衡監聽器
消費者調用subscribe()訂閱主題時,指定一個ConsumerRebalanceListener,在再均衡開始之前和分區再均衡完成之后做一些操作。
//指定一個ConsumerRebalancelistener consumer.subscribe(Collections.singletonList("test1"), new ConsumerRebalanceListener() {//分區再均衡之前@Overridepublic void onPartitionsRevoked(Collection<TopicPartition> partitions) {//1、將偏移量提交到Kafka//2、偏移量寫入數據庫}//分區再均衡完成以后@Overridepublic void onPartitionsAssigned(Collection<TopicPartition> partitions) {//1、從數據庫中獲取偏移量//2、通過seek()方法從指定偏移量位置開始讀取} });從特定偏移量處開始記錄
通常情況下消費者沒有通過seek()方法指定讀取位置時,調用poll()方法默認都會從分區的最新偏移量處開始讀取消息。當然如果想從分區的起始位置開始讀取消息,或者直接跳到分區的末尾開始讀取消息,可以使 seekToBeginning(Collection tp)和 seekToEnd( Collectiontp)這兩個方法。而調用seek()是可以從從特定的偏移量處開始讀取消息的。
//從指定分區中的指定偏移量開始消費 consumer.seek(topicPartition,2);優雅退出
如果確定要退出循環,需要通過另一個線程調用 consumer. wakeup()方法。如果循環運行在主線程里,可以在 ShutdownHook 里調用該方法。要記住, consumer. wakeup()是消費者唯一一個可以從其他線程里安全調用的方法。
反序列化器
創建KafkaConsumer對象時需要指定反序列化器,將從Kafka接收到的字節數組轉換成 java對象,發送消息指定的序列化器必須與接收消息使用的反序列化器一一對應的。一些業務場景可能需要自定義反序列化器,那么只需要實現org.apache.kafka.common.serialization.Deserializer接口,重寫deserialize()方法定義反序列化邏輯即可。
獨立消費者
一個消費者從一個主題的所有分區或者某個特定的分區讀取數據,不需要消費者群組和再均衡,只需要把主題或者分區分配給消費者,然后開始讀取消息并提交偏移量。
Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092"); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); //獨立消費者(不需要訂閱主題,只需要分配主題中分區即可) KafkaConsumer<String,String> consumer= new KafkaConsumer<String, String>(properties); //拿到主題的分區信息 List<PartitionInfo> partitionInfos = consumer.partitionsFor("independ-consumer"); List<TopicPartition> topicPartitionList = new ArrayList<TopicPartition>(); if(null!=partitionInfos){for(PartitionInfo partitionInfo:partitionInfos){topicPartitionList.add(new TopicPartition(partitionInfo.topic(),partitionInfo.partition()));} } //獨立消費者需要執行哪些分區(這里全部的分區分配給一個消費者) consumer.assign(topicPartitionList); try {while(true){ConsumerRecords<String, String> records= consumer.poll(Duration.ofMillis(1000));for(ConsumerRecord<String, String> record:records){System.out.println(String.format("主題:%s,分區:%d,偏移量:%d,key:%s,value:%s",record.topic(),record.partition(),record.offset(),record.key(),record.value()));}} } finally {consumer.close(); }總結
以上是生活随笔為你收集整理的深入分析Kafka生产者和消费者的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 安恒5月赛BJDCTF3th-逆向
- 下一篇: 谁创造了硅谷?仙童半导体“叛逆八人”