java调用kafka接口发送数据_Java调用Kafka生产者,消费者Api及相关配置说明
本次的記錄內(nèi)容包括:
1.Java調(diào)用生產(chǎn)者APi流程
2.Kafka生產(chǎn)者Api的使用及說(shuō)明
3.Kafka消費(fèi)者Api的使用及說(shuō)明
4.Kafka消費(fèi)者自動(dòng)提交Offset和手動(dòng)提交Offset
5.自定義生產(chǎn)者的攔截器,分區(qū)器
那么接下來(lái)我就帶大家熟悉以上Kafka的知識(shí)說(shuō)明
1.Java調(diào)用生產(chǎn)者APi流程
首先上一張從網(wǎng)上找的簡(jiǎn)單的圖,來(lái)描述一下生產(chǎn)者的生產(chǎn)流程。這里這個(gè)的圖描述的不是非常精確,稍微有點(diǎn)問(wèn)題的地方就是省略了攔截器內(nèi)容,這塊的內(nèi)容在實(shí)際場(chǎng)景中也經(jīng)常使用
那么從圖中我們可以看到。生產(chǎn)者通過(guò)調(diào)用api的Send方法開(kāi)始進(jìn)行一些列生產(chǎn)控制操作,首先進(jìn)入的是一個(gè)叫序列化器的處理結(jié)構(gòu)(這里就先按圖來(lái)講了--實(shí)際第一步會(huì)先經(jīng)過(guò)攔截器),那么這一步主要的操作就是序列化相關(guān)數(shù)據(jù),保證數(shù)據(jù)傳輸?shù)姆€(wěn)定準(zhǔn)確性,個(gè)人理解需要序列化的原因是因?yàn)閗afka是磁盤(pán)文件寫(xiě)消息,序列化后悔經(jīng)過(guò)分區(qū)器,主要就是我們上篇講過(guò)的關(guān)于如何生產(chǎn)消息分區(qū)的策略,主要有三種,1.指定分區(qū),2根據(jù)key的hash取余有效分區(qū)數(shù)分區(qū),3初始化整數(shù),輪訓(xùn)分區(qū)。具體細(xì)節(jié)請(qǐng)參考上一篇文章(https://www.cnblogs.com/hnusthuyanhua/p/12355216.html)。經(jīng)過(guò)分區(qū)后消息將會(huì)發(fā)送到指定的分區(qū)供消費(fèi)者消費(fèi)。
那么從圖中我們還可以看到有一個(gè)RecordMetaData的存在,這又是干什么的呢?這里就又設(shè)計(jì)到另一個(gè)知識(shí)點(diǎn)了。由于在網(wǎng)上未找打相關(guān)描述圖,我這里就粗略說(shuō)明一下
大致的kafka生產(chǎn)者程序一般是有兩類(lèi)線程進(jìn)行,一個(gè)是主線程,另一個(gè)是生產(chǎn)消息的線程,他們質(zhì)檢有一個(gè)RecoderMetaData作為消息存儲(chǔ)緩存,同時(shí)也是線程共享變量,當(dāng)主線程不斷生產(chǎn)消息,本質(zhì)上就是不斷累積RecoderMetaData的緩存值,當(dāng)緩存值達(dá)到限定時(shí),生產(chǎn)者線程開(kāi)始講數(shù)據(jù)發(fā)送至kafka.。那么kafka生產(chǎn)者的一個(gè)流程大概就是這樣了
2.Kafka生產(chǎn)者Api的使用及說(shuō)明
大致流程:配置kafka property信息---構(gòu)建生產(chǎn)者---構(gòu)建消息---發(fā)送消息---關(guān)閉資源
@Slf4j
public class KafkaProduce {
public static void main(String[] args) {
Properties properties = new Properties();
//第一步 初始化化kafka服務(wù)配置Properties--具體配置可以抽到實(shí)際的Property配置文件
//設(shè)備地址
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.24.1.77:9092");
//ack
properties.put(ProducerConfig.ACKS_CONFIG, "all");
//序列化器
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
//構(gòu)建生產(chǎn)者
Producer producer = new KafkaProducer(properties);
for(int i = 0;i< 100;i++)
{
String msg = "------Message " + i;
//構(gòu)建生產(chǎn)記錄
//第一種方式指定Toppic
ProducerRecord producerRecord=new ProducerRecord("kafkatest",msg);
//send方法分為有返回值和無(wú)返回值兩種
// 無(wú)返回值簡(jiǎn)單發(fā)送消息
//producer.send(producerRecord);
//有返回值的在發(fā)送消息確認(rèn)后返回一個(gè)Callback
producer.send(producerRecord, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e==null){
//發(fā)送數(shù)據(jù)返回兩個(gè)東西--一個(gè)是返回結(jié)果 一個(gè)是異常 異常為空時(shí)即發(fā)送操作正常
if (e==null){
//返回結(jié)果中可獲取此條消息的相關(guān)分區(qū)信息
System.out.println(recordMetadata.offset()+recordMetadata.partition()+recordMetadata.topic());
}
}
}
});
log.info("kafka生產(chǎn)者發(fā)送消息{}",msg);
}
producer.close();
}
}
kafka分區(qū)策略方法說(shuō)明:
3.Kafka消費(fèi)者Api的使用及說(shuō)明
大致流程:配置kafka property信息---構(gòu)建消費(fèi)者---訂閱主題--消費(fèi)消息
@Slf4j
public class KafkaConsumerTest {
public static void main(String[] args) {
Properties properties = new Properties();
//第一步 初始化化kafka服務(wù)配置Properties--具體配置可以抽到實(shí)際的Property配置文件
//設(shè)備地址
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.24.1.77:9092");
//反序列化器
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group-1");
//offset自動(dòng)提交
//properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
//重置offset---當(dāng)團(tuán)體名發(fā)生改變時(shí)且消費(fèi)者保存的初始o(jì)ffset未過(guò)期時(shí),消費(fèi)者會(huì)從頭消費(fèi)
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
//初始化消費(fèi)者
Consumer consumer=new KafkaConsumer(properties);
//初始化消費(fèi)者訂閱主題
consumer.subscribe(Arrays.asList("kafkatest"));
while (true) {
ConsumerRecords records = consumer.poll(1000);
for (ConsumerRecord record : records) {
//消費(fèi)完按自動(dòng)提交時(shí)間自動(dòng)提交消費(fèi)Offset
log.info("kafka消費(fèi)者消費(fèi)分區(qū):{}-消息內(nèi)容:{}",record.partition(),record.value());
}
//異步提交-即消費(fèi)某條數(shù)據(jù)時(shí)發(fā)送offset更新,但消費(fèi)繼續(xù)運(yùn)行 不等待提交完成 效率較高 但當(dāng)消費(fèi)者異常掛掉時(shí)容
//易造成消費(fèi)重復(fù)
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map map, Exception e) {
//如果失敗E不為null 失敗的話E為Null
//對(duì)于需要絕對(duì)保證消息不丟失的 可在此處重新進(jìn)行消費(fèi)提交
}
});
//同步提交-即消費(fèi)一條數(shù)據(jù)提交一次offset更新,消費(fèi)必須等待offset更新完才可繼續(xù)運(yùn)行。通常來(lái)講此方法可盡可能
//的減少數(shù)據(jù)丟失 但效率較低
//consumer.commitSync();
}
}
}
4.Kafka消費(fèi)者自動(dòng)提交Offset和手動(dòng)提交Offset
自動(dòng)提交:即消費(fèi)者消費(fèi)后自己提交消費(fèi)offset標(biāo)記去kafka更新信息,那么通常是通過(guò)時(shí)間來(lái)控制的,比如每10秒更新一次本地的offset到kafka,? 缺點(diǎn):實(shí)際應(yīng)用場(chǎng)景中難以控制時(shí)間,太短容易造成數(shù)據(jù)丟失(offset已經(jīng)更新 消費(fèi)者還沒(méi)消費(fèi)完就掛了),太長(zhǎng)容易導(dǎo)致數(shù)據(jù)重復(fù)(offset還未更新,消費(fèi)者掛了重新從kafka拉取之前的offset).
手動(dòng)提交:消費(fèi)完成后自行提交offset,根據(jù)同步情況分為兩種方式,syn提交(提交時(shí)相當(dāng)于阻塞主線程,等offset提交完成后方可繼續(xù)進(jìn)行)和asyn提交(異步提交),大致流程:配置kafka property配置文件,將配置文件中的自動(dòng)提交關(guān)閉。--構(gòu)建消費(fèi)者訂閱主題并消費(fèi)--消費(fèi)完成后手動(dòng)提交offset.? ?缺點(diǎn):同樣還是會(huì)有上面自動(dòng)提交的數(shù)據(jù)重復(fù)問(wèn)題。但減少了數(shù)據(jù)丟失的可能性。
5.自定義生產(chǎn)者的攔截器,分區(qū)器
@Slf4j
public class KafkaFilter implements ProducerInterceptor {
public static int i=0;
@Override
public ProducerRecord onSend(ProducerRecord producerRecord) {
/**
* 發(fā)送消息的方法 可對(duì)消息進(jìn)行處理 比如加時(shí)間戳啥的
*/
log.info("{}:{}",producerRecord.topic(),producerRecord.partition(),producerRecord.value());
return producerRecord;
}
@Override
public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
/**
* ack標(biāo)記回調(diào)方法,有點(diǎn)類(lèi)型Callback回調(diào)的方法
* 可在這統(tǒng)計(jì)一下成功發(fā)送的條數(shù)和失敗發(fā)送的條數(shù)
*/
}
@Override
public void close() {
}
@Override
public void configure(Map map) {
}
}
public class KafkaPartion implements Partitioner {
@Override
public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
/**
* 自定義分區(qū) 可通過(guò)該接口的默認(rèn)分區(qū)器進(jìn)行參考 默認(rèn)為根據(jù)訂閱的主題來(lái)分區(qū)方式
*/
return 0;
}
@Override
public void close() {
}
@Override
public void configure(Map map) {
}
}
6.消費(fèi)者如何消費(fèi)歷史數(shù)據(jù)
大致流程:配置kafka property信息,開(kāi)啟AutoOffset配置---構(gòu)建消費(fèi)者---訂閱主題--消費(fèi)消息
那么每次開(kāi)啟消費(fèi)者如果想從頭開(kāi)始消費(fèi),需要滿足以下條件之一:1.消費(fèi)者的組名改變 2.消費(fèi)者的初始o(jì)ffset未過(guò)期
相關(guān)參考文章:
kafka消費(fèi)者監(jiān)聽(tīng)方式
總結(jié)
以上是生活随笔為你收集整理的java调用kafka接口发送数据_Java调用Kafka生产者,消费者Api及相关配置说明的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 如何将手机照片传到u盘
- 下一篇: string index out of