日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

java调用kafka接口发送数据_Java调用Kafka生产者,消费者Api及相关配置说明

發布時間:2023/12/3 java 41 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java调用kafka接口发送数据_Java调用Kafka生产者,消费者Api及相关配置说明 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

本次的記錄內容包括:

1.Java調用生產者APi流程

2.Kafka生產者Api的使用及說明

3.Kafka消費者Api的使用及說明

4.Kafka消費者自動提交Offset和手動提交Offset

5.自定義生產者的攔截器,分區器

那么接下來我就帶大家熟悉以上Kafka的知識說明

1.Java調用生產者APi流程

首先上一張從網上找的簡單的圖,來描述一下生產者的生產流程。這里這個的圖描述的不是非常精確,稍微有點問題的地方就是省略了攔截器內容,這塊的內容在實際場景中也經常使用

那么從圖中我們可以看到。生產者通過調用api的Send方法開始進行一些列生產控制操作,首先進入的是一個叫序列化器的處理結構(這里就先按圖來講了--實際第一步會先經過攔截器),那么這一步主要的操作就是序列化相關數據,保證數據傳輸的穩定準確性,個人理解需要序列化的原因是因為kafka是磁盤文件寫消息,序列化后悔經過分區器,主要就是我們上篇講過的關于如何生產消息分區的策略,主要有三種,1.指定分區,2根據key的hash取余有效分區數分區,3初始化整數,輪訓分區。具體細節請參考上一篇文章(https://www.cnblogs.com/hnusthuyanhua/p/12355216.html)。經過分區后消息將會發送到指定的分區供消費者消費。

那么從圖中我們還可以看到有一個RecordMetaData的存在,這又是干什么的呢?這里就又設計到另一個知識點了。由于在網上未找打相關描述圖,我這里就粗略說明一下

大致的kafka生產者程序一般是有兩類線程進行,一個是主線程,另一個是生產消息的線程,他們質檢有一個RecoderMetaData作為消息存儲緩存,同時也是線程共享變量,當主線程不斷生產消息,本質上就是不斷累積RecoderMetaData的緩存值,當緩存值達到限定時,生產者線程開始講數據發送至kafka.。那么kafka生產者的一個流程大概就是這樣了

2.Kafka生產者Api的使用及說明

大致流程:配置kafka property信息---構建生產者---構建消息---發送消息---關閉資源

@Slf4j

public class KafkaProduce {

public static void main(String[] args) {

Properties properties = new Properties();

//第一步 初始化化kafka服務配置Properties--具體配置可以抽到實際的Property配置文件

//設備地址

properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.24.1.77:9092");

//ack

properties.put(ProducerConfig.ACKS_CONFIG, "all");

//序列化器

properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

//構建生產者

Producer producer = new KafkaProducer(properties);

for(int i = 0;i< 100;i++)

{

String msg = "------Message " + i;

//構建生產記錄

//第一種方式指定Toppic

ProducerRecord producerRecord=new ProducerRecord("kafkatest",msg);

//send方法分為有返回值和無返回值兩種

// 無返回值簡單發送消息

//producer.send(producerRecord);

//有返回值的在發送消息確認后返回一個Callback

producer.send(producerRecord, new Callback() {

@Override

public void onCompletion(RecordMetadata recordMetadata, Exception e) {

if (e==null){

//發送數據返回兩個東西--一個是返回結果 一個是異常 異常為空時即發送操作正常

if (e==null){

//返回結果中可獲取此條消息的相關分區信息

System.out.println(recordMetadata.offset()+recordMetadata.partition()+recordMetadata.topic());

}

}

}

});

log.info("kafka生產者發送消息{}",msg);

}

producer.close();

}

}

kafka分區策略方法說明:

3.Kafka消費者Api的使用及說明

大致流程:配置kafka property信息---構建消費者---訂閱主題--消費消息

@Slf4j

public class KafkaConsumerTest {

public static void main(String[] args) {

Properties properties = new Properties();

//第一步 初始化化kafka服務配置Properties--具體配置可以抽到實際的Property配置文件

//設備地址

properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.24.1.77:9092");

//反序列化器

properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group-1");

//offset自動提交

//properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");

properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");

//重置offset---當團體名發生改變時且消費者保存的初始offset未過期時,消費者會從頭消費

properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");

//初始化消費者

Consumer consumer=new KafkaConsumer(properties);

//初始化消費者訂閱主題

consumer.subscribe(Arrays.asList("kafkatest"));

while (true) {

ConsumerRecords records = consumer.poll(1000);

for (ConsumerRecord record : records) {

//消費完按自動提交時間自動提交消費Offset

log.info("kafka消費者消費分區:{}-消息內容:{}",record.partition(),record.value());

}

//異步提交-即消費某條數據時發送offset更新,但消費繼續運行 不等待提交完成 效率較高 但當消費者異常掛掉時容

//易造成消費重復

consumer.commitAsync(new OffsetCommitCallback() {

@Override

public void onComplete(Map map, Exception e) {

//如果失敗E不為null 失敗的話E為Null

//對于需要絕對保證消息不丟失的 可在此處重新進行消費提交

}

});

//同步提交-即消費一條數據提交一次offset更新,消費必須等待offset更新完才可繼續運行。通常來講此方法可盡可能

//的減少數據丟失 但效率較低

//consumer.commitSync();

}

}

}

4.Kafka消費者自動提交Offset和手動提交Offset

自動提交:即消費者消費后自己提交消費offset標記去kafka更新信息,那么通常是通過時間來控制的,比如每10秒更新一次本地的offset到kafka,? 缺點:實際應用場景中難以控制時間,太短容易造成數據丟失(offset已經更新 消費者還沒消費完就掛了),太長容易導致數據重復(offset還未更新,消費者掛了重新從kafka拉取之前的offset).

手動提交:消費完成后自行提交offset,根據同步情況分為兩種方式,syn提交(提交時相當于阻塞主線程,等offset提交完成后方可繼續進行)和asyn提交(異步提交),大致流程:配置kafka property配置文件,將配置文件中的自動提交關閉。--構建消費者訂閱主題并消費--消費完成后手動提交offset.? ?缺點:同樣還是會有上面自動提交的數據重復問題。但減少了數據丟失的可能性。

5.自定義生產者的攔截器,分區器

@Slf4j

public class KafkaFilter implements ProducerInterceptor {

public static int i=0;

@Override

public ProducerRecord onSend(ProducerRecord producerRecord) {

/**

* 發送消息的方法 可對消息進行處理 比如加時間戳啥的

*/

log.info("{}:{}",producerRecord.topic(),producerRecord.partition(),producerRecord.value());

return producerRecord;

}

@Override

public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {

/**

* ack標記回調方法,有點類型Callback回調的方法

* 可在這統計一下成功發送的條數和失敗發送的條數

*/

}

@Override

public void close() {

}

@Override

public void configure(Map map) {

}

}

public class KafkaPartion implements Partitioner {

@Override

public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {

/**

* 自定義分區 可通過該接口的默認分區器進行參考 默認為根據訂閱的主題來分區方式

*/

return 0;

}

@Override

public void close() {

}

@Override

public void configure(Map map) {

}

}

6.消費者如何消費歷史數據

大致流程:配置kafka property信息,開啟AutoOffset配置---構建消費者---訂閱主題--消費消息

那么每次開啟消費者如果想從頭開始消費,需要滿足以下條件之一:1.消費者的組名改變 2.消費者的初始offset未過期

相關參考文章:

kafka消費者監聽方式

總結

以上是生活随笔為你收集整理的java调用kafka接口发送数据_Java调用Kafka生产者,消费者Api及相关配置说明的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。