Kafka2.0生产者客户端使用
生活随笔
收集整理的這篇文章主要介紹了
Kafka2.0生产者客户端使用
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
1 初始化配置
??Kafka 通過 KafkaProducer 構造器初始化生產者客戶端的配置。
??常用的重要配置,詳見官網。
- bootstrap.servers:Kafka 集群地址(host1:post,host2:post),Kafka 客戶端初始化時會自動發現地址,所以可以不填寫所有地址。
- key.serializer:實現了 Kafka 序列化接口的類,用來序列化 key。
- value.serializer:實現了 Kafka 序列化接口的類,用來序列化 value。
- acks:leader 接收到的 follower 確認的數量需要滿足 acks 的配置。
?0:生產者把消息發送出去就認為發送完成了。
?1:leader 接收到消息后,不用等 follower 的確認,就表示發送完成了。
?all/-1:leader 接收到消息后,需要所有在 ISR 集合的 follower 確認后,才表示完成了。 - retries:消息發送失敗后的重試次數。如果允許重試,而 max.in.flight.requests.per.connection>1,則可能導致消息亂序,因為如果把兩批消息發送到同一個分區,第一批失敗并重試,而第二批成功了,則第二批消息可能先生成了。
- retry.backoff.ms:消息重試發送的間隔。
- client.id:標識客戶端的 id。
- compression.type:壓縮類型。可選:none、gzip、snappy、lz4。
- buffer.memory:記錄累加器可以使用的最大內存緩沖池大小。
- batch.size:內存緩沖池的緩沖列表大小。當 batch 的大小超過 batch.size 或者時間達到 linger.ms 就會發送 batch。
- transactional.id:事務 ID。
2 構造消息
??Kafka 提供了6種構造器來構造消息。
- topic:消息主題,必填;
- partition:分區號,非必填。如果為空,會計算 key 的 hash 值,再和該主題的分區總數取余得到分區號;如果 key 也為空,客戶端會生成遞增的隨機整數,再和該主題的分區總數區域得到分區號。
- timestamp:時間戳,非必填。如果為空,默認為 KafkaProducer 構造器初始化的時間。
- key:消息 key,非必填。關系到分區分配,broker 會對帶 key 的消息進行日志壓縮。
- value:消息內容,必填。
- headers:消息頭,非必填。
3 發送消息
??支持同步發送和異步發送消息。
??同步發送
producer.send(record).get();??異步發送
producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {// 回調處理流程} });轉載于:https://www.cnblogs.com/bigshark/p/11182403.html
創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎總結
以上是生活随笔為你收集整理的Kafka2.0生产者客户端使用的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: OpenCV 学习笔记(10)HSV颜色
- 下一篇: [论文笔记]CVPR2017_Joint