Apache Kafka Source Code Analysis, Part 5: Business API Handling
As discussed earlier, once a request reaches the business thread pool it gets processed — but how, exactly? That is what this part covers.
-----------------------------------------------------------------------------------------------
The business threads belong to Kafka's API layer; they handle each request by calling into the methods of KafkaApis.
1 KafkaRequestHandler
First, we need to see how this business thread pool is created. Back in KafkaServer.scala:
```scala
requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel,
  apis, time, config.numIoThreads)
```

```scala
class KafkaRequestHandlerPool(val brokerId: Int,
                              val requestChannel: RequestChannel,
                              val apis: KafkaApis,
                              time: Time,
                              numThreads: Int) extends Logging with KafkaMetricsGroup {

  /* a meter to track the average free capacity of the request handlers */
  private val aggregateIdleMeter = newMeter("RequestHandlerAvgIdlePercent", "percent", TimeUnit.NANOSECONDS)

  this.logIdent = "[Kafka Request Handler on Broker " + brokerId + "], "
  val runnables = new Array[KafkaRequestHandler](numThreads)
  for (i <- 0 until numThreads) {
    // create numThreads KafkaRequestHandler runnables, each started in its own daemon thread
    runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis, time)
    Utils.daemonThread("kafka-request-handler-" + i, runnables(i)).start()
  }
}
```

Next, the code that takes a request off the channel and executes it:
```scala
def run() {
  while (true) {
    try {
      var req: RequestChannel.Request = null
      while (req == null) {
        // We use a single meter for aggregate idle percentage for the thread pool.
        // Since meter is calculated as total_recorded_value / time_window and
        // time_window is independent of the number of threads, each recorded idle
        // time should be discounted by # threads.
        val startSelectTime = time.nanoseconds
        req = requestChannel.receiveRequest(300)
        val endTime = time.nanoseconds
        if (req != null)
          req.requestDequeueTimeNanos = endTime
        val idleTime = endTime - startSelectTime
        aggregateIdleMeter.mark(idleTime / totalHandlerThreads)
      }

      if (req eq RequestChannel.AllDone) {
        debug("Kafka request handler %d on broker %d received shut down command".format(id, brokerId))
        latch.countDown()
        return
      }
      trace("Kafka request handler %d on broker %d handling request %s".format(id, brokerId, req))
      apis.handle(req)
    } catch {
      case e: FatalExitError =>
        latch.countDown()
        Exit.exit(e.statusCode)
      case e: Throwable => error("Exception when handling request", e)
    }
  }
}
```

Laid out like this, the loop is easy to follow.
As you can see, the API layer uses KafkaRequestHandlerPool to manage all the KafkaRequestHandler threads. It is a simple, hand-rolled thread pool: it creates a fixed number of KafkaRequestHandler threads, each of which polls the request channel in a loop and exits when it receives the AllDone sentinel.
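The pattern above — a fixed set of daemon threads sharing one request queue, with a sentinel object ("poison pill") used to shut them down — can be sketched in plain Java. This is a simplified model under assumed names (`HandlerPool`, `ALL_DONE`), not Kafka's actual classes:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Simplified model of KafkaRequestHandlerPool: N daemon threads poll one
// shared queue; a sentinel object tells each thread to exit cleanly.
public class HandlerPool {
    static final Object ALL_DONE = new Object();          // shutdown sentinel

    final BlockingQueue<Object> requestChannel = new ArrayBlockingQueue<>(500);
    final CountDownLatch shutdownLatch;
    final int numThreads;

    public HandlerPool(int numThreads) {
        this.numThreads = numThreads;
        this.shutdownLatch = new CountDownLatch(numThreads);
        for (int i = 0; i < numThreads; i++) {
            Thread t = new Thread(this::run, "request-handler-" + i);
            t.setDaemon(true);
            t.start();
        }
    }

    void run() {
        while (true) {
            try {
                Object req = null;
                while (req == null) {
                    // timed poll, mirroring requestChannel.receiveRequest(300)
                    req = requestChannel.poll(300, TimeUnit.MILLISECONDS);
                }
                if (req == ALL_DONE) {        // reference equality, like Scala's `eq`
                    shutdownLatch.countDown();
                    return;
                }
                handle(req);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    void handle(Object req) {
        System.out.println(Thread.currentThread().getName() + " handling " + req);
    }

    // enqueue one sentinel per thread, then wait for all threads to exit
    public void shutdown() throws InterruptedException {
        for (int i = 0; i < numThreads; i++) requestChannel.put(ALL_DONE);
        shutdownLatch.await();
    }

    public static void main(String[] args) throws Exception {
        HandlerPool pool = new HandlerPool(4);
        pool.requestChannel.put("produce-request");
        pool.requestChannel.put("fetch-request");
        pool.shutdown();
        System.out.println("pool stopped");
    }
}
```

One sentinel is enqueued per thread because each thread consumes exactly one before returning; this is why Kafka pairs the AllDone request with a CountDownLatch sized to the pool.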
2 KafkaApis
KafkaApis is the entry point for request handling in the Kafka broker. It dispatches each RequestChannel.Request to the corresponding handle*() method, as shown in the figure:
There are too many handler methods to walk through here; we will examine each one in detail as we encounter it later.
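In spirit, KafkaApis.handle() is one big dispatch on the request's API key, routing to the matching handle*() method. A minimal Java sketch of that pattern (the API key names match Kafka's protocol, but the `Apis` class and its stub handlers are hypothetical):

```java
import java.util.Map;
import java.util.function.Consumer;

// Simplified model of KafkaApis: map each API key to its handle*() method.
public class Apis {
    public String last;   // records the last handler invoked, for demonstration

    // a few real Kafka API key names; the handler bodies are stubs
    private final Map<String, Consumer<String>> handlers = Map.of(
        "PRODUCE",  this::handleProduceRequest,
        "FETCH",    this::handleFetchRequest,
        "METADATA", this::handleTopicMetadataRequest
    );

    // dispatch: look up the handler for this API key and invoke it
    public void handle(String apiKey, String request) {
        Consumer<String> h = handlers.get(apiKey);
        if (h == null) throw new IllegalArgumentException("Unknown API key: " + apiKey);
        h.accept(request);
    }

    void handleProduceRequest(String req)       { last = "produce:" + req; }
    void handleFetchRequest(String req)         { last = "fetch:" + req; }
    void handleTopicMetadataRequest(String req) { last = "metadata:" + req; }

    public static void main(String[] args) {
        Apis apis = new Apis();
        apis.handle("PRODUCE", "r1");
        System.out.println(apis.last);   // produce:r1
    }
}
```

The real KafkaApis uses a match on `request.requestId` against the ApiKeys enumeration rather than a map, but the shape is the same: one entry point, one branch per protocol request type.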
Originally published at: https://my.oschina.net/qiangzigege/blog/1507362