日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

java kafka 设置分区_Java kafka如何实现自定义分区类和拦截器

發布時間:2025/3/21 java 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java kafka 设置分区_Java kafka如何实现自定义分区类和拦截器 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

Java kafka如何實現自定義分區類和攔截器

2、producer配置文件指定,具體的分區類

// 具體的分區類

props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "kafka.CustomPartitioner");

技巧:可以使用ProducerConfig中提供的配置ProducerConfig

kafka producer攔截器

攔截器(interceptor)是在Kafka 0.10版本被引入的。

interceptor使得用戶在消息發送前以及producer回調邏輯前有機會對消息做一些定制化需求,比如修改消息等。

許用戶指定多個interceptor按序作用于同一條消息從而形成一個攔截鏈(interceptor chain)。

所使用的類為:

org.apache.kafka.clients.producer.ProducerInterceptor

我們可以編碼測試下:

1、定義消息攔截器,實現消息處理(可以是加時間戳等等,unid等等。)

import org.apache.kafka.clients.producer.ProducerInterceptor;

import org.apache.kafka.clients.producer.ProducerRecord;

import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;

import java.util.UUID;

public class MessageInterceptor implements ProducerInterceptor {

@Override

public void configure(Map configs) {

System.out.println("這是MessageInterceptor的configure方法");

}

/**

* 這個是消息發送之前進行處理

*

* @param record

* @return

*/

@Override

public ProducerRecord onSend(ProducerRecord record) {

// 創建一個新的record,把uuid入消息體的最前部

System.out.println("為消息添加uuid");

return new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(),

UUID.randomUUID().toString().replace("-", "") + "," + record.value());

}

/**

* 這個是生產者回調函數調用之前處理

* @param metadata

* @param exception

*/

@Override

public void onAcknowledgement(RecordMetadata metadata, Exception exception) {

System.out.println("MessageInterceptor攔截器的onAcknowledgement方法");

}

@Override

public void close() {

System.out.println("MessageInterceptor close 方法");

}

}

2、定義計數攔截器

import java.util.Map;

import org.apache.kafka.clients.producer.ProducerInterceptor;

import org.apache.kafka.clients.producer.ProducerRecord;

import org.apache.kafka.clients.producer.RecordMetadata;

public class CounterInterceptor implements ProducerInterceptor{

private int errorCounter = 0;

private int successCounter = 0;

@Override

public void configure(Map configs) {

System.out.println("這是CounterInterceptor的configure方法");

}

@Override

public ProducerRecord onSend(ProducerRecord record) {

System.out.println("CounterInterceptor計數過濾器不對消息做任何操作");

return record;

}

@Override

public void onAcknowledgement(RecordMetadata metadata, Exception exception) {

// 統計成功和失敗的次數

System.out.println("CounterInterceptor過濾器執行統計失敗和成功數量");

if (exception == null) {

successCounter++;

} else {

errorCounter++;

}

}

@Override

public void close() {

// 保存結果

System.out.println("Successful sent: " + successCounter);

System.out.println("Failed sent: " + errorCounter);

}

}

3、producer客戶端:

import org.apache.kafka.clients.producer.*;

import java.util.ArrayList;

import java.util.List;

import java.util.Properties;

public class Producer1 {

public static void main(String[] args) throws Exception {

Properties props = new Properties();

// Kafka服務端的主機名和端口號

props.put("bootstrap.servers", "localhost:9092");

// 等待所有副本節點的應答

props.put("acks", "all");

// 消息發送最大嘗試次數

props.put("retries", 0);

// 一批消息處理大小

props.put("batch.size", 16384);

// 請求延時,可能生產數據太快了

props.put("linger.ms", 1);

// 發送緩存區內存大小,數據是先放到生產者的緩沖區

props.put("buffer.memory", 33554432);

// key序列化

props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// value序列化

props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// 具體的分區類

props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "kafka.CustomPartitioner");

//定義攔截器

List interceptors = new ArrayList<>();

interceptors.add("kafka.MessageInterceptor");

interceptors.add("kafka.CounterInterceptor");

props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

Producer producer = new KafkaProducer<>(props);

for (int i = 0; i < 1; i++) {

producer.send(new ProducerRecord("test_0515", i + "", "xxx-" + i), new Callback() {

public void onCompletion(RecordMetadata recordMetadata, Exception e) {

System.out.println("這是producer回調函數");

}

});

}

/*System.out.println("現在執行關閉producer");

producer.close();*/

producer.close();

}

}

總結,我們可以知道攔截器鏈各個方法的執行順序,假如有A、B攔截器,在一個攔截器鏈中:

(1)執行A的configure方法,執行B的configure方法

(2)執行A的onSend方法,B的onSend方法

(3)生產者發送完畢后,執行A的onAcknowledgement方法,B的onAcknowledgement方法。

(4)執行producer自身的callback回調函數。

(5)執行A的close方法,B的close方法。

Java kafka如何實現自定義分區類和攔截器相關教程

總結

以上是生活随笔為你收集整理的java kafka 设置分区_Java kafka如何实现自定义分区类和拦截器的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。