java kafka 分区_Java kafka如何实现自定义分区类和拦截器
生產者發送到對應的分區有以下幾種方式:
(1)指定了patition,則直接使用;(可以查閱對應的java api, 有多種參數)
(2)未指定patition但指定key,通過對key的value進行hash出一個patition;
(3)patition和key都未指定,使用輪詢選出一個patition。
但是kafka提供了,自定義分區算法的功能,由業務手動實現分布:
1、實現一個自定義分區類,custompartitioner實現partitioner
import org.apache.kafka.clients.producer.partitioner;
import org.apache.kafka.common.cluster;
import java.util.map;
public class custompartitioner implements partitioner {
/**
*
* @param topic 當前的發送的topic
* @param key 當前的key值
* @param keybytes 當前的key的字節數組
* @param value 當前的value值
* @param valuebytes 當前的value的字節數組
* @param cluster
* @return
*/
@override
public int partition(string topic, object key, byte[] keybytes, object value, byte[] valuebytes, cluster cluster) {
//這邊根據返回值就是分區號, 這邊就是固定發送到三號分區
return 3;
}
@override
public void close() {
}
@override
public void configure(map configs) {
}
}
2、producer配置文件指定,具體的分區類
// 具體的分區類
props.put(producerconfig.partitioner_class_config, "kafka.custompartitioner");
技巧:可以使用producerconfig中提供的配置producerconfig
kafka producer攔截器
攔截器(interceptor)是在kafka 0.10版本被引入的。
interceptor使得用戶在消息發送前以及producer回調邏輯前有機會對消息做一些定制化需求,比如修改消息等。
許用戶指定多個interceptor按序作用于同一條消息從而形成一個攔截鏈(interceptor chain)。
所使用的類為:
org.apache.kafka.clients.producer.producerinterceptor
我們可以編碼測試下:
1、定義消息攔截器,實現消息處理(可以是加時間戳等等,unid等等。)
import org.apache.kafka.clients.producer.producerinterceptor;
import org.apache.kafka.clients.producer.producerrecord;
import org.apache.kafka.clients.producer.recordmetadata;
import java.util.map;
import java.util.uuid;
public class messageinterceptor implements producerinterceptor {
@override
public void configure(map configs) {
system.out.println("這是messageinterceptor的configure方法");
}
/**
* 這個是消息發送之前進行處理
*
* @param record
* @return
*/
@override
public producerrecord onsend(producerrecord record) {
// 創建一個新的record,把uuid入消息體的最前部
system.out.println("為消息添加uuid");
return new producerrecord(record.topic(), record.partition(), record.timestamp(), record.key(),
uuid.randomuuid().tostring().replace("-", "") + "," + record.value());
}
/**
* 這個是生產者回調函數調用之前處理
* @param metadata
* @param exception
*/
@override
public void onacknowledgement(recordmetadata metadata, exception exception) {
system.out.println("messageinterceptor攔截器的onacknowledgement方法");
}
@override
public void close() {
system.out.println("messageinterceptor close 方法");
}
}
2、定義計數攔截器
import java.util.map;
import org.apache.kafka.clients.producer.producerinterceptor;
import org.apache.kafka.clients.producer.producerrecord;
import org.apache.kafka.clients.producer.recordmetadata;
public class counterinterceptor implements producerinterceptor{
private int errorcounter = 0;
private int successcounter = 0;
@override
public void configure(map configs) {
system.out.println("這是counterinterceptor的configure方法");
}
@override
public producerrecord onsend(producerrecord record) {
system.out.println("counterinterceptor計數過濾器不對消息做任何操作");
return record;
}
@override
public void onacknowledgement(recordmetadata metadata, exception exception) {
// 統計成功和失敗的次數
system.out.println("counterinterceptor過濾器執行統計失敗和成功數量");
if (exception == null) {
successcounter++;
} else {
errorcounter++;
}
}
@override
public void close() {
// 保存結果
system.out.println("successful sent: " + successcounter);
system.out.println("failed sent: " + errorcounter);
}
}
3、producer客戶端:
import org.apache.kafka.clients.producer.*;
import java.util.arraylist;
import java.util.list;
import java.util.properties;
public class producer1 {
public static void main(string[] args) throws exception {
properties props = new properties();
// kafka服務端的主機名和端口號
props.put("bootstrap.servers", "localhost:9092");
// 等待所有副本節點的應答
props.put("acks", "all");
// 消息發送最大嘗試次數
props.put("retries", 0);
// 一批消息處理大小
props.put("batch.size", 16384);
// 請求延時,可能生產數據太快了
props.put("linger.ms", 1);
// 發送緩存區內存大小,數據是先放到生產者的緩沖區
props.put("buffer.memory", 33554432);
// key序列化
props.put("key.serializer", "org.apache.kafka.common.serialization.stringserializer");
// value序列化
props.put("value.serializer", "org.apache.kafka.common.serialization.stringserializer");
// 具體的分區類
props.put(producerconfig.partitioner_class_config, "kafka.custompartitioner");
//定義攔截器
list interceptors = new arraylist<>();
interceptors.add("kafka.messageinterceptor");
interceptors.add("kafka.counterinterceptor");
props.put(producerconfig.interceptor_classes_config, interceptors);
producer producer = new kafkaproducer<>(props);
for (int i = 0; i < 1; i++) {
producer.send(new producerrecord("test_0515", i + "", "xxx-" + i), new callback() {
public void oncompletion(recordmetadata recordmetadata, exception e) {
system.out.println("這是producer回調函數");
}
});
}
/*system.out.println("現在執行關閉producer");
producer.close();*/
producer.close();
}
}
總結,我們可以知道攔截器鏈各個方法的執行順序,假如有a、b攔截器,在一個攔截器鏈中:
(1)執行a的configure方法,執行b的configure方法
(2)執行a的onsend方法,b的onsend方法
(3)生產者發送完畢后,執行a的onacknowledgement方法,b的onacknowledgement方法。
(4)執行producer自身的callback回調函數。
(5)執行a的close方法,b的close方法。
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持萬仟網。
如您對本文有疑問或者有任何想說的,請點擊進行留言回復,萬千網友為您解惑!
總結
以上是生活随笔為你收集整理的java kafka 分区_Java kafka如何实现自定义分区类和拦截器的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: oracle右连接失效,oracle 右
- 下一篇: java美元兑换,(Java实现) 美元