日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

kafka 服务端消费者和生产者的配置

發(fā)布時(shí)間:2023/12/9 编程问答 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 kafka 服务端消费者和生产者的配置 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

在kafka的安裝目錄下,config目錄下有個(gè)名字叫做producer.properties的配置文件

#指定kafka節(jié)點(diǎn)列表,用于獲取metadata,不必全部指定 #需要kafka的服務(wù)器地址,來獲取每一個(gè)topic的分片數(shù)等元數(shù)據(jù)信息。 metadata.broker.list=kafka01:9092,kafka02:9092,kafka03:9092#生產(chǎn)者生產(chǎn)的消息被發(fā)送到哪個(gè)block,需要一個(gè)分組策略。 #指定分區(qū)處理類。默認(rèn)kafka.producer.DefaultPartitioner,表通過key哈希到對(duì)應(yīng)分區(qū) #partitioner.class=kafka.producer.DefaultPartitioner#生產(chǎn)者生產(chǎn)的消息可以通過一定的壓縮策略(或者說壓縮算法)來壓縮。消息被壓縮后發(fā)送到broker集群, #而broker集群是不會(huì)進(jìn)行解壓縮的,broker集群只會(huì)把消息發(fā)送到消費(fèi)者集群,然后由消費(fèi)者來解壓縮。 #是否壓縮,默認(rèn)0表示不壓縮,1表示用gzip壓縮,2表示用snappy壓縮。 #壓縮后消息中會(huì)有頭來指明消息壓縮類型,故在消費(fèi)者端消息解壓是透明的無需指定。 #文本數(shù)據(jù)會(huì)以1比10或者更高的壓縮比進(jìn)行壓縮。 compression.codec=none#指定序列化處理類,消息在網(wǎng)絡(luò)上傳輸就需要序列化,它有String、數(shù)組等許多種實(shí)現(xiàn)。 serializer.class=kafka.serializer.DefaultEncoder#如果要壓縮消息,這里指定哪些topic要壓縮消息,默認(rèn)empty,表示不壓縮。 #如果上面啟用了壓縮,那么這里就需要設(shè)置 #compressed.topics= #這是消息的確認(rèn)機(jī)制,默認(rèn)值是0。在面試中常被問到。 #producer有個(gè)ack參數(shù),有三個(gè)值,分別代表: #(1)不在乎是否寫入成功; #(2)寫入leader成功; #(3)寫入leader和所有副本都成功; #要求非常可靠的話可以犧牲性能設(shè)置成最后一種。 #為了保證消息不丟失,至少要設(shè)置為1,也就 #是說至少保證leader將消息保存成功。 #設(shè)置發(fā)送數(shù)據(jù)是否需要服務(wù)端的反饋,有三個(gè)值0,1,-1,分別代表3種狀態(tài): #0: producer不會(huì)等待broker發(fā)送ack。生產(chǎn)者只要把消息發(fā)送給broker之后,就認(rèn)為發(fā)送成功了,這是第1種情況; #1: 當(dāng)leader接收到消息之后發(fā)送ack。生產(chǎn)者把消息發(fā)送到broker之后,并且消息被寫入到本地文件,才認(rèn)為發(fā)送成功,這是第二種情況;#-1: 當(dāng)所有的follower都同步消息成功后發(fā)送ack。不僅是主的分區(qū)將消息保存成功了, #而且其所有的分區(qū)的副本數(shù)也都同步好了,才會(huì)被認(rèn)為發(fā)動(dòng)成功,這是第3種情況。 request.required.acks=0#broker必須在該時(shí)間范圍之內(nèi)給出反饋,否則失敗。 #在向producer發(fā)送ack之前,broker允許等待的最大時(shí)間 ,如果超時(shí), #broker將會(huì)向producer發(fā)送一個(gè)error ACK.意味著上一次消息因?yàn)槟撤N原因 #未能成功(比如follower未能同步成功) request.timeout.ms=10000#生產(chǎn)者將消息發(fā)送到broker,有兩種方式,一種是同步,表示生產(chǎn)者發(fā)送一條,broker就接收一條; #還有一種是異步,表示生產(chǎn)者積累到一批的消息,裝到一個(gè)池子里面緩存起來,再發(fā)送給broker, #這個(gè)池子不會(huì)無限緩存消息,在下面,它分別有一個(gè)時(shí)間限制(時(shí)間閾值)和一個(gè)數(shù)量限制(數(shù)量閾值)的參數(shù)供我們來設(shè)置。 #一般我們會(huì)選擇異步。 #同步還是異步發(fā)送消息,默認(rèn)“sync”表同步,"async"表異步。異步可以提高發(fā)送吞吐量, #也意味著消息將會(huì)在本地buffer中,并適時(shí)批量發(fā)送,但是也可能導(dǎo)致丟失未發(fā)送過去的消息 producer.type=sync#在async模式下,當(dāng)message被緩存的時(shí)間超過此值后,將會(huì)批量發(fā)送給broker, #默認(rèn)為5000ms #此值和batch.num.messages協(xié)同工作. queue.buffering.max.ms = 5000#異步情況下,緩存中允許存放消息數(shù)量的大小。 #在async模式下,producer端允許buffer的最大消息量 #無論如何,producer都無法盡快的將消息發(fā)送給broker,從而導(dǎo)致消息在producer端大量沉積 #此時(shí),如果消息的條數(shù)達(dá)到閥值,將會(huì)導(dǎo)致producer端阻塞或者消息被拋棄,默認(rèn)為10000條消息。 queue.buffering.max.messages=20000#如果是異步,指定每次批量發(fā)送數(shù)據(jù)量,默認(rèn)為200 batch.num.messages=500#在生產(chǎn)端的緩沖池中,消息發(fā)送出去之后,在沒有收到確認(rèn)之前,該緩沖池中的消息是不能被刪除的, #但是生產(chǎn)者一直在生產(chǎn)消息,這個(gè)時(shí)候緩沖池可能會(huì)被撐爆,所以這就需要有一個(gè)處理的策略。 #有兩種處理方式,一種是讓生產(chǎn)者先別生產(chǎn)那么快,阻塞一下,等會(huì)再生產(chǎn);另一種是將緩沖池中的消息清空。 #當(dāng)消息在producer端沉積的條數(shù)達(dá)到"queue.buffering.max.meesages"后阻塞一定時(shí)間后, #隊(duì)列仍然沒有enqueue(producer仍然沒有發(fā)送出任何消息) #此時(shí)producer可以繼續(xù)阻塞或者將消息拋棄,此timeout值用于控制"阻塞"的時(shí)間 #-1: 不限制阻塞超時(shí)時(shí)間,讓produce一直阻塞,這個(gè)時(shí)候消息就不會(huì)被拋棄 #0: 立即清空隊(duì)列,消息被拋棄 queue.enqueue.timeout.ms=-1#當(dāng)producer接收到error ACK,或者沒有接收到ACK時(shí),允許消息重發(fā)的次數(shù) #因?yàn)閎roker并沒有完整的機(jī)制來避免消息重復(fù),所以當(dāng)網(wǎng)絡(luò)異常時(shí)(比如ACK丟失) #有可能導(dǎo)致broker接收到重復(fù)的消息,默認(rèn)值為3. message.send.max.retries=3#producer刷新topic metada的時(shí)間間隔,producer需要知道partition leader #的位置,以及當(dāng)前topic的情況 #因此producer需要一個(gè)機(jī)制來獲取最新的metadata,當(dāng)producer遇到特定錯(cuò)誤時(shí), #將會(huì)立即刷新 #(比如topic失效,partition丟失,leader失效等),此外也可以通過此參數(shù)來配置 #額外的刷新機(jī)制,默認(rèn)值600000 topic.metadata.refresh.interval.ms=60000

kafka的消費(fèi)者配置(路徑和生產(chǎn)者配置文件路徑相同),名字叫做.consumer.properties

#消費(fèi)者集群通過連接Zookeeper來找到broker。 #zookeeper連接服務(wù)器地址 zookeeper.connect=zk01:2181,zk02:2181,zk03:2181#zookeeper的session過期時(shí)間,默認(rèn)5000ms,用于檢測消費(fèi)者是否掛掉 zookeeper.session.timeout.ms=5000#當(dāng)消費(fèi)者掛掉,其他消費(fèi)者要等該指定時(shí)間才能檢查到并且觸發(fā)重新負(fù)載均衡 zookeeper.connection.timeout.ms=10000#這是一個(gè)時(shí)間閾值。 #指定多久消費(fèi)者更新offset到zookeeper中。 #注意offset更新時(shí)基于time而不是每次獲得的消息。 #一旦在更新zookeeper發(fā)生異常并重啟,將可能拿到已拿到過的消息 zookeeper.sync.time.ms=2000#指定消費(fèi) group.id=xxxxx#這是一個(gè)數(shù)量閾值,經(jīng)測試是500條。 #當(dāng)consumer消費(fèi)一定量的消息之后,將會(huì)自動(dòng)向zookeeper提交offset信息#注意offset信息并不是每消費(fèi)一次消息就向zk提交 #一次,而是現(xiàn)在本地保存(內(nèi)存),并定期提交,默認(rèn)為true auto.commit.enable=true# 自動(dòng)更新時(shí)間。默認(rèn)60 * 1000 auto.commit.interval.ms=1000# 當(dāng)前consumer的標(biāo)識(shí),可以設(shè)定,也可以有系統(tǒng)生成, #主要用來跟蹤消息消費(fèi)情況,便于觀察 conusmer.id=xxx# 消費(fèi)者客戶端編號(hào),用于區(qū)分不同客戶端,默認(rèn)客戶端程序自動(dòng)產(chǎn)生 client.id=xxxx# 最大取多少塊緩存到消費(fèi)者(默認(rèn)10) queued.max.message.chunks=50# 當(dāng)有新的consumer加入到group時(shí),將會(huì)reblance,此后將會(huì) #有partitions的消費(fèi)端遷移到新 的consumer上,如果一個(gè) #consumer獲得了某個(gè)partition的消費(fèi)權(quán)限,那么它將會(huì)向zk #注冊(cè) "Partition Owner registry"節(jié)點(diǎn)信息,但是有可能 #此時(shí)舊的consumer尚沒有釋放此節(jié)點(diǎn), 此值用于控制, #注冊(cè)節(jié)點(diǎn)的重試次數(shù). rebalance.max.retries=5#每拉取一批消息的最大字節(jié)數(shù) #獲取消息的最大尺寸,broker不會(huì)像consumer輸出大于 #此值的消息chunk 每次feth將得到多條消息,此值為總大小, #提升此值,將會(huì)消耗更多的consumer端內(nèi)存 fetch.min.bytes=6553600#當(dāng)消息的尺寸不足時(shí),server阻塞的時(shí)間,如果超時(shí), #消息將立即發(fā)送給consumer #數(shù)據(jù)一批一批到達(dá),如果每一批是10條消息,如果某一批還 #不到10條,但是超時(shí)了,也會(huì)立即發(fā)送給consumer。 fetch.wait.max.ms=5000 socket.receive.buffer.bytes=655360# 如果zookeeper沒有offset值或offset值超出范圍。 #那么就給個(gè)初始的offset。有smallest、largest、 #anything可選,分別表示給當(dāng)前最小的offset、 #當(dāng)前最大的offset、拋異常。默認(rèn)largest auto.offset.reset=smallest# 指定序列化處理類 derializer.class=kafka.serializer.DefaultDecoder

以上內(nèi)容轉(zhuǎn)載自:https://www.cnblogs.com/jun1019/p/6256371.html

?

硬件的選擇

1.磁盤吞吐量

  生產(chǎn)者客戶端的性能直接受到服務(wù)器端磁盤吞吐量的影響。生產(chǎn)者生成的消息必須被提交到服務(wù)器保存,大多數(shù)客戶端在發(fā)送消息后會(huì)一直等待,直到至少有一個(gè)服務(wù)器確認(rèn)消息已經(jīng)提交成功為止。也就是說,磁盤寫入速度越快,生成消息的延遲就越低。機(jī)械硬盤(HDD)和固態(tài)盤(SSD)。固態(tài)盤的查找和訪問速度都很快,提供了最好的性能。機(jī)械盤更便宜,單塊容量也更大。在同一個(gè)服務(wù)器上使用多個(gè)機(jī)械盤,可以設(shè)置多個(gè)數(shù)據(jù)目錄,或者把它們?cè)O(shè)置成磁盤陣列,這樣可以提升機(jī)械硬盤的性能。

2.磁盤容量

  磁盤容量要多大取決于需要保存的消息的數(shù)量。

3.內(nèi)存

  服務(wù)器端的內(nèi)存容量是影響客戶端性能的主要因素,磁盤性能影響生產(chǎn)者,而內(nèi)存影響消費(fèi)者。消費(fèi)者一般從分區(qū)尾部讀取消息,如果有生產(chǎn)者存在,就緊跟在生產(chǎn)者后面。這種情況下,消費(fèi)者讀的消息會(huì)直接存放在系統(tǒng)的頁面緩存里,這比從磁盤上重新讀取要快。不建議把kafka同其他重要的應(yīng)用部署在一起,因?yàn)樗鼈冃枰蚕眄撁婢彺?#xff0c;最終會(huì)降低kafka消費(fèi)者的性能。

4.網(wǎng)絡(luò)

  網(wǎng)絡(luò)吞吐量決定了kafka能夠處理的最大數(shù)據(jù)流量。它和磁盤性能是制約kafka擴(kuò)展規(guī)模的主要因素。kafka支持多個(gè)消費(fèi)者,造成流入和流出的網(wǎng)絡(luò)流量不平衡,從而讓情況變得更加復(fù)雜。對(duì)于給定的主題,一個(gè)生產(chǎn)者可能每秒中寫入1MB數(shù)據(jù),但可能同時(shí)有多個(gè)消費(fèi)者瓜分網(wǎng)絡(luò)流量。其它操作也會(huì)占用網(wǎng)絡(luò)流量。

5.CPU

  kafka對(duì)計(jì)算處理能力的要求較低,不過他在一定程度上還是會(huì)影響整體性能??蛻舳藶榱藘?yōu)化網(wǎng)絡(luò)和磁盤空間,會(huì)對(duì)消息進(jìn)行壓縮。服務(wù)器需要對(duì)消息進(jìn)行批量解壓,設(shè)置偏移量,然后重新進(jìn)行批量壓縮,再保存到磁盤上。這就是kafka對(duì)計(jì)算能力有所要求的地方。

  使用kafka集群的最大好處就是可以跨服務(wù)器進(jìn)行負(fù)載均衡,再則就是可以使用復(fù)制功能來避免因單點(diǎn)故障造成的數(shù)據(jù)丟失。在維護(hù)kafka或底層系統(tǒng)時(shí),使用集群可以確保為客戶端提供高可用性。

?

轉(zhuǎn)載于:https://www.cnblogs.com/ToBeExpert/p/9823339.html

總結(jié)

以上是生活随笔為你收集整理的kafka 服务端消费者和生产者的配置的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。