日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Kafka Producer拦截器

發布時間:2024/4/11 编程问答 41 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Kafka Producer拦截器 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

歡迎支持筆者新作:《深入理解Kafka:核心設計與實踐原理》和《RabbitMQ實戰指南》,同時歡迎關注筆者的微信公眾號:朱小廝的博客。


歡迎跳轉到本文的原文鏈接:https://honeypps.com/mq/kafka-producer-interceptor/

Kafka中的攔截器(Interceptor)是0.10.x.x版本引入的一個功能,一共有兩種:Kafka Producer端的攔截器和Kafka Consumer端的攔截器。本篇主要講述的是Kafka Producer端的攔截器,它主要用來對消息進行攔截或者修改,也可以用于Producer的Callback回調之前進行相應的預處理。

使用Kafka Producer端的攔截器非常簡單,主要是實現ProducerInterceptor接口,此接口包含4個方法:

  • ProducerRecord<K, V> onSend(ProducerRecord<K, V> record):Producer在將消息序列化和分配分區之前會調用攔截器的這個方法來對消息進行相應的操作。一般來說最好不要修改消息ProducerRecord的topic、key以及partition等信息,如果要修改,也需確保對其有準確的判斷,否則會與預想的效果出現偏差。比如修改key不僅會影響分區的計算,同樣也會影響Broker端日志壓縮(Log Compaction)的功能。
  • void onAcknowledgement(RecordMetadata metadata, Exception exception):在消息被應答(Acknowledgement)之前或者消息發送失敗時調用,優先于用戶設定的Callback之前執行。這個方法運行在Producer的IO線程中,所以這個方法里實現的代碼邏輯越簡單越好,否則會影響消息的發送速率。
  • void close():關閉當前的攔截器,此方法主要用于執行一些資源的清理工作。
  • configure(Map<String, ?> configs):用來初始化此類的方法,這個是ProducerInterceptor接口的父接口Configurable中的方法。
  • 一般情況下只需要關注并實現onSend或者onAcknowledgement方法即可。下面我們來舉個案例,通過onSend方法來過濾消息體為空的消息以及通過onAcknowledgement方法來計算發送消息的成功率。

    public class ProducerInterceptorDemo implements ProducerInterceptor<String,String> {private volatile long sendSuccess = 0;private volatile long sendFailure = 0;@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {if(record.value().length()<=0)return null;return record;}@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {if (exception == null) {sendSuccess++;} else {sendFailure ++;}}@Overridepublic void close() {double successRatio = (double)sendSuccess / (sendFailure + sendSuccess);System.out.println("[INFO] 發送成功率="+String.format("%f", successRatio * 100)+"%");}@Overridepublic void configure(Map<String, ?> configs) {} }

    自定義的ProducerInterceptorDemo類實現之后就可以在Kafka Producer的主程序中指定,示例代碼如下:

    public class ProducerMain {public static final String brokerList = "localhost:9092";public static final String topic = "hidden-topic";public static void main(String[] args) throws ExecutionException, InterruptedException {Properties properties = new Properties();properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.put("bootstrap.servers", brokerList);properties.put("interceptor.classes", "com.hidden.producer.ProducerInterceptorDemo");Producer<String, String> producer = new KafkaProducer<String, String>(properties);for(int i=0;i<100;i++) {ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic, "msg-" + i);producer.send(producerRecord).get();}producer.close();} }

    Kafka Producer不僅可以指定一個攔截器,還可以指定多個攔截器以形成攔截鏈,這個攔截鏈會按照其中的攔截器的加入順序一一執行。比如上面的程序多添加一個攔截器,示例如下:

    properties.put("interceptor.classes", "com.hidden.producer.ProducerInterceptorDemo,com.hidden.producer.ProducerInterceptorDemoPlus");

    這樣Kafka Producer會先執行攔截器ProducerInterceptorDemo,之后再執行ProducerInterceptorDemoPlus。

    有關interceptor.classes參數,在kafka 1.0.0版本中的定義如下:

    NAMEDESCRIPTIONTYPEDEFAULTVALID VALUESIMPORTANCE
    interceptor.calsssesA list of classes to use as interceptors. Implementing the org.apache.kafka.clients.producer.ProducerInterceptor interface allows you to intercept (and possibly mutate) the records received by the producer before they are published to the Kafka cluster. By default, there no interceptors.listnulllow

    歡迎跳轉到本文的原文鏈接:https://honeypps.com/mq/kafka-producer-interceptor/


    歡迎支持筆者新作:《深入理解Kafka:核心設計與實踐原理》和《RabbitMQ實戰指南》,同時歡迎關注筆者的微信公眾號:朱小廝的博客。


    超強干貨來襲 云風專訪:近40年碼齡,通宵達旦的技術人生

    總結

    以上是生活随笔為你收集整理的Kafka Producer拦截器的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。