Kafka Source Code Analysis, Part 2: Client Analysis


There are two kinds of clients: the producer and the consumer.

1. Producer

First, let's look at the producer's constructor:

private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
    try {
        log.trace("Starting the Kafka producer");
        Map<String, Object> userProvidedConfigs = config.originals();
        this.producerConfig = config;
        this.time = new SystemTime();

        MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
                .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS);
        clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
        if (clientId.length() <= 0)
            clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
        List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
                MetricsReporter.class);
        reporters.add(new JmxReporter(JMX_PREFIX));
        this.metrics = new Metrics(metricConfig, reporters, time);
        this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
        long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
        this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG));
        this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
        this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
        this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
        /* check for user defined settings.
         * If the BLOCK_ON_BUFFER_FULL is set to true, we do not honor METADATA_FETCH_TIMEOUT_CONFIG.
         * This should be removed with release 0.9 when the deprecated configs are removed.
         */
        if (userProvidedConfigs.containsKey(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG)) {
            log.warn(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG + " config is deprecated and will be removed soon. " +
                    "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
            boolean blockOnBufferFull = config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG);
            if (blockOnBufferFull) {
                this.maxBlockTimeMs = Long.MAX_VALUE;
            } else if (userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) {
                log.warn(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. " +
                        "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
                this.maxBlockTimeMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
            } else {
                this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
            }
        } else if (userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) {
            log.warn(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. " +
                    "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
            this.maxBlockTimeMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
        } else {
            this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
        }
        /* check for user defined settings.
         * If the TIME_OUT config is set use that for request timeout.
         * This should be removed with release 0.9
         */
        if (userProvidedConfigs.containsKey(ProducerConfig.TIMEOUT_CONFIG)) {
            log.warn(ProducerConfig.TIMEOUT_CONFIG + " config is deprecated and will be removed soon. Please use " +
                    ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
            this.requestTimeoutMs = config.getInt(ProducerConfig.TIMEOUT_CONFIG);
        } else {
            this.requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
        }

        Map<String, String> metricTags = new LinkedHashMap<String, String>();
        metricTags.put("client-id", clientId);
        this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
                this.totalMemorySize,
                this.compressionType,
                config.getLong(ProducerConfig.LINGER_MS_CONFIG),
                retryBackoffMs,
                metrics,
                time,
                metricTags);
        List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
        this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds());
        ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
        NetworkClient client = new NetworkClient(
                new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", metricTags, channelBuilder),
                this.metadata,
                clientId,
                config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
                config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
                config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
                config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
                this.requestTimeoutMs, time);
        this.sender = new Sender(client,
                this.metadata,
                this.accumulator,
                config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
                (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),
                config.getInt(ProducerConfig.RETRIES_CONFIG),
                this.metrics,
                new SystemTime(),
                clientId,
                this.requestTimeoutMs);
        String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "");
        this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
        this.ioThread.start();

        this.errors = this.metrics.sensor("errors");

        if (keySerializer == null) {
            this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    Serializer.class);
            this.keySerializer.configure(config.originals(), true);
        } else {
            config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
            this.keySerializer = keySerializer;
        }
        if (valueSerializer == null) {
            this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    Serializer.class);
            this.valueSerializer.configure(config.originals(), false);
        } else {
            config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
            this.valueSerializer = valueSerializer;
        }
        config.logUnused();
        AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);
        log.debug("Kafka producer started");
    } catch (Throwable t) {
        // call close methods if internal objects are already constructed
        // this is to prevent resource leak. see KAFKA-2121
        close(0, TimeUnit.MILLISECONDS, true);
        // now propagate the exception
        throw new KafkaException("Failed to construct kafka producer", t);
    }
}

Most of this code just reads and validates configuration; the essential part is at the end, where the constructor builds the RecordAccumulator and the NetworkClient, wraps them in a Sender, and starts the I/O thread that runs it.

That I/O thread executes the Sender; each iteration calls the Sender's run(long now) method:

/**
 * Run a single iteration of sending
 *
 * @param now The current POSIX time in milliseconds
 */
public void run(long now) {
    Cluster cluster = metadata.fetch();
    // get the list of partitions with data ready to send
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

    // if there are any partitions whose leaders are not known yet, force metadata update
    if (result.unknownLeadersExist)
        this.metadata.requestUpdate();

    // remove any nodes we aren't ready to send to
    Iterator<Node> iter = result.readyNodes.iterator();
    long notReadyTimeout = Long.MAX_VALUE;
    while (iter.hasNext()) {
        Node node = iter.next();
        if (!this.client.ready(node, now)) {
            iter.remove();
            notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
        }
    }

    // create produce requests
    Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster,
                                                                     result.readyNodes,
                                                                     this.maxRequestSize,
                                                                     now);

    List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, cluster, now);
    // update sensors
    for (RecordBatch expiredBatch : expiredBatches)
        this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);

    sensors.updateProduceRequestMetrics(batches);
    List<ClientRequest> requests = createProduceRequests(batches, now);
    // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
    // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
    // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
    // with sendable data that aren't ready to send since they would cause busy looping.
    long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
    if (result.readyNodes.size() > 0) {
        log.trace("Nodes with data ready to send: {}", result.readyNodes);
        log.trace("Created {} produce requests: {}", requests.size(), requests);
        pollTimeout = 0;
    }
    for (ClientRequest request : requests)
        client.send(request, now);

    // if some partitions are already ready to be sent, the select time would be 0;
    // otherwise if some partition already has some data accumulated but not ready yet,
    // the select time will be the time difference between now and its linger expiry time;
    // otherwise the select time will be the time difference between now and the metadata expiry time;
    this.client.poll(pollTimeout, now);
}
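This run(long now) is a single iteration: check which partitions have sendable batches, drop nodes whose connections aren't ready, drain batches into per-node produce requests, and hand them to the NetworkClient. A simplified sketch of the outer loop that drives it (a paraphrase, not the verbatim source):

// Simplified sketch of the Sender's outer Runnable.run(): the KafkaThread
// started in the constructor executes this, repeatedly calling the
// single-iteration run(long now) shown above until close() is requested.
public void run() {
    log.debug("Starting Kafka producer I/O thread.");
    // main loop: runs until close() flips the running flag
    while (running) {
        try {
            run(time.milliseconds()); // one ready -> drain -> send -> poll iteration
        } catch (Exception e) {
            // an uncaught error must not kill the I/O thread
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }
    // on shutdown the real implementation also drains remaining in-flight batches
}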

For each created request, the Sender calls NetworkClient's send method:

/**
 * Queue up the given request for sending. Requests can only be sent out to ready nodes.
 *
 * @param request The request
 * @param now The current timestamp
 */
@Override
public void send(ClientRequest request, long now) {
    String nodeId = request.request().destination();
    if (!canSendRequest(nodeId))
        throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
    doSend(request, now);
}

private void doSend(ClientRequest request, long now) {
    request.setSendTimeMs(now);
    this.inFlightRequests.add(request);
    selector.send(request.request());
}
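Note the bookkeeping in doSend: the request is recorded in inFlightRequests before being handed to the Selector, which is what enforces max.in.flight.requests.per.connection. A minimal sketch of that idea (a hypothetical stand-in, not Kafka's actual InFlightRequests class):

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Simplified per-node in-flight request tracking: a request is enqueued when
// sent and dequeued when its response arrives; a node can accept another
// request only while its queue has room.
public class InFlightTracker {
    private final int maxInFlightPerNode; // cf. max.in.flight.requests.per.connection
    private final Map<String, Deque<String>> inFlight = new HashMap<>();

    public InFlightTracker(int maxInFlightPerNode) {
        this.maxInFlightPerNode = maxInFlightPerNode;
    }

    // Analogous to the canSendRequest(nodeId) check in send() above.
    public boolean canSendMore(String nodeId) {
        Deque<String> queue = inFlight.get(nodeId);
        return queue == null || queue.size() < maxInFlightPerNode;
    }

    // Analogous to inFlightRequests.add(request) in doSend().
    public void add(String nodeId, String requestId) {
        inFlight.computeIfAbsent(nodeId, k -> new ArrayDeque<>()).addFirst(requestId);
    }

    // Responses on one connection come back in order, so complete the oldest.
    public String completeNext(String nodeId) {
        return inFlight.get(nodeId).pollLast();
    }
}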

The Selector looks up the destination's KafkaChannel and queues the send on it:

/**
 * Queue the given request for sending in the subsequent {@link #poll(long)} calls
 * @param send The request to send
 */
public void send(Send send) {
    KafkaChannel channel = channelOrFail(send.destination());
    try {
        channel.setSend(send);
    } catch (CancelledKeyException e) {
        this.failedSends.add(send.destination());
        close(channel);
    }
}

This calls the channel's setSend method:

public void setSend(Send send) {
    if (this.send != null)
        throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
    this.send = send;
    this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}

Here the TransportLayer encapsulates the details of the underlying communication (there are plaintext and SSL implementations). Note that setSend only stages the Send and registers OP_WRITE interest; the actual write happens later, in the Selector's poll loop, once the channel becomes writable.
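To make that concrete, here is a minimal NIO sketch of the pattern the plaintext transport wraps (an assumed simplification, not Kafka's actual PlaintextTransportLayer): register OP_WRITE interest when a send is staged, write when the selector reports the channel writable, and clear the interest once the buffer is drained.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;

// Hypothetical simplification of a plaintext transport layer over java.nio.
public class PlaintextTransportSketch {
    private final SocketChannel socketChannel;
    private final SelectionKey key;

    public PlaintextTransportSketch(SocketChannel socketChannel, SelectionKey key) {
        this.socketChannel = socketChannel;
        this.key = key;
    }

    // The equivalent of addInterestOps(SelectionKey.OP_WRITE) in setSend() above.
    public void addWriteInterest() {
        key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
    }

    // Called from the poll loop once the key is writable; returns bytes written.
    public int write(ByteBuffer buffer) throws IOException {
        int written = socketChannel.write(buffer);
        if (!buffer.hasRemaining()) {
            // Send complete: stop asking the selector for write readiness.
            key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
        }
        return written;
    }
}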


2. Consumer
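The consumer client is driven by the same NetworkClient and Selector machinery traced above. As a starting point, a minimal usage sketch of the public API (broker address, topic, and group id are placeholders):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("group.id", "example-group");           // placeholder group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test-topic")); // placeholder topic
            while (true) {
                // poll() drives the consumer's network loop, mirroring the
                // producer-side Selector machinery shown above
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records)
                    System.out.printf("offset=%d key=%s value=%s%n",
                            record.offset(), record.key(), record.value());
            }
        }
    }
}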

Reposted from: https://www.cnblogs.com/davidwang456/p/5189414.html
