Kafka2.0生产者客户端使用
生活随笔
收集整理的這篇文章主要介紹了
Kafka2.0生产者客户端使用
小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
1 初始化配置
??Kafka 通過 KafkaProducer 構(gòu)造器初始化生產(chǎn)者客戶端的配置。
??常用的重要配置,詳見官網(wǎng)。
- bootstrap.servers:Kafka 集群地址(host1:post,host2:post),Kafka 客戶端初始化時會自動發(fā)現(xiàn)地址,所以可以不填寫所有地址。
- key.serializer:實現(xiàn)了 Kafka 序列化接口的類,用來序列化 key。
- value.serializer:實現(xiàn)了 Kafka 序列化接口的類,用來序列化 value。
- acks:leader 接收到的 follower 確認的數(shù)量需要滿足 acks 的配置。
?0:生產(chǎn)者把消息發(fā)送出去就認為發(fā)送完成了。
?1:leader 接收到消息后,不用等 follower 的確認,就表示發(fā)送完成了。
?all/-1:leader 接收到消息后,需要所有在 ISR 集合的 follower 確認后,才表示完成了。 - retries:消息發(fā)送失敗后的重試次數(shù)。如果允許重試,而 max.in.flight.requests.per.connection>1,則可能導(dǎo)致消息亂序,因為如果把兩批消息發(fā)送到同一個分區(qū),第一批失敗并重試,而第二批成功了,則第二批消息可能先生成了。
- retry.backoff.ms:消息重試發(fā)送的間隔。
- client.id:標識客戶端的 id。
- compression.type:壓縮類型。可選:none、gzip、snappy、lz4。
- buffer.memory:記錄累加器可以使用的最大內(nèi)存緩沖池大小。
- batch.size:內(nèi)存緩沖池的緩沖列表大小。當 batch 的大小超過 batch.size 或者時間達到 linger.ms 就會發(fā)送 batch。
- transactional.id:事務(wù) ID。
2 構(gòu)造消息
??Kafka 提供了6種構(gòu)造器來構(gòu)造消息。
- topic:消息主題,必填;
- partition:分區(qū)號,非必填。如果為空,會計算 key 的 hash 值,再和該主題的分區(qū)總數(shù)取余得到分區(qū)號;如果 key 也為空,客戶端會生成遞增的隨機整數(shù),再和該主題的分區(qū)總數(shù)區(qū)域得到分區(qū)號。
- timestamp:時間戳,非必填。如果為空,默認為 KafkaProducer 構(gòu)造器初始化的時間。
- key:消息 key,非必填。關(guān)系到分區(qū)分配,broker 會對帶 key 的消息進行日志壓縮。
- value:消息內(nèi)容,必填。
- headers:消息頭,非必填。
3 發(fā)送消息
??支持同步發(fā)送和異步發(fā)送消息。
??同步發(fā)送
producer.send(record).get();??異步發(fā)送
producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {// 回調(diào)處理流程} });轉(zhuǎn)載于:https://www.cnblogs.com/bigshark/p/11182403.html
創(chuàng)作挑戰(zhàn)賽新人創(chuàng)作獎勵來咯,堅持創(chuàng)作打卡瓜分現(xiàn)金大獎總結(jié)
以上是生活随笔為你收集整理的Kafka2.0生产者客户端使用的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: OpenCV 学习笔记(10)HSV颜色
- 下一篇: [论文笔记]CVPR2017_Joint