日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

kafka笔记3(生产者)

發(fā)布時(shí)間:2025/4/5 编程问答 23 豆豆
生活随笔 收集整理的這篇文章主要介紹了 kafka笔记3(生产者) 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

?

創(chuàng)建Kafka生產(chǎn)者:

  Kafka生產(chǎn)者有3個(gè)必選屬性:

    bootstrap.servers ? broker地址清單,格式為host:port ? ,清單中不必包含所有broker,但至少2個(gè)

    key.serializer? = org.apache.kafka.common.serialization.Serializer 接口類,生產(chǎn)者使用這個(gè)類把鍵對(duì)象序列化為字節(jié)數(shù)組

        Kafka還提供了ByteArraySerializer,StringSerializer,IntegerSerializer 實(shí)現(xiàn)類

    value.serializer? 與key.serializer可以將值序列化

  

發(fā)送消息有3種方式:

  發(fā)送并忘記(fire-and-forget)? 發(fā)送消息給服務(wù)器,但是不關(guān)心是否正常到達(dá)

  同步發(fā)送? 使用send()發(fā)送信息,返回一個(gè)future對(duì)象,調(diào)用get()方法進(jìn)行等待,了解信息是否正常發(fā)送

  異步發(fā)送 ?? 調(diào)用send()函數(shù),并指定一個(gè)回調(diào)函數(shù),服務(wù)器在返回響應(yīng)時(shí)調(diào)用該函數(shù)

?

生產(chǎn)者的可選屬性:

  acks 指定有多少個(gè)分區(qū)副本收到消息,生產(chǎn)者才會(huì)認(rèn)為消息寫入是成功的

    ack=0? 生產(chǎn)者在成功寫入消息之前不會(huì)等待任何來(lái)自服務(wù)器的響應(yīng)

    ack=1 只要集群的首領(lǐng)節(jié)點(diǎn)收到信息,生產(chǎn)者就會(huì)收到一個(gè)來(lái)自服務(wù)器的成功響應(yīng)

    ack=all 所有參數(shù)與復(fù)制的節(jié)點(diǎn)全部收到消息,生產(chǎn)者才會(huì)收到一個(gè)來(lái)自服務(wù)器的響應(yīng),這種模式最安全

  buffer.memory? 設(shè)置生產(chǎn)者內(nèi)存緩沖區(qū)大小,生產(chǎn)者使用它緩沖要發(fā)送到服務(wù)器的消息

  compression.type? 默認(rèn)消息發(fā)送不使用壓縮,該參數(shù)可以設(shè)置為snappy,gzip,lz4

  retries 生產(chǎn)者可以重發(fā)消息的次數(shù),默認(rèn) 每次重試之間等待100ms,可以使用參數(shù) retry.backoff.ms參數(shù)改變這個(gè)時(shí)間間隔

  batch.size 指定一個(gè)批次可以使用的內(nèi)存大小,按照字節(jié)數(shù)計(jì)算

  linger.ms 指定生產(chǎn)者在發(fā)送批次之前等待更多消息加入批次的時(shí)間。Kafkaproducer 會(huì)在批次填滿或linger.ms達(dá)到上限時(shí)把批次發(fā)送出去

  client.id 該參數(shù)可以指定任意的字符串,服務(wù)器會(huì)用它識(shí)別信息的來(lái)源,還可以用在日志和配額指標(biāo)里

  max.in.flight.requests.per.connection? 指定生產(chǎn)者在收到服務(wù)器響應(yīng)之前可以發(fā)送多少個(gè)消息,值越高越占用內(nèi)存;設(shè)為1可以保證消息是按照發(fā)送順序?qū)懭敕?wù)器的

  timeout.ms ? reuqest.timeout.ms ? metadata.fetch.timeout.ms?

    request.timeout.ms 指定生產(chǎn)者在發(fā)送數(shù)據(jù)時(shí)等待服務(wù)器返回響應(yīng)的時(shí)間

    metadata.timeout.ms 指定生產(chǎn)者在獲取元數(shù)據(jù)時(shí)等待服務(wù)器返回響應(yīng)時(shí)間

    tiemout.ms指定broker等待同步副本返回消息確認(rèn)的時(shí)間,與acks的配置相匹配

  max.block.ms? 指定調(diào)用send()或partitionsFor()方法獲取元數(shù)據(jù)時(shí)生產(chǎn)者的阻塞時(shí)間,當(dāng)阻塞時(shí)間到達(dá)max.block.ms時(shí),生產(chǎn)者會(huì)拋出異常

  max.request.size 控制生產(chǎn)者發(fā)送的請(qǐng)求大小,指能發(fā)送的單個(gè)消息的最大值或單個(gè)請(qǐng)求里所有消息的總和

      broker對(duì)可接收的消息最大值也有自己的限制(message.max.bytes),兩邊配置最好匹配,避免生產(chǎn)者發(fā)送消息被拒絕

  receive.buffer.bytes 和 send.buffer.bytes

      這2個(gè)參數(shù)分別指定了TCP socket接收和發(fā)送數(shù)據(jù)包緩沖區(qū)大小,默認(rèn)-1

  max.in.flight.requests.per.connection=1 保證了消息的順序,如果大于1,第一批次寫入失敗后,重試成功可能會(huì)改變消息的順序

?

序列化器:

  自定義序列化器

  Avro序列化

    Kafka使用Avro序列器是通過(guò)schema注冊(cè)表來(lái)實(shí)現(xiàn)的,schema注冊(cè)表不屬于Kafka

    

?

  

轉(zhuǎn)載于:https://www.cnblogs.com/zy1234567/p/10339868.html

總結(jié)

以上是生活随笔為你收集整理的kafka笔记3(生产者)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。