kafka笔记3(生产者)
?
創(chuàng)建Kafka生產(chǎn)者:
Kafka生產(chǎn)者有3個(gè)必選屬性:
bootstrap.servers ? broker地址清單,格式為host:port ? ,清單中不必包含所有broker,但至少2個(gè)
key.serializer? = org.apache.kafka.common.serialization.Serializer 接口類,生產(chǎn)者使用這個(gè)類把鍵對(duì)象序列化為字節(jié)數(shù)組
Kafka還提供了ByteArraySerializer,StringSerializer,IntegerSerializer 實(shí)現(xiàn)類
value.serializer? 與key.serializer可以將值序列化
發(fā)送消息有3種方式:
發(fā)送并忘記(fire-and-forget)? 發(fā)送消息給服務(wù)器,但是不關(guān)心是否正常到達(dá)
同步發(fā)送? 使用send()發(fā)送信息,返回一個(gè)future對(duì)象,調(diào)用get()方法進(jìn)行等待,了解信息是否正常發(fā)送
異步發(fā)送 ?? 調(diào)用send()函數(shù),并指定一個(gè)回調(diào)函數(shù),服務(wù)器在返回響應(yīng)時(shí)調(diào)用該函數(shù)
?
生產(chǎn)者的可選屬性:
acks 指定有多少個(gè)分區(qū)副本收到消息,生產(chǎn)者才會(huì)認(rèn)為消息寫入是成功的
ack=0? 生產(chǎn)者在成功寫入消息之前不會(huì)等待任何來(lái)自服務(wù)器的響應(yīng)
ack=1 只要集群的首領(lǐng)節(jié)點(diǎn)收到信息,生產(chǎn)者就會(huì)收到一個(gè)來(lái)自服務(wù)器的成功響應(yīng)
ack=all 所有參數(shù)與復(fù)制的節(jié)點(diǎn)全部收到消息,生產(chǎn)者才會(huì)收到一個(gè)來(lái)自服務(wù)器的響應(yīng),這種模式最安全
buffer.memory? 設(shè)置生產(chǎn)者內(nèi)存緩沖區(qū)大小,生產(chǎn)者使用它緩沖要發(fā)送到服務(wù)器的消息
compression.type? 默認(rèn)消息發(fā)送不使用壓縮,該參數(shù)可以設(shè)置為snappy,gzip,lz4
retries 生產(chǎn)者可以重發(fā)消息的次數(shù),默認(rèn) 每次重試之間等待100ms,可以使用參數(shù) retry.backoff.ms參數(shù)改變這個(gè)時(shí)間間隔
batch.size 指定一個(gè)批次可以使用的內(nèi)存大小,按照字節(jié)數(shù)計(jì)算
linger.ms 指定生產(chǎn)者在發(fā)送批次之前等待更多消息加入批次的時(shí)間。Kafkaproducer 會(huì)在批次填滿或linger.ms達(dá)到上限時(shí)把批次發(fā)送出去
client.id 該參數(shù)可以指定任意的字符串,服務(wù)器會(huì)用它識(shí)別信息的來(lái)源,還可以用在日志和配額指標(biāo)里
max.in.flight.requests.per.connection? 指定生產(chǎn)者在收到服務(wù)器響應(yīng)之前可以發(fā)送多少個(gè)消息,值越高越占用內(nèi)存;設(shè)為1可以保證消息是按照發(fā)送順序?qū)懭敕?wù)器的
timeout.ms ? reuqest.timeout.ms ? metadata.fetch.timeout.ms?
request.timeout.ms 指定生產(chǎn)者在發(fā)送數(shù)據(jù)時(shí)等待服務(wù)器返回響應(yīng)的時(shí)間
metadata.timeout.ms 指定生產(chǎn)者在獲取元數(shù)據(jù)時(shí)等待服務(wù)器返回響應(yīng)時(shí)間
tiemout.ms指定broker等待同步副本返回消息確認(rèn)的時(shí)間,與acks的配置相匹配
max.block.ms? 指定調(diào)用send()或partitionsFor()方法獲取元數(shù)據(jù)時(shí)生產(chǎn)者的阻塞時(shí)間,當(dāng)阻塞時(shí)間到達(dá)max.block.ms時(shí),生產(chǎn)者會(huì)拋出異常
max.request.size 控制生產(chǎn)者發(fā)送的請(qǐng)求大小,指能發(fā)送的單個(gè)消息的最大值或單個(gè)請(qǐng)求里所有消息的總和
broker對(duì)可接收的消息最大值也有自己的限制(message.max.bytes),兩邊配置最好匹配,避免生產(chǎn)者發(fā)送消息被拒絕
receive.buffer.bytes 和 send.buffer.bytes
這2個(gè)參數(shù)分別指定了TCP socket接收和發(fā)送數(shù)據(jù)包緩沖區(qū)大小,默認(rèn)-1
max.in.flight.requests.per.connection=1 保證了消息的順序,如果大于1,第一批次寫入失敗后,重試成功可能會(huì)改變消息的順序
?
序列化器:
自定義序列化器
Avro序列化
Kafka使用Avro序列器是通過(guò)schema注冊(cè)表來(lái)實(shí)現(xiàn)的,schema注冊(cè)表不屬于Kafka
?
轉(zhuǎn)載于:https://www.cnblogs.com/zy1234567/p/10339868.html
總結(jié)
以上是生活随笔為你收集整理的kafka笔记3(生产者)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 无服务器计算的黑暗面:程序移植没那么容易
- 下一篇: 8、SpringBoot-CRUD默认访