日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

深入分析Kafka生产者和消费者

發布時間:2024/3/12 编程问答 50 豆豆
生活随笔 收集整理的這篇文章主要介紹了 深入分析Kafka生产者和消费者 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

深入Kafka生產者和消費者

  • Kafka生產者
    • 消息發送的流程
    • 發送方式
      • 發送并忘記
      • 同步發送
      • 異步發送
    • 生產者屬性配置
    • 序列化器
    • 分區器
      • 自定義分區器
  • Kafka消費者
    • 消費者屬性配置
    • 消費者基礎概念
      • 消費者群組
      • 訂閱主題
      • 輪詢拉取
      • 提交和偏移量
        • 提交偏移量帶來的問題
        • 自動提交
        • 手動提交
        • 異步提交
        • 同步和異步提交
        • 特定提交
    • 消費者核心概念
      • 群組協調
      • 分區再均衡
        • 再均衡監聽器
      • 從特定偏移量處開始記錄
      • 優雅退出
      • 反序列化器
      • 獨立消費者

Kafka生產者

消息發送的流程


生產者每發送一條消息都需要先創建一個ProducerRecord對象,并且需要指定目標主題、消息內容,當然還可以指定消息鍵和分區。之后就會調用send()方法發送該對象,由于生產者需要與Kafka Broker建立網絡傳輸,必然需要先通過序列化器對消息的鍵和值對象序列化成字節數組,才能進行傳輸。
之后,分區器就會接收到數據,然后先確認ProducerRecord對象中是否指定了分區,如果指定分區那么就直接把指定的分區返回,如果未指定分區,分區器就會根據消息鍵進行選擇分區。確認分區后,那么消息才能確認發送到哪個主題的哪個分區上。接下來,消息又會被添加到批次里,同一個批次的消息總是發送到同一個主題和分區上,最后生產者端會有一個獨立線程負責將批次發送到相應的Broker上。
Broker接收到消息后會進行響應,消息寫入成功,會返回生產者端一個RecordMetaData對象,這個對象記錄了消息在哪個主題的分區上,同時還記錄了消息在分區中的偏移量。消息如果寫入失敗,則會返回一個錯誤,而生產者會根據配置的重試次數進行重試,當超過重試次數還是失敗,就會將錯誤信息返回給生產者端。

發送方式

發送并忘記

producer.send(record);//忽略返回值

同步發送

//接收返回值 Future<RecordMetadata> future = producer.send(record); //調用get方法進行阻塞,獲取結果 RecordMetadata recordMetadata = future.get();

異步發送

//發送時,指定Callback回調 producer.send(record, new Callback() {public void onCompletion(RecordMetadata metadata,Exception exception) {if(null!=exception){//異常處理}if(null!=metadata){System.out.println("message offset:"+metadata.offset()+" "+"message partition:"+metadata.partition());}} );

生產者屬性配置

創建KafkaProducer時都需要為其指定屬性,屬性的配置可以參考org.apache.kafka.clients.producer 包下的 ProducerConfig 類,大部分屬性都配置了合理的默認值,如果對內存使用、性能和可靠性方面有要求可以相應調整一些屬性,下面介紹一些常用的配置屬性:

  • acks: 確認機制,控制生產者發送消息時,必須有指定多少個分區副本接收到信息后,生產者才能認為消息寫入成功,可選配置如下:
    acks=0:生產者在成功寫入消息之前是不會等待任何的來自服務器的響應。如果在此期間出現了異常,造成Broker沒能收到消息,而此時生產者又得不到反饋,消息也就丟失了。但是因為生產者不需要去等待服務器的響應,吞吐量相對更高;
    acks=1:只要集群中分區的首領節點接收到消息,生產者就會收到來自服務器的成功響應。如果消息無法到達首領節點,生產者會收到一個錯誤響應,為了避免數據丟失,生產者會重發消息。不過,如果一個沒有收到消息的節點成為新首領,消息還是會丟失。缺省使用這個配置;
    acks=all:只有當集群中所有的分區副本都接收到消息后,生產者才會受到一個來自服務器的成功響應。
  • batch.size: 對于發往同一個分區的消息,生產者發送過程中會先將消息記錄在一個批次中。該參數的作用就是控制一個批次占用的內存大小。一個批次內存被填滿后,會一次性將批次里的所有消息發送給Broker,但生產者并一定會等到批次被填滿后才進行發送,即使是一條消息也有可能被發送。該參數的大小是按照字節數計算的,缺省為16384(16k),批次內存滿了新的消息就寫不進去了。
  • linger.ms: 該參數用于配合batch.size使用,作用是控制生產者在發送批次前等待更多消息加入批次的時間。當然如果batch.size指定的批次內存已經填滿,就不會進行等待而是直接發送,反之發送的消息字節數遠比batch.size小,那么就能夠在linger.ms指定的時間內獲得更多的消息,從而減少請求次數,提升消息的吞吐量。
  • max.request.size: 該參數用來控制生產者發送請求最大大小,缺省為1M。如果請求只有一條消息,則約束消息大小不能超過1M,如果請求是一個批次,則約束批次中所有消息的總大小不能超過1M。需要注意這個參數與Kafka的server.properties配置文件中指定的message.max.bytes參數有關,如果生產者發送的消息超過 message.max.bytes 設置的大小,就會被 Kafka Broker拒絕。
  • buffer.memory: 控制生產者內存緩沖區的大小。
  • retries: 控制生產者在消息發送失敗后,可以進行重試的次數,默認情況下每次重試過程都會等待100ms再進行重試,重試等待時間可以通過retry.backoff.ms 參數來調整。
  • request.timeout.ms: 控制生產者發送消息后等待請求響應的最大時間,超過這個時間沒有收到響應,那么生產者端就會重試,超過重試次數將會拋出異常,缺省為30s。
  • max.in.flight.requests.per.connection: 控制生產者在接收到服務器響應之前可以發送多少個消息,如果需要保證消息在一個分區上的嚴格順序,這個值應該設為 1,不過這樣會嚴重影響生產者的吞吐量。
  • compression.type: 該參數控制生產者進行壓縮數據的壓縮類型,可選值包括:[none,gzip,snappy],缺省是none。壓縮數據適用于消息批次處理,處理的批次消息越多,壓縮性能就越好。snappy 占用 cpu 少,提供較好的性能和可觀的壓縮比,更關注性能和網絡帶寬建議使用這個。而如果網絡帶寬緊張,可以用gzip,雖然會占用較多的 cpu,但提供更高的壓縮比。
  • 序列化器

    創建KafkaProducer對象時,必須指定鍵和值的序列化器,一些業務場景可能需要自定義序列化器,那么只需要實現org.apache.kafka.common.serialization.Serializer 接口,重寫serialize()方法定義序列化邏輯即可。但自定義序列化器可能更多會去結合特定業務場景使用,所以容易導致程序的脆弱性,如果需求做了調整相應的序列化器實現也可能需要調整。因此使用序列化器更推薦使用自帶格式描述以及語言無關的序列化框架,比如Kafka 官方推薦的 Apache Avro。
    Avro在文件的讀寫是依據schema而進行的,而schema是通過一個JSON文件進行描述數據的,可以把這個schema 內嵌在數據文件中。這樣,不管數據格式如何變動,消費者都知道如何處理數據。但是內嵌的消息,自帶格式,會導致消息的大小不必要的增大,消耗了資源。我們可以使用 schema 注冊表機制,將所有寫入的數據用到的 schema 保存在注冊表中,然后在消息中引用 schema 的標識符,而讀取的數據的消費者程序使用這個標識符從注冊表中拉取 schema 來反序列化記錄。

    分區器

    生產者在發送消息時需要創建ProducerRecord對象,ProducerRecord對象可以指定一個消息鍵。指定了消息鍵,那么分區器就會將擁有相同鍵的消息指定給同一個主題的同一個分區。如果沒有指定消息鍵,那么會通過默認分區器,使用輪詢算法將消息均衡發布到主題下的各個分區。默認分區器會對消息鍵進行散列,然后根據散列值將消息映射到特定的分區上,這樣同一個消息鍵總是能夠被映射到同一個分區,但是只有不改變主題分區數量的情況下,鍵和分區之間的映射才能保持不變,一旦增加了新的分區,就無法保證了,所以如果要使用鍵來映射分區,那就要在創建主題的時候把分區規劃好,不要增加新分區。

    自定義分區器

    一些業務場景中數據可能會有側重,比如按地區進行劃分數據時,不同地區的消息量是不同的,那么這種情況下就可以根據消息值中的一些標識,去針對消息值進行做分區,會更適合對應的業務場景。自定義一個分區器只需要去實現org.apache.kafka.clients.producer.Partitioner該接口,重寫partition()方法完成相應的分區邏輯。

    Kafka消費者

    消費者屬性配置

    消費者需要創建KafkaConsumer,創建該對象時也需要指定消費者相關屬性,可以參考org.apache.kafka.clients.consumer 包下 ConsumerConfig 類,大部分屬性都配置了合理的默認值,如果需要關注內存使用、性能和可靠性方面可以相應調整一些屬性,下面介紹一些常用的配置屬性:

  • auto.offset.reset: 控制消費者讀取一個沒有偏移量的分區或者偏移量無效的情況下,消費者應該如何處理。可選值包括:[latest、earliest],缺省為latest,表示從最新的記錄開始讀取。而earliest則表示消費者從起始位置開始讀取分區的記錄。
  • enable .auto.commit: 表示消費者是否自動提交偏移量,缺省為true。這個參數很關鍵,通常情況都需要設置為false,自行控制何時提交偏移量,這樣可以盡量避免消息重復處理和消息丟失。
  • partition.assignment.strategy: 指定分區分配給消費者的策略。可選值包括:[Range、RoundRobin],缺省為Range,表示當分區數量無法被消費者數整除時,會把主題下的連續分區分配給消費者,第一個消費者通常會分到更多的分區。而RoundRobin則表示會把主題下的分區輪詢分配給消費者。
  • max.poll.records: 控制執行poll() 方法返回的記錄數量。
  • max.partition.fetch.bytes:指定了服務器從每個分區里返回給消費者的最大字節數,缺省為1MB。需要注意,這個參數要比服務器的 message.max.bytes 更大,否則消費者可能無法讀取消息。
  • 消費者基礎概念

    消費者群組

    在一些高并發的情況下,當Kafka生產者發送消息的速度遠快于消費者消費速度時,如果只配置單個消費者,容易造成消息堆積,消息不能及時處理。這種情況下通常考慮的就是對消費者進行橫向伸縮,通過增加消費者個數對同一個主題多個分區的消息進行分流。而Kafka中多個消費者通常會構成一個消費者群組,往群組中增加消費者是進行橫向伸縮的主要方式。

    在一個消費者群組中所有消費者都是訂閱同一個主題,主題下一個分區只能由一個消費者消費,而一個消費者可以消費多個分區。

    訂閱主題

    //消費者訂閱主題(可以多個),主題值允許使用正則表達式 consumer.subscribe(Collections.singletonList(BusiConst.HELLO_TOPIC));

    消費端創建KafkaConsumer對象后,會使用subscribe()方法進行訂閱主題,而一個消費者是可以訂閱多個主題的,該方法可以傳遞一個主題列表或者正則表達式作為參數。正則表達式也能夠匹配多個主題,比如,想訂閱所有order相關的主題,可以使用subscribe(“order.*”) 。
    需要注意: 在通過正則表達式訂閱主題時,如果新建的一個主題正好與表達式匹配,那么會立即觸發一次再均衡,消費者就可以讀取新添加的主題了。

    輪詢拉取

    //輪詢獲取消息 while(true){//拉取ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));for(ConsumerRecord<String, String> record:records){System.out.println(String.format("topic:%s,分區:%d,偏移量:%d," + "key:%s,value:%s",record.topic(),record.partition(),record.offset(),record.key(),record.value()));} }

    Kafka消費端是通過拉取的方式獲取消息,消費者為了不斷獲取消息,只能在循環中不斷調用poll()方法進行拉取。其中poll()方法需要指定超時時間,它會讓消費者在指定的毫秒數內一直等待 broker 返回數據。poll()方法會返回一個ConsumerRecords列表對象,而其中每一個ConsumerRecord對象都包含了消息所屬的主題信息、所在分區信息、在分區里的偏移量,以及鍵值對。

    提交和偏移量

    消費者可以使用 Kafka來追蹤消息在分區里的位置,稱之為偏移量。消費者更新自己讀取到哪個消息的操作,稱之為提交。消費者提交偏移量本質上就是向一個_consumer_offset 的特殊主題發送一個消息,里面會包括每個分區的偏移量。

    提交偏移量帶來的問題

    如果提交的偏移量小于消費者實際處理的最后一個消息的偏移量,處于兩個偏移量之間的消息會被重復處理。

    如果提交的偏移量大于客戶端處理的最后一個消息的偏移量,那么處于兩個偏移量之間的消息將會丟失。

    自動提交

    auto.commit. offset缺省情況下為true,消費者會自動提交偏移量,自動提交存在一個時間間隔由auto.commit.interval.ms進行控制,缺省為5s。自動提交是在輪詢拉取過程中觸發的,消費者每次輪詢時都會檢查是否提交偏移量,如果是,則會將poll()方法返回的最新偏移量進行提交。
    注意:自動提交由于是基于時間間隔的提交,如果在未達到提交時間時觸發了分區再均衡,就容易造成在此之前一部分已經處理的消息被其它消費者重復處理了。并且自動提交總是將poll()方法返回的最新偏移量進行提交,它并不知道哪些消息處理成功了,所以再次調用之前最好確保所有當前調用poll()方法返回的消息都處理完成,否則可能造成消息丟失。

    手動提交

    將auto.commit. offset設置為false,然后調用commitsync()方法提交偏移量。這個方法會提交調用poll()方法返回的最新偏移量,只要沒有發生不可恢復的錯誤,該方法會一直阻塞,直到提交成功后返回,如果提交失敗就會拋出異常。
    注意:手動提交由于也是提交poll()方法返回的最新偏移量,所以在處理完所有的消息后要確保調用了commitsync()方法,否則可能造成消息丟失。

    異步提交

    調用commitAsync()方法進行異步提交,相比與手動提交,它不會使應用程序阻塞,無需等待Broker響應。并且它支持回調,能夠在Broker響應時執行相應回調方法。

    //異步提交偏移量 consumer.commitAsync(); //支持回調 consumer.commitAsync(new OffsetCommitCallback() {public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,Exception exception) {if(exception!=null){System.out.print("Commmit failed for offsets "+ offsets);}} });

    同步和異步提交

    一般情況下,針對偶爾出現的提交失敗,不進行重試不會有太大問題,因為如果提交失敗是因為臨時問題導致的,那么后續的提交總會有成功的。但如果這是發生在關閉消費者或再均衡前的最后一次提交,就要確保能夠提交成功。這個時候就需要使用同步異步組合提交。

    try {while(true){ConsumerRecords<String, String> records= consumer.poll(Duration.ofMillis(500));for(ConsumerRecord<String, String> record:records){System.out.println(String.format("topic= %s,partition= %d,offset= %d,key= %s,value= %s",record.topic(),record.partition(),record.offset(),record.key(),record.value()));}//每次輪詢進行異步提交consumer.commitAsync();} } finally {try {//同步提交下consumer.commitSync();} finally {consumer.close();} }

    特定提交

    支持在批次中間進行提交偏移量,在調用 commitsync()和 commitAsync()方法時傳遞希望提交的分區和偏移量構成的一個Map參數。

    Map<TopicPartition, OffsetAndMetadata> currOffsets= new HashMap<TopicPartition, OffsetAndMetadata>(); int count = 0; try {while(true){ConsumerRecords<String, String> records= consumer.poll(Duration.ofMillis(500));for(ConsumerRecord<String, String> record:records){System.out.println(String.format("topic=%s, partition=%d, offset=%d, key=%s, value=%s\n",record.topic(),record.partition(),record.offset(),record.key(),record.value()));currOffsets.put(new TopicPartition(record.topic(),record.partition()),new OffsetAndMetadata(record.offset()+1,"null")); if(count%10==0){//特定提交,指定一個記錄希望提交的分區和偏移量的mapconsumer.commitAsync(currOffsets,null);}count++;} } } finally {try {//同步提交consumer.commitSync();} finally {consumer.close();} }

    消費者核心概念

    群組協調


    消費者要加入群組時,會向群組協調器發送一個 JoinGroup 請求,第一個加入群主的消費者成為群主,群主會獲得群組的成員列表,并負責給每一個消費者分配分區。分配完畢后,群主把分配情況發送給群組協調器,協調器再把這些信息發送給所有的消費者,每個消費者只能看到自己的分配信息, 只有群主知道群組里所有消費者的分配信息。群組協調的工作會在消費者發生變化,主題中分區發生了變化時發生。

    分區再均衡

    在Kafka中,消費者群組中存在著消費者對分區的所有權關系,這樣在一個群組中如果新增一個消費者,那么新的消費者會分配到原先由其他消費者讀取的分區,而減少一個消費者,那原本由它負責的分區就會分配給其它消費者。除此之外,如果增加了分區,新增的分區也需劃分由哪個消費者讀取,這一系列的行為,都會導致分區所有權的變化,這種變化就稱為分區再均衡。
    在消費者群組中我們介紹了它有一個群組協調器,而群組協調器它會接收群組中每個消費者發來的心跳,然后維持每個消費者和群組的從屬關系以及對分區所有權關系。如果長時間未收到消費者發送的心跳,群組協調器就會認為當前消費者已經死亡,就會觸發一次再均衡。
    分區再均衡在Kafka中是非常重要的,這是消費者群組帶來高可用性和伸縮性的關鍵所在。但是發生分區再均衡的期間,消費者會無法接收到消息,會造成整個群組一段時間的不可用,因此都需要盡量減少發生分區再均衡。

    再均衡監聽器

    消費者調用subscribe()訂閱主題時,指定一個ConsumerRebalanceListener,在再均衡開始之前和分區再均衡完成之后做一些操作。

    //指定一個ConsumerRebalancelistener consumer.subscribe(Collections.singletonList("test1"), new ConsumerRebalanceListener() {//分區再均衡之前@Overridepublic void onPartitionsRevoked(Collection<TopicPartition> partitions) {//1、將偏移量提交到Kafka//2、偏移量寫入數據庫}//分區再均衡完成以后@Overridepublic void onPartitionsAssigned(Collection<TopicPartition> partitions) {//1、從數據庫中獲取偏移量//2、通過seek()方法從指定偏移量位置開始讀取} });

    從特定偏移量處開始記錄

    通常情況下消費者沒有通過seek()方法指定讀取位置時,調用poll()方法默認都會從分區的最新偏移量處開始讀取消息。當然如果想從分區的起始位置開始讀取消息,或者直接跳到分區的末尾開始讀取消息,可以使 seekToBeginning(Collection tp)和 seekToEnd( Collectiontp)這兩個方法。而調用seek()是可以從從特定的偏移量處開始讀取消息的。

    //從指定分區中的指定偏移量開始消費 consumer.seek(topicPartition,2);

    優雅退出

    如果確定要退出循環,需要通過另一個線程調用 consumer. wakeup()方法。如果循環運行在主線程里,可以在 ShutdownHook 里調用該方法。要記住, consumer. wakeup()是消費者唯一一個可以從其他線程里安全調用的方法。

    反序列化器

    創建KafkaConsumer對象時需要指定反序列化器,將從Kafka接收到的字節數組轉換成 java對象,發送消息指定的序列化器必須與接收消息使用的反序列化器一一對應的。一些業務場景可能需要自定義反序列化器,那么只需要實現org.apache.kafka.common.serialization.Deserializer接口,重寫deserialize()方法定義反序列化邏輯即可。

    獨立消費者

    一個消費者從一個主題的所有分區或者某個特定的分區讀取數據,不需要消費者群組和再均衡,只需要把主題或者分區分配給消費者,然后開始讀取消息并提交偏移量。

    Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092"); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); //獨立消費者(不需要訂閱主題,只需要分配主題中分區即可) KafkaConsumer<String,String> consumer= new KafkaConsumer<String, String>(properties); //拿到主題的分區信息 List<PartitionInfo> partitionInfos = consumer.partitionsFor("independ-consumer"); List<TopicPartition> topicPartitionList = new ArrayList<TopicPartition>(); if(null!=partitionInfos){for(PartitionInfo partitionInfo:partitionInfos){topicPartitionList.add(new TopicPartition(partitionInfo.topic(),partitionInfo.partition()));} } //獨立消費者需要執行哪些分區(這里全部的分區分配給一個消費者) consumer.assign(topicPartitionList); try {while(true){ConsumerRecords<String, String> records= consumer.poll(Duration.ofMillis(1000));for(ConsumerRecord<String, String> record:records){System.out.println(String.format("主題:%s,分區:%d,偏移量:%d,key:%s,value:%s",record.topic(),record.partition(),record.offset(),record.key(),record.value()));}} } finally {consumer.close(); }

    總結

    以上是生活随笔為你收集整理的深入分析Kafka生产者和消费者的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。