kafka源码分析之一server启动分析
0. 關(guān)鍵概念
關(guān)鍵概念
| Topic | 用于劃分Message的邏輯概念,一個(gè)Topic可以分布在多個(gè)Broker上。 |
| Partition | 是Kafka中橫向擴(kuò)展和一切并行化的基礎(chǔ),每個(gè)Topic都至少被切分為1個(gè)Partition。 |
| Offset | 消息在Partition中的編號(hào),編號(hào)順序不跨Partition(在Partition內(nèi)有序)。 |
| Consumer | 用于從Broker中取出/消費(fèi)Message。 |
| Producer | 用于往Broker中發(fā)送/生產(chǎn)Message。 |
| Replication | Kafka支持以Partition為單位對(duì)Message進(jìn)行冗余備份,每個(gè)Partition都可以配置至少1個(gè)Replication(當(dāng)僅1個(gè)Replication時(shí)即僅該P(yáng)artition本身)。 |
| Leader | 每個(gè)Replication集合中的Partition都會(huì)選出一個(gè)唯一的Leader,所有的讀寫(xiě)請(qǐng)求都由Leader處理。其他Replicas從Leader處把數(shù)據(jù)更新同步到本地。 |
| Broker | Kafka中使用Broker來(lái)接受Producer和Consumer的請(qǐng)求,并把Message持久化到本地磁盤(pán)。每個(gè)Cluster當(dāng)中會(huì)選舉出一個(gè)Broker來(lái)?yè)?dān)任Controller,負(fù)責(zé)處理Partition的Leader選舉,協(xié)調(diào)Partition遷移等工作。 |
| ISR | In-Sync Replica,是Replicas的一個(gè)子集,表示目前Alive且與Leader能夠“Catch-up”的Replicas集合。由于讀寫(xiě)都是首先落到Leader上,所以一般來(lái)說(shuō)通過(guò)同步機(jī)制從Leader上拉取數(shù)據(jù)的Replica都會(huì)和Leader有一些延遲(包括了延遲時(shí)間和延遲條數(shù)兩個(gè)維度),任意一個(gè)超過(guò)閾值都會(huì)把該Replica踢出ISR。每個(gè)Leader Partition都有它自己獨(dú)立的ISR。 |
1. 分析kafka源碼的目的
深入掌握kafka的內(nèi)部原理
深入掌握scala運(yùn)用
2. server的啟動(dòng)
如下所示(本來(lái)準(zhǔn)備用時(shí)序圖的,但感覺(jué)時(shí)序圖沒(méi)有思維圖更能反映,故采用了思維圖):
2.1 啟動(dòng)入口Kafka.scala
從上面的思維導(dǎo)圖,可以看到Kafka的啟動(dòng)入口是Kafka.scala的main()函數(shù):
def main(args: Array[String]): Unit = {try {val serverProps = getPropsFromArgs(args)val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)// attach shutdown handler to catch control-cRuntime.getRuntime().addShutdownHook(new Thread() {override def run() = {kafkaServerStartable.shutdown}})kafkaServerStartable.startupkafkaServerStartable.awaitShutdown}catch {case e: Throwable =>fatal(e)System.exit(1)}System.exit(0)}上面代碼主要包含:
從配置文件讀取kafka服務(wù)器啟動(dòng)參數(shù)的getPropsFromArgs()方法;
創(chuàng)建KafkaServerStartable對(duì)象;
KafkaServerStartable對(duì)象在增加shutdown句柄函數(shù);
啟動(dòng)KafkaServerStartable的starup()方法;
啟動(dòng)KafkaServerStartable的awaitShutdown()方法;
2.2?KafkaServer的包裝類(lèi)KafkaServerStartable
private val server = new KafkaServer(serverConfig)def startup() {try {server.startup()}catch {case e: Throwable =>fatal("Fatal error during KafkaServerStartable startup. Prepare to shutdown", e)// KafkaServer already calls shutdown() internally, so this is purely for logging & the exit codeSystem.exit(1)}}2.3 具體啟動(dòng)類(lèi)KafkaServer
KafkaServer啟動(dòng)的代碼層次比較清晰,加上注釋,看懂基本沒(méi)有問(wèn)題:
/*** Start up API for bringing up a single instance of the Kafka server.* Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers*/def startup() {try {info("starting")if(isShuttingDown.get)throw new IllegalStateException("Kafka server is still shutting down, cannot re-start!")if(startupComplete.get)returnval canStartup = isStartingUp.compareAndSet(false, true)if (canStartup) {metrics = new Metrics(metricConfig, reporters, kafkaMetricsTime, true)brokerState.newState(Starting)/* start scheduler */kafkaScheduler.startup()/* setup zookeeper */zkUtils = initZk()/* start log manager */logManager = createLogManager(zkUtils.zkClient, brokerState)logManager.startup()/* generate brokerId */config.brokerId = getBrokerIdthis.logIdent = "[Kafka Server " + config.brokerId + "], "socketServer = new SocketServer(config, metrics, kafkaMetricsTime)socketServer.startup()/* start replica manager */replicaManager = new ReplicaManager(config, metrics, time, kafkaMetricsTime, zkUtils, kafkaScheduler, logManager,isShuttingDown) replicaManager.startup()/* start kafka controller */kafkaController = new KafkaController(config, zkUtils, brokerState, kafkaMetricsTime, metrics, threadNamePrefix)kafkaController.startup()/* start kafka coordinator */consumerCoordinator = GroupCoordinator.create(config, zkUtils, replicaManager)consumerCoordinator.startup()/* Get the authorizer and initialize it if one is specified.*/authorizer = Option(config.authorizerClassName).filter(_.nonEmpty).map { authorizerClassName =>val authZ = CoreUtils.createObject[Authorizer](authorizerClassName)authZ.configure(config.originals())authZ}/* start processing requests */apis = new KafkaApis(socketServer.requestChannel, replicaManager, consumerCoordinator,kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer)requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)brokerState.newState(RunningAsBroker)Mx4jLoader.maybeLoad()/* start dynamic config manager */dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager),ConfigType.Client -> new ClientIdConfigHandler(apis.quotaManagers))// Apply all existing client configs to the ClientIdConfigHandler to bootstrap the overrides// TODO: Move this logic to DynamicConfigManager AdminUtils.fetchAllEntityConfigs(zkUtils, ConfigType.Client).foreach {case (clientId, properties) => dynamicConfigHandlers(ConfigType.Client).processConfigChanges(clientId, properties)}// Create the config manager. start listening to notificationsdynamicConfigManager = new DynamicConfigManager(zkUtils, dynamicConfigHandlers) dynamicConfigManager.startup()/* tell everyone we are alive */val listeners = config.advertisedListeners.map {case(protocol, endpoint) =>if (endpoint.port == 0)(protocol, EndPoint(endpoint.host, socketServer.boundPort(protocol), endpoint.protocolType))else(protocol, endpoint)}kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, zkUtils)kafkaHealthcheck.startup()/* register broker metrics */registerStats()shutdownLatch = new CountDownLatch(1)startupComplete.set(true)isStartingUp.set(false)AppInfoParser.registerAppInfo(jmxPrefix, config.brokerId.toString)info("started")}}catch {case e: Throwable =>fatal("Fatal error during KafkaServer startup. Prepare to shutdown", e)isStartingUp.set(false)shutdown()throw e}}2.3.1?KafkaScheduler
KafkaScheduler是一個(gè)基于java.util.concurrent.ScheduledThreadPoolExecutor的scheduler,它內(nèi)部是以前綴kafka-scheduler-xx的線(xiàn)程池處理真正的工作。
注意xx是線(xiàn)程序列號(hào)。
/*** A scheduler based on java.util.concurrent.ScheduledThreadPoolExecutor* * It has a pool of kafka-scheduler- threads that do the actual work.* * @param threads The number of threads in the thread pool* @param threadNamePrefix The name to use for scheduler threads. This prefix will have a number appended to it.* @param daemon If true the scheduler threads will be "daemon" threads and will not block jvm shutdown.*/ @threadsafe class KafkaScheduler(val threads: Int, val threadNamePrefix: String = "kafka-scheduler-", daemon: Boolean = true) extends Scheduler with Logging {private var executor: ScheduledThreadPoolExecutor = nullprivate val schedulerThreadId = new AtomicInteger(0)override def startup() {debug("Initializing task scheduler.")this synchronized {if(isStarted)throw new IllegalStateException("This scheduler has already been started!")executor = new ScheduledThreadPoolExecutor(threads)executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false)executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)executor.setThreadFactory(new ThreadFactory() {def newThread(runnable: Runnable): Thread = Utils.newThread(threadNamePrefix + schedulerThreadId.getAndIncrement(), runnable, daemon)})}}2.3.2 zk初始化
zk初始化主要完成兩件事情:
val zkUtils = ZkUtils(config.zkConnect,config.zkSessionTimeoutMs,config.zkConnectionTimeoutMs,secureAclsEnabled)zkUtils.setupCommonPaths()一個(gè)是連接到zk服務(wù)器;二是創(chuàng)建通用節(jié)點(diǎn)。
通用節(jié)點(diǎn)包括:
// These are persistent ZK paths that should exist on kafka broker startup.val persistentZkPaths = Seq(ConsumersPath,BrokerIdsPath,BrokerTopicsPath,EntityConfigChangesPath,getEntityConfigRootPath(ConfigType.Topic),getEntityConfigRootPath(ConfigType.Client),DeleteTopicsPath,BrokerSequenceIdPath,IsrChangeNotificationPath)2.3.3 日志管理器LogManager
LogManager是kafka的子系統(tǒng),負(fù)責(zé)log的創(chuàng)建,檢索及清理。所有的讀寫(xiě)操作由單個(gè)的日志實(shí)例來(lái)代理。
/*** Start the background threads to flush logs and do log cleanup*/def startup() {/* Schedule the cleanup task to delete old logs */if(scheduler != null) {info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))scheduler.schedule("kafka-log-retention", cleanupLogs, delay = InitialTaskDelayMs, period = retentionCheckMs, TimeUnit.MILLISECONDS)info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))scheduler.schedule("kafka-log-flusher", flushDirtyLogs, delay = InitialTaskDelayMs, period = flushCheckMs, TimeUnit.MILLISECONDS)scheduler.schedule("kafka-recovery-point-checkpoint",checkpointRecoveryPointOffsets,delay = InitialTaskDelayMs,period = flushCheckpointMs,TimeUnit.MILLISECONDS)}if(cleanerConfig.enableCleaner) cleaner.startup()}2.3.4?SocketServer
SocketServer是nio的socket服務(wù)器,線(xiàn)程模型是:1個(gè)Acceptor線(xiàn)程處理新連接,Acceptor還有多個(gè)處理器線(xiàn)程,每個(gè)處理器線(xiàn)程擁有自己的selector和多個(gè)讀socket請(qǐng)求Handler線(xiàn)程。handler線(xiàn)程處理請(qǐng)求并產(chǎn)生響應(yīng)寫(xiě)給處理器線(xiàn)程。
/*** Start the socket server*/def startup() {this.synchronized {connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)val sendBufferSize = config.socketSendBufferBytesval recvBufferSize = config.socketReceiveBufferBytesval maxRequestSize = config.socketRequestMaxBytesval connectionsMaxIdleMs = config.connectionsMaxIdleMsval brokerId = config.brokerIdvar processorBeginIndex = 0endpoints.values.foreach { endpoint =>val protocol = endpoint.protocolTypeval processorEndIndex = processorBeginIndex + numProcessorThreadsfor (i <- processorBeginIndex until processorEndIndex) {processors(i) = new Processor(i,time,maxRequestSize,requestChannel,connectionQuotas,connectionsMaxIdleMs,protocol,config.values,metrics)}val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas)acceptors.put(endpoint, acceptor)Utils.newThread("kafka-socket-acceptor-%s-%d".format(protocol.toString, endpoint.port), acceptor, false).start()acceptor.awaitStartup()processorBeginIndex = processorEndIndex}}newGauge("NetworkProcessorAvgIdlePercent",new Gauge[Double] {def value = allMetricNames.map( metricName =>metrics.metrics().get(metricName).value()).sum / totalProcessorThreads})info("Started " + acceptors.size + " acceptor threads")}?2.3.5 復(fù)制管理器
啟動(dòng)ISR過(guò)期線(xiàn)程
def startup() {// start ISR expiration threadscheduler.schedule("isr-expiration", maybeShrinkIsr, period = config.replicaLagTimeMaxMs, unit = TimeUnit.MILLISECONDS)scheduler.schedule("isr-change-propagation", maybePropagateIsrChanges, period = 2500L, unit = TimeUnit.MILLISECONDS)}2.3.6 kafka控制器
當(dāng)kafka 服務(wù)器的控制器模塊啟動(dòng)時(shí)激活,但并不認(rèn)為當(dāng)前的代理就是控制器。它僅僅注冊(cè)了session過(guò)期監(jiān)聽(tīng)器和啟動(dòng)控制器選主。
def startup() = {inLock(controllerContext.controllerLock) {info("Controller starting up")registerSessionExpirationListener()isRunning = truecontrollerElector.startupinfo("Controller startup complete")}}session過(guò)期監(jiān)聽(tīng)器注冊(cè):
private def registerSessionExpirationListener() = {zkUtils.zkClient.subscribeStateChanges(new SessionExpirationListener())}public void subscribeStateChanges(final IZkStateListener listener) {synchronized (_stateListener) {_stateListener.add(listener);}} class SessionExpirationListener() extends IZkStateListener with Logging {
this.logIdent = "[SessionExpirationListener on " + config.brokerId + "], "
@throws(classOf[Exception])
def handleStateChanged(state: KeeperState) {
// do nothing, since zkclient will do reconnect for us.
}
選主過(guò)程:
def startup {inLock(controllerContext.controllerLock) {controllerContext.zkUtils.zkClient.subscribeDataChanges(electionPath, leaderChangeListener)elect}}def elect: Boolean = {val timestamp = SystemTime.milliseconds.toStringval electString = Json.encode(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp))leaderId = getControllerID /* * We can get here during the initial startup and the handleDeleted ZK callback. Because of the potential race condition, * it's possible that the controller has already been elected when we get here. This check will prevent the following * createEphemeralPath method from getting into an infinite loop if this broker is already the controller.*/if(leaderId != -1) {debug("Broker %d has been elected as leader, so stopping the election process.".format(leaderId))return amILeader}try {val zkCheckedEphemeral = new ZKCheckedEphemeral(electionPath,electString,controllerContext.zkUtils.zkConnection.getZookeeper,JaasUtils.isZkSecurityEnabled())zkCheckedEphemeral.create()info(brokerId + " successfully elected as leader")leaderId = brokerIdonBecomingLeader()} catch {case e: ZkNodeExistsException =>// If someone else has written the path, thenleaderId = getControllerID if (leaderId != -1)debug("Broker %d was elected as leader instead of broker %d".format(leaderId, brokerId))elsewarn("A leader has been elected but just resigned, this will result in another round of election")case e2: Throwable =>error("Error while electing or becoming leader on broker %d".format(brokerId), e2)resign()}amILeader}def amILeader : Boolean = leaderId == brokerId2.3.7?GroupCoordinator
GroupCoordinator處理組成員管理和offset管理,每個(gè)kafka服務(wù)器初始化一個(gè)協(xié)作器來(lái)負(fù)責(zé)一系列組別。每組基于它們的組名來(lái)賦予協(xié)作器。
def startup() {info("Starting up.")heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", brokerId)joinPurgatory = new DelayedOperationPurgatory[DelayedJoin]("Rebalance", brokerId)isActive.set(true)info("Startup complete.")}注意:若同時(shí)需要一個(gè)組鎖和元數(shù)據(jù)鎖,請(qǐng)務(wù)必保證先獲取組鎖,然后獲取元數(shù)據(jù)鎖來(lái)防止死鎖。
2.3.8 KafkaApis消息處理接口
/*** Top-level method that handles all requests and multiplexes to the right api*/def handle(request: RequestChannel.Request) {try{trace("Handling request:%s from connection %s;securityProtocol:%s,principal:%s".format(request.requestObj, request.connectionId, request.securityProtocol, request.session.principal))request.requestId match {case RequestKeys.ProduceKey => handleProducerRequest(request)case RequestKeys.FetchKey => handleFetchRequest(request)case RequestKeys.OffsetsKey => handleOffsetRequest(request)case RequestKeys.MetadataKey => handleTopicMetadataRequest(request)case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request)case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)case RequestKeys.UpdateMetadataKey => handleUpdateMetadataRequest(request)case RequestKeys.ControlledShutdownKey => handleControlledShutdownRequest(request)case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest(request)case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request)case RequestKeys.GroupCoordinatorKey => handleGroupCoordinatorRequest(request)case RequestKeys.JoinGroupKey => handleJoinGroupRequest(request)case RequestKeys.HeartbeatKey => handleHeartbeatRequest(request)case RequestKeys.LeaveGroupKey => handleLeaveGroupRequest(request)case RequestKeys.SyncGroupKey => handleSyncGroupRequest(request)case RequestKeys.DescribeGroupsKey => handleDescribeGroupRequest(request)case RequestKeys.ListGroupsKey => handleListGroupsRequest(request)case requestId => throw new KafkaException("Unknown api code " + requestId)}} catch {case e: Throwable =>if ( request.requestObj != null)request.requestObj.handleError(e, requestChannel, request)else {val response = request.body.getErrorResponse(request.header.apiVersion, e)val respHeader = new ResponseHeader(request.header.correlationId)/* If request doesn't have a default error response, we just close the connection.For example, when produce request has acks set to 0 */if (response == null)requestChannel.closeConnection(request.processor, request)elserequestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, respHeader, response)))}error("error when handling request %s".format(request.requestObj), e)} finallyrequest.apiLocalCompleteTimeMs = SystemTime.milliseconds}我們以處理消費(fèi)者請(qǐng)求為例:
/*** Handle a produce request*/def handleProducerRequest(request: RequestChannel.Request) {val produceRequest = request.requestObj.asInstanceOf[ProducerRequest]val numBytesAppended = produceRequest.sizeInBytesval (authorizedRequestInfo, unauthorizedRequestInfo) = produceRequest.data.partition {case (topicAndPartition, _) => authorize(request.session, Write, new Resource(Topic, topicAndPartition.topic))}// the callback for sending a produce response def sendResponseCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) {val mergedResponseStatus = responseStatus ++ unauthorizedRequestInfo.mapValues(_ => ProducerResponseStatus(ErrorMapping.TopicAuthorizationCode, -1))var errorInResponse = falsemergedResponseStatus.foreach { case (topicAndPartition, status) =>if (status.error != ErrorMapping.NoError) {errorInResponse = truedebug("Produce request with correlation id %d from client %s on partition %s failed due to %s".format(produceRequest.correlationId,produceRequest.clientId,topicAndPartition,ErrorMapping.exceptionNameFor(status.error)))}}def produceResponseCallback(delayTimeMs: Int) {if (produceRequest.requiredAcks == 0) {// no operation needed if producer request.required.acks = 0; however, if there is any error in handling// the request, since no response is expected by the producer, the server will close socket server so that// the producer client will know that some error has happened and will refresh its metadataif (errorInResponse) {val exceptionsSummary = mergedResponseStatus.map { case (topicAndPartition, status) =>topicAndPartition -> ErrorMapping.exceptionNameFor(status.error)}.mkString(", ")info(s"Closing connection due to error during produce request with correlation id ${produceRequest.correlationId} " +s"from client id ${produceRequest.clientId} with ack=0\n" +s"Topic and partition to exceptions: $exceptionsSummary")requestChannel.closeConnection(request.processor, request)} else {requestChannel.noOperation(request.processor, request)}} else {val response = ProducerResponse(produceRequest.correlationId,mergedResponseStatus,produceRequest.versionId,delayTimeMs)requestChannel.sendResponse(new RequestChannel.Response(request,new RequestOrResponseSend(request.connectionId,response)))}}// When this callback is triggered, the remote API call has completedrequest.apiRemoteCompleteTimeMs = SystemTime.millisecondsquotaManagers(RequestKeys.ProduceKey).recordAndMaybeThrottle(produceRequest.clientId,numBytesAppended,produceResponseCallback)}if (authorizedRequestInfo.isEmpty)sendResponseCallback(Map.empty)else {val internalTopicsAllowed = produceRequest.clientId == AdminUtils.AdminClientId// call the replica manager to append messages to the replicas replicaManager.appendMessages(produceRequest.ackTimeoutMs.toLong,produceRequest.requiredAcks,internalTopicsAllowed,authorizedRequestInfo,sendResponseCallback)// if the request is put into the purgatory, it will have a held reference// and hence cannot be garbage collected; hence we clear its data here in// order to let GC re-claim its memory since it is already appended to log produceRequest.emptyData()}}對(duì)應(yīng)kafka producer的acks配置:
The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. The following settings are common: acks=0 If set to zero then the producer will not wait for any acknowledgment from the server at all. The record will be immediately added to the socket buffer and considered sent. No guarantee can be made that the server has received the record in this case, and the retries configuration will not take effect (as the client won't generally know of any failures). The offset given back for each record will always be set to -1. acks=1 This will mean the leader will write the record to its local log but will respond without awaiting full acknowledgement from all followers. In this case should the leader fail immediately after acknowledging the record but before the followers have replicated it then the record will be lost. acks=all This means the leader will wait for the full set of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee.?2.3.9 動(dòng)態(tài)配置管理DynamicConfigManager
利用zookeeper做動(dòng)態(tài)配置中心
/*** Begin watching for config changes*/def startup() {zkUtils.makeSurePersistentPathExists(ZkUtils.EntityConfigChangesPath)zkUtils.zkClient.subscribeChildChanges(ZkUtils.EntityConfigChangesPath, ConfigChangeListener)processAllConfigChanges()}/*** Process all config changes*/private def processAllConfigChanges() {val configChanges = zkUtils.zkClient.getChildren(ZkUtils.EntityConfigChangesPath)import JavaConversions._processConfigChanges((configChanges: mutable.Buffer[String]).sorted)}/*** Process the given list of config changes*/private def processConfigChanges(notifications: Seq[String]) {if (notifications.size > 0) {info("Processing config change notification(s)...")val now = time.millisecondsfor (notification <- notifications) {val changeId = changeNumber(notification)if (changeId > lastExecutedChange) {val changeZnode = ZkUtils.EntityConfigChangesPath + "/" + notificationval (jsonOpt, stat) = zkUtils.readDataMaybeNull(changeZnode)processNotification(jsonOpt)}lastExecutedChange = changeId}purgeObsoleteNotifications(now, notifications)}}2.3.10 心跳檢測(cè)KafkaHealthcheck
心跳檢測(cè)也使用zookeeper維持:
def startup() {zkUtils.zkClient.subscribeStateChanges(sessionExpireListener)register()}/*** Register this broker as "alive" in zookeeper*/def register() {val jmxPort = System.getProperty("com.sun.management.jmxremote.port", "-1").toIntval updatedEndpoints = advertisedEndpoints.mapValues(endpoint =>if (endpoint.host == null || endpoint.host.trim.isEmpty)EndPoint(InetAddress.getLocalHost.getCanonicalHostName, endpoint.port, endpoint.protocolType)elseendpoint)// the default host and port are here for compatibility with older client// only PLAINTEXT is supported as default// if the broker doesn't listen on PLAINTEXT protocol, an empty endpoint will be registered and older clients will be unable to connectval plaintextEndpoint = updatedEndpoints.getOrElse(SecurityProtocol.PLAINTEXT, new EndPoint(null,-1,null))zkUtils.registerBrokerInZk(brokerId, plaintextEndpoint.host, plaintextEndpoint.port, updatedEndpoints, jmxPort)}?
3. 小結(jié)
kafka中KafkaServer類(lèi),采用門(mén)面模式,是網(wǎng)絡(luò)處理,io處理等得入口.
ReplicaManager ? ?副本管理
KafkaApis ? ?處理所有request的Proxy類(lèi),根據(jù)requestKey決定調(diào)?用具體的handler
KafkaRequestHandlerPool?處理request的線(xiàn)程池,請(qǐng)求處理池 ?<--?num.io.threads io線(xiàn)程數(shù)量
LogManager ? ?kafka文件存儲(chǔ)系統(tǒng)管理,負(fù)責(zé)處理和存儲(chǔ)所有Kafka的topic的partiton數(shù)據(jù)
TopicConfigManager ?監(jiān)聽(tīng)此zk節(jié)點(diǎn)的?子節(jié)點(diǎn)/config/changes/,通過(guò)LogManager更新topic的配置信息,topic粒度配置管理,具體請(qǐng)查看topic級(jí)別配置
KafkaHealthcheck 監(jiān)聽(tīng)zk session expire,在zk上創(chuàng)建broker信息,便于其他broker和consumer獲取其信息
KafkaController ?kafka集群中央控制器選舉,leader選舉,副本分配。
KafkaScheduler ?負(fù)責(zé)副本管理和日志管理調(diào)度等等
ZkClient ? ? ? ? 負(fù)責(zé)注冊(cè)zk相關(guān)信息.
BrokerTopicStats ?topic信息統(tǒng)計(jì)和監(jiān)控
ControllerStats ? ? ? ? ?中央控制器統(tǒng)計(jì)和監(jiān)控
?
參考文獻(xiàn)
【1】https://zqhxuyuan1.gitbooks.io/kafka/content/chapter1-intro.html
【2】http://blog.csdn.net/lizhitao/article/details/37911993
?
轉(zhuǎn)載于:https://www.cnblogs.com/davidwang456/p/5173486.html
總結(jié)
以上是生活随笔為你收集整理的kafka源码分析之一server启动分析的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: An In-Depth Look at
- 下一篇: gcview使用