日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

java客户端作为kafka生产者测试

發(fā)布時間:2023/12/3 编程问答 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java客户端作为kafka生产者测试 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

【README】

1、本文主要對 java客戶端作為kafka 生產(chǎn)者進行測試, 消費者由 centos的kafka命令行線程扮演;?

2、消息發(fā)送:?kafka的生產(chǎn)者采用異步發(fā)送消息的方式,在消息發(fā)送過程中,涉及到2個線程——main線程和sender線程,以及一個線程共享變量 RecordAccumulator。main線程將消息發(fā)送給 RecordAccumulator,sender線程不斷從 RecordAccumulator 中讀取數(shù)據(jù)發(fā)送到 kafka broker;

step1)生產(chǎn)者中的main線程把數(shù)據(jù)經(jīng)過 攔截器-》序列化器-》分區(qū)器 處理;然后再把數(shù)據(jù)寫到 RecordAccumulator; step2)send 線程從 RecordAccumulator 中取出數(shù)據(jù)寫入到kafka集群;

3、開發(fā)環(huán)境

-- pom.xml<!-- 依賴 --> <dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>0.11.0.0</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId><version>1.7.25</version><scope>compile</scope></dependency></dependencies>-- log4j.properties log4j.rootLogger=INFO, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n log4j.appender.logfile=org.apache.log4j.FileAppender log4j.appender.logfile.File=target/spring.log log4j.appender.logfile.layout=org.apache.log4j.PatternLayout log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

【0】 生產(chǎn)者同步發(fā)送消息

為啥需要同步發(fā)送? 因為 kafka可以保證單個分區(qū)內(nèi)消息有序,但無法保證全局有序,即多個分區(qū)消息有序;?

存在一些業(yè)務(wù)場景,需要消息有序;

/*** 同步消息生產(chǎn)者*/ public class SyncProducer {public static void main(String[] args) {/* 1.創(chuàng)建kafka生產(chǎn)者的配置信息 */Properties props = new Properties();/*2.指定連接的kafka集群, broker-list */props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos201:9092"); /*3.ack應(yīng)答級別*/props.put(ProducerConfig.ACKS_CONFIG, "all");/*4.重試次數(shù)*/ props.put(ProducerConfig.RETRIES_CONFIG, 3); /*5.批次大小,一次發(fā)送多少數(shù)據(jù),當(dāng)數(shù)據(jù)大于16k,生產(chǎn)者會發(fā)送數(shù)據(jù)到 kafka集群 */props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * KfkNumConst._1K); /*6.等待時間, 等待時間超過1毫秒,即便數(shù)據(jù)沒有大于16k, 也會寫數(shù)據(jù)到kafka集群 */props.put(ProducerConfig.LINGER_MS_CONFIG, 1); /*7. RecordAccumulator 緩沖區(qū)大小*/ props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32 * KfkNumConst._1M); /*8. key, value 的序列化類 */ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());/* 9.創(chuàng)建生產(chǎn)者對象 */KafkaProducer<String, String> producer = new KafkaProducer<>(props); /* 10.同步發(fā)送數(shù)據(jù) */ for (int i = 0; i < 10; i++) { try {Future<RecordMetadata> future = producer.send(new ProducerRecord<>("first100", "first100-20210101--D" + i));RecordMetadata rMetadata = future.get(); // 調(diào)用future的get方法,讓main線程阻塞,就可以實現(xiàn)同步發(fā)送 } catch (Exception e) {e.printStackTrace();} }/* 11.關(guān)閉資源 */ producer.close();System.out.println("kafka生產(chǎn)者寫入數(shù)據(jù)完成"); } }

?

下面都是異步發(fā)送

【1】普通生產(chǎn)者

1.1、生產(chǎn)者代碼?

/*** 普通生產(chǎn)者 */ public class MyProducer {public static void main(String[] args) {/* 1.創(chuàng)建kafka生產(chǎn)者的配置信息 */Properties props = new Properties();/*2.指定連接的kafka集群, broker-list */props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos201:9092"); /*3.ack應(yīng)答級別*/props.put(ProducerConfig.ACKS_CONFIG, "all");/*4.重試次數(shù)*/ props.put(ProducerConfig.RETRIES_CONFIG, 3); /*5.批次大小,一次發(fā)送多少數(shù)據(jù),當(dāng)數(shù)據(jù)大于16k,生產(chǎn)者會發(fā)送數(shù)據(jù)到 kafka集群 */props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * KfkNumConst._1K); /*6.等待時間, 等待時間超過1毫秒,即便數(shù)據(jù)沒有大于16k, 也會寫數(shù)據(jù)到kafka集群 */props.put(ProducerConfig.LINGER_MS_CONFIG, 1); /*7. RecordAccumulator 緩沖區(qū)大小*/ props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32 * KfkNumConst._1M); /*8. key, value 的序列化類 */ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());System.out.println(props); /* 9.創(chuàng)建生產(chǎn)者對象 */KafkaProducer<String, String> producer = new KafkaProducer<>(props); /* 10.發(fā)送數(shù)據(jù) */ for (int i = 0; i < 10; i++) { Future<RecordMetadata> future = producer.send(new ProducerRecord<>("first100", "first100-20210101--D" + i));try {System.out.println(future.get().partition() + "-" + future.get().offset());} catch (Exception e) {e.printStackTrace();} }/* 11.關(guān)閉資源 */ producer.close();System.out.println("kafka生產(chǎn)者寫入數(shù)據(jù)完成"); } } -- 日志 0-183203 0-183204 0-183205 0-183206 0-183207 0-183208 0-183209 0-183210 0-183211 0-183212 kafka生產(chǎn)者寫入數(shù)據(jù)完成

1.2、消費者

[root@centos201 ~]# kafka-console-consumer.sh --topic first100 --bootstrap-server centos201:9092 first100-20210101--D0 first100-20210101--D1 first100-20210101--D2 first100-20210101--D3 first100-20210101--D4 first100-20210101--D5 first100-20210101--D6 first100-20210101--D7 first100-20210101--D8 first100-20210101--D9

【2】帶回調(diào)的生產(chǎn)者

2.1、生產(chǎn)者

/*** 帶回調(diào)的生產(chǎn)者 */for (int i = 0; i < 10; i++) { Future<RecordMetadata> future = producer.send(new ProducerRecord<>("first100", "first100-20210101--E" + i), (metadata, exception)-> {/* lambda 表達式 */System.out.println(metadata.partition() + " -- " + metadata.offset());});}

2.2、消費者

first100-20210101--E0 first100-20210101--E1 first100-20210101--E2 first100-20210101--E3 first100-20210101--E4 first100-20210101--E5 first100-20210101--E6 first100-20210101--E7 first100-20210101--E8 first100-20210101--E9

【3】創(chuàng)建分區(qū)策略的生產(chǎn)者 (指定分區(qū))

0、查看topic, 4個分區(qū),3個副本

[root@centos201 ~]# kafka-topics.sh --describe --topic aaa --zookeeper centos201:2181 Topic:aaa PartitionCount:4 ReplicationFactor:3 Configs:Topic: aaa Partition: 0 Leader: 2 Replicas: 2,1,3 Isr: 1,2,3Topic: aaa Partition: 1 Leader: 3 Replicas: 3,2,1 Isr: 1,2,3Topic: aaa Partition: 2 Leader: 1 Replicas: 1,3,2 Isr: 2,1,3Topic: aaa Partition: 3 Leader: 2 Replicas: 2,3,1 Isr: 1,2,3

雖然集群只有3臺機器, centos201, centos202, centos203 ;?

當(dāng)我的分區(qū)數(shù)是4,即分區(qū)數(shù)可以大于broker數(shù)量; 但副本數(shù)必須小于等于 broker數(shù)量;?

3.1、生產(chǎn)者

props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class.getName()); // 設(shè)置分區(qū)器 /* 9.創(chuàng)建生產(chǎn)者對象 */KafkaProducer<String, String> producer = new KafkaProducer<>(props); /* 10.發(fā)送數(shù)據(jù) */ for (int i = 0; i < 10; i++) { Future<RecordMetadata> future = producer.send(new ProducerRecord<>("aaa", "aaa-key", "aaa-20210101--B" + i), (metadata, exception)-> {/* lambda 表達式 */System.out.println(metadata.partition() + " -- " + metadata.offset());});}-- 日志 1 -- 112 1 -- 113 1 -- 114 1 -- 115 1 -- 116 1 -- 117 1 -- 118 1 -- 119 1 -- 120 1 -- 121 kafka生產(chǎn)者寫入數(shù)據(jù)完成

3.2、自定義分區(qū)器?

/*** 自定義分區(qū)器*/ public class MyPartitioner implements Partitioner {@Overridepublic void configure(Map<String, ?> configs) {}@Overridepublic int partition(String topic, Object key, byte[] keyBytes,Object value, byte[] valueBytes, Cluster cluster) {Integer integer = cluster.partitionCountForTopic(topic);return 1;}@Overridepublic void close() {} }

3.3、消費者

[root@centos201 ~]# kafka-console-consumer.sh --topic aaa --bootstrap-server centos201:9092 aaa-20210101--C0 aaa-20210101--C1 aaa-20210101--C2 aaa-20210101--C3 aaa-20210101--C4 aaa-20210101--C5 aaa-20210101--C6 aaa-20210101--C7 aaa-20210101--C8 aaa-20210101--C9

小結(jié): 可以查看,即便topic 有4個分區(qū),但我在自定義分區(qū)器中指定寫入到分區(qū)1, 所以生產(chǎn)者只把消息寫到分區(qū)1;?

?

?

?

?

總結(jié)

以上是生活随笔為你收集整理的java客户端作为kafka生产者测试的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。