日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Kafka2.0生产者客户端使用

發(fā)布時間:2023/12/13 编程问答 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Kafka2.0生产者客户端使用 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

1 初始化配置

??Kafka 通過 KafkaProducer 構(gòu)造器初始化生產(chǎn)者客戶端的配置。
??常用的重要配置,詳見官網(wǎng)。

  • bootstrap.servers:Kafka 集群地址(host1:post,host2:post),Kafka 客戶端初始化時會自動發(fā)現(xiàn)地址,所以可以不填寫所有地址。
  • key.serializer:實現(xiàn)了 Kafka 序列化接口的類,用來序列化 key。
  • value.serializer:實現(xiàn)了 Kafka 序列化接口的類,用來序列化 value。
  • acks:leader 接收到的 follower 確認的數(shù)量需要滿足 acks 的配置。
    ?0:生產(chǎn)者把消息發(fā)送出去就認為發(fā)送完成了。
    ?1:leader 接收到消息后,不用等 follower 的確認,就表示發(fā)送完成了。
    ?all/-1:leader 接收到消息后,需要所有在 ISR 集合的 follower 確認后,才表示完成了。
  • retries:消息發(fā)送失敗后的重試次數(shù)。如果允許重試,而 max.in.flight.requests.per.connection>1,則可能導(dǎo)致消息亂序,因為如果把兩批消息發(fā)送到同一個分區(qū),第一批失敗并重試,而第二批成功了,則第二批消息可能先生成了。
  • retry.backoff.ms:消息重試發(fā)送的間隔。
  • client.id:標識客戶端的 id。
  • compression.type:壓縮類型。可選:none、gzip、snappy、lz4。
  • buffer.memory:記錄累加器可以使用的最大內(nèi)存緩沖池大小。
  • batch.size:內(nèi)存緩沖池的緩沖列表大小。當 batch 的大小超過 batch.size 或者時間達到 linger.ms 就會發(fā)送 batch。
  • transactional.id:事務(wù) ID。
// 基礎(chǔ)配置 Map<String, Object> configs = new HashMap<>(); // Kafka broker 集群 configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094"); // key 序列化 configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // value 序列化 configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);KafkaProducer<String, String> producer = new KafkaProducer<>(configs);

2 構(gòu)造消息

??Kafka 提供了6種構(gòu)造器來構(gòu)造消息。

  • topic:消息主題,必填;
  • partition:分區(qū)號,非必填。如果為空,會計算 key 的 hash 值,再和該主題的分區(qū)總數(shù)取余得到分區(qū)號;如果 key 也為空,客戶端會生成遞增的隨機整數(shù),再和該主題的分區(qū)總數(shù)區(qū)域得到分區(qū)號。
  • timestamp:時間戳,非必填。如果為空,默認為 KafkaProducer 構(gòu)造器初始化的時間。
  • key:消息 key,非必填。關(guān)系到分區(qū)分配,broker 會對帶 key 的消息進行日志壓縮。
  • value:消息內(nèi)容,必填。
  • headers:消息頭,非必填。
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers); public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value); public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers); public ProducerRecord(String topic, Integer partition, K key, V value); public ProducerRecord(String topic, K key, V value); public ProducerRecord(String topic, V value);

3 發(fā)送消息

??支持同步發(fā)送和異步發(fā)送消息。

??同步發(fā)送

producer.send(record).get();

??異步發(fā)送

producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {// 回調(diào)處理流程} });

轉(zhuǎn)載于:https://www.cnblogs.com/bigshark/p/11182403.html

創(chuàng)作挑戰(zhàn)賽新人創(chuàng)作獎勵來咯,堅持創(chuàng)作打卡瓜分現(xiàn)金大獎

總結(jié)

以上是生活随笔為你收集整理的Kafka2.0生产者客户端使用的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。