日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

java kafka 分区_Java kafka如何实现自定义分区类和拦截器

發布時間:2025/3/12 java 27 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java kafka 分区_Java kafka如何实现自定义分区类和拦截器 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

生產者發送到對應的分區有以下幾種方式:

(1)指定了patition,則直接使用;(可以查閱對應的java api, 有多種參數)

(2)未指定patition但指定key,通過對key的value進行hash出一個patition;

(3)patition和key都未指定,使用輪詢選出一個patition。

但是kafka提供了,自定義分區算法的功能,由業務手動實現分布:

1、實現一個自定義分區類,custompartitioner實現partitioner

import org.apache.kafka.clients.producer.partitioner;

import org.apache.kafka.common.cluster;

import java.util.map;

public class custompartitioner implements partitioner {

/**

*

* @param topic 當前的發送的topic

* @param key 當前的key值

* @param keybytes 當前的key的字節數組

* @param value 當前的value值

* @param valuebytes 當前的value的字節數組

* @param cluster

* @return

*/

@override

public int partition(string topic, object key, byte[] keybytes, object value, byte[] valuebytes, cluster cluster) {

//這邊根據返回值就是分區號, 這邊就是固定發送到三號分區

return 3;

}

@override

public void close() {

}

@override

public void configure(map configs) {

}

}

2、producer配置文件指定,具體的分區類

// 具體的分區類

props.put(producerconfig.partitioner_class_config, "kafka.custompartitioner");

技巧:可以使用producerconfig中提供的配置producerconfig

kafka producer攔截器

攔截器(interceptor)是在kafka 0.10版本被引入的。

interceptor使得用戶在消息發送前以及producer回調邏輯前有機會對消息做一些定制化需求,比如修改消息等。

許用戶指定多個interceptor按序作用于同一條消息從而形成一個攔截鏈(interceptor chain)。

所使用的類為:

org.apache.kafka.clients.producer.producerinterceptor

我們可以編碼測試下:

1、定義消息攔截器,實現消息處理(可以是加時間戳等等,unid等等。)

import org.apache.kafka.clients.producer.producerinterceptor;

import org.apache.kafka.clients.producer.producerrecord;

import org.apache.kafka.clients.producer.recordmetadata;

import java.util.map;

import java.util.uuid;

public class messageinterceptor implements producerinterceptor {

@override

public void configure(map configs) {

system.out.println("這是messageinterceptor的configure方法");

}

/**

* 這個是消息發送之前進行處理

*

* @param record

* @return

*/

@override

public producerrecord onsend(producerrecord record) {

// 創建一個新的record,把uuid入消息體的最前部

system.out.println("為消息添加uuid");

return new producerrecord(record.topic(), record.partition(), record.timestamp(), record.key(),

uuid.randomuuid().tostring().replace("-", "") + "," + record.value());

}

/**

* 這個是生產者回調函數調用之前處理

* @param metadata

* @param exception

*/

@override

public void onacknowledgement(recordmetadata metadata, exception exception) {

system.out.println("messageinterceptor攔截器的onacknowledgement方法");

}

@override

public void close() {

system.out.println("messageinterceptor close 方法");

}

}

2、定義計數攔截器

import java.util.map;

import org.apache.kafka.clients.producer.producerinterceptor;

import org.apache.kafka.clients.producer.producerrecord;

import org.apache.kafka.clients.producer.recordmetadata;

public class counterinterceptor implements producerinterceptor{

private int errorcounter = 0;

private int successcounter = 0;

@override

public void configure(map configs) {

system.out.println("這是counterinterceptor的configure方法");

}

@override

public producerrecord onsend(producerrecord record) {

system.out.println("counterinterceptor計數過濾器不對消息做任何操作");

return record;

}

@override

public void onacknowledgement(recordmetadata metadata, exception exception) {

// 統計成功和失敗的次數

system.out.println("counterinterceptor過濾器執行統計失敗和成功數量");

if (exception == null) {

successcounter++;

} else {

errorcounter++;

}

}

@override

public void close() {

// 保存結果

system.out.println("successful sent: " + successcounter);

system.out.println("failed sent: " + errorcounter);

}

}

3、producer客戶端:

import org.apache.kafka.clients.producer.*;

import java.util.arraylist;

import java.util.list;

import java.util.properties;

public class producer1 {

public static void main(string[] args) throws exception {

properties props = new properties();

// kafka服務端的主機名和端口號

props.put("bootstrap.servers", "localhost:9092");

// 等待所有副本節點的應答

props.put("acks", "all");

// 消息發送最大嘗試次數

props.put("retries", 0);

// 一批消息處理大小

props.put("batch.size", 16384);

// 請求延時,可能生產數據太快了

props.put("linger.ms", 1);

// 發送緩存區內存大小,數據是先放到生產者的緩沖區

props.put("buffer.memory", 33554432);

// key序列化

props.put("key.serializer", "org.apache.kafka.common.serialization.stringserializer");

// value序列化

props.put("value.serializer", "org.apache.kafka.common.serialization.stringserializer");

// 具體的分區類

props.put(producerconfig.partitioner_class_config, "kafka.custompartitioner");

//定義攔截器

list interceptors = new arraylist<>();

interceptors.add("kafka.messageinterceptor");

interceptors.add("kafka.counterinterceptor");

props.put(producerconfig.interceptor_classes_config, interceptors);

producer producer = new kafkaproducer<>(props);

for (int i = 0; i < 1; i++) {

producer.send(new producerrecord("test_0515", i + "", "xxx-" + i), new callback() {

public void oncompletion(recordmetadata recordmetadata, exception e) {

system.out.println("這是producer回調函數");

}

});

}

/*system.out.println("現在執行關閉producer");

producer.close();*/

producer.close();

}

}

總結,我們可以知道攔截器鏈各個方法的執行順序,假如有a、b攔截器,在一個攔截器鏈中:

(1)執行a的configure方法,執行b的configure方法

(2)執行a的onsend方法,b的onsend方法

(3)生產者發送完畢后,執行a的onacknowledgement方法,b的onacknowledgement方法。

(4)執行producer自身的callback回調函數。

(5)執行a的close方法,b的close方法。

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持萬仟網。

如您對本文有疑問或者有任何想說的,請點擊進行留言回復,萬千網友為您解惑!

總結

以上是生活随笔為你收集整理的java kafka 分区_Java kafka如何实现自定义分区类和拦截器的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。