日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

java作为kafka生产者实验及Expiring超时问题解决

發布時間:2023/12/3 编程问答 37 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java作为kafka生产者实验及Expiring超时问题解决 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

【README】 java作為生產者,centos 作為消費者;

【1】生產者代碼?

-- pom.xml <!-- 依賴 --> <dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>0.11.0.0</version></dependency></dependencies>

生產者

-- 生產者 public class MyProducer {public static void main(String[] args) {/* 1.創建kafka生產者的配置信息 */Properties props = new Properties();/*2.指定連接的kafka集群, broker-list */props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos201:9092"); /*3.ack應答級別*/props.put(ProducerConfig.ACKS_CONFIG, "all");/*4.重試次數*/props.put(ProducerConfig.RETRIES_CONFIG, 3); /*5.批次大小,一次發送多少數據,當數據大于16k,生產者會發送數據到 kafka集群 */props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * KfkNumConst._1K); /*6.等待時間, 等待時間超過1毫秒,即便數據沒有大于16k, 也會寫數據到kafka集群 */props.put(ProducerConfig.LINGER_MS_CONFIG, 1); /*7. RecordAccumulator 緩沖區大小*/ props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32 * KfkNumConst._1M); /*8. key, value 的序列化類 */ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());System.out.println(props); /* 9.創建生產者對象 */KafkaProducer<String, String> producer = new KafkaProducer<>(props); /* 10.發送數據 */ for (int i = 0; i < 10; i++) { Future<RecordMetadata> future = producer.send(new ProducerRecord<>("first01", "first01-20201229--A" + i));System.out.printf("寫入數據%s \n", "first01-20201229--A" + i);try {System.out.println(future.get().offset());} catch (Exception e) {e.printStackTrace();} }/* 11.關閉資源 */ producer.close();System.out.println("kafka生產者寫入數據完成"); } }

【2】centos 消費者?

[root@centos202 kafka-0.11]# kafka-console-consumer.sh --topic first01 --bootstrap-server centos201:9092 --from-beginning first01-20201229--2 first01-20201229--6 first01-20201229--A0 first01-20201229--A1 first01-20201229--A2 first01-20201229--A3 first01-20201229--A4 first01-20201229--A5 first01-20201229--A6 first01-20201229--A7 first01-20201229--A8 first01-20201229--A9

【3】生產者發送消息超時問題

3.1、問題現場

kafka Expiring 1 record(s) for first01-3: 31539 ms has passed since batch creation plus linger time

3.2、解決方法

修改本地機器的hosts, 如下:

192.168.163.201 centos201 192.168.163.202 centos202 192.168.163.203 centos203

?

?

?

?

?

總結

以上是生活随笔為你收集整理的java作为kafka生产者实验及Expiring超时问题解决的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。