日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

集群管理工具KafkaAdminClient——原理与示例

發布時間:2024/4/11 编程问答 54 豆豆
生活随笔 收集整理的這篇文章主要介紹了 集群管理工具KafkaAdminClient——原理与示例 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

歡迎支持筆者新作:《深入理解Kafka:核心設計與實踐原理》和《RabbitMQ實戰指南》,同時歡迎關注筆者的微信公眾號:朱小廝的博客。

歡迎跳轉到本文的原文鏈接:https://honeypps.com/mq/kafka-admin-client-1-principles-and-demos/


前言

一般情況下,我們都習慣使用Kafka中bin目錄下的腳本工具來管理查看Kafka,但是有些時候需要將某些管理查看的功能集成到系統(比如Kafka Manager)中,那么就需要調用一些API來直接操作Kafka了。在Kafka0.11.0.0版本之前,可以通過kafka-core包(Kafka的服務端代碼,采用Scala編寫)下的AdminClient和AdminUtils來實現部分的集群管理操作,比如筆者之前在Kafka解析之topic創建(1)和Kafka解析之topic創建(2)兩篇文章中所講解的Topic的創建就用到了AdminUtils類。而在Kafka0.11.0.0版本之后,又多了一個AdminClient,這個是在kafka-client包下的,這是一個抽象類,具體的實現是org.apache.kafka.clients.admin.KafkaAdminClient,這個就是本文所要陳述的重點了。

功能與原理介紹

在Kafka官網中這么描述AdminClient:The AdminClient API supports managing and inspecting topics, brokers, acls, and other Kafka objects. 具體的KafkaAdminClient包含了一下幾種功能(以Kafka1.0.0版本為準):

  • 創建Topic:createTopics(Collection<NewTopic> newTopics)
  • 刪除Topic:deleteTopics(Collection<String> topics)
  • 羅列所有Topic:listTopics()
  • 查詢Topic:describeTopics(Collection<String> topicNames)
  • 查詢集群信息:describeCluster()
  • 查詢ACL信息:describeAcls(AclBindingFilter filter)
  • 創建ACL信息:createAcls(Collection<AclBinding> acls)
  • 刪除ACL信息:deleteAcls(Collection<AclBindingFilter> filters)
  • 查詢配置信息:describeConfigs(Collection<ConfigResource> resources)
  • 修改配置信息:alterConfigs(Map<ConfigResource, Config> configs)
  • 修改副本的日志目錄:alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment)
  • 查詢節點的日志目錄信息:describeLogDirs(Collection<Integer> brokers)
  • 查詢副本的日志目錄信息:describeReplicaLogDirs(Collection<TopicPartitionReplica> replicas)
  • 增加分區:createPartitions(Map<String, NewPartitions> newPartitions)
  • 其內部原理是使用Kafka自定義的一套二進制協議來實現,詳細可以參見Kafka協議。主要實現步驟:

  • 客戶端根據方法的調用創建相應的協議請求,比如創建Topic的createTopics方法,其內部就是發送CreateTopicRequest請求。
  • 客戶端發送請求至Kafka Broker。
  • Kafka Broker處理相應的請求并回執,比如與CreateTopicRequest對應的是CreateTopicResponse。
  • 客戶端接收相應的回執并進行解析處理。
    和協議有關的請求和回執的類基本都在org.apache.kafka.common.requests包中,AbstractRequest和AbstractResponse是這些請求和回執類的兩個基本父類。
  • 示例

    下面就以創建Topic來舉一個簡單的KafkaAdminClient的使用案例,【代碼清單1】:

    private static final String NEW_TOPIC = "topic-test2"; private static final String brokerUrl = "localhost:9092";private static AdminClient adminClient;@BeforeClass public static void beforeClass(){Properties properties = new Properties();properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);adminClient = AdminClient.create(properties); }@AfterClass public static void afterClass(){adminClient.close(); }@Test public void createTopics() {NewTopic newTopic = new NewTopic(NEW_TOPIC,4, (short) 1);Collection<NewTopic> newTopicList = new ArrayList<>();newTopicList.add(newTopic);adminClient.createTopics(newTopicList); }

    示例中的createTopics()方法就創建了一個分區數為4,副本因子為1的“topic-test2”的Topic。

    代碼剖析

    下面來詳細介紹一下KafkaAdminClient中現有的listTopics()方法(這個方法的實現相對干凈利落,代碼量少、易于講解)的實現方式,以便可以了解KafkaAdminClient中的大體脈絡。listTopics()方法的具體代碼如【代碼清單2】所示:

    public ListTopicsResult listTopics(final ListTopicsOptions options) {final KafkaFutureImpl<Map<String, TopicListing>> topicListingFuture = new KafkaFutureImpl<>();final long now = time.milliseconds();runnable.call(new Call("listTopics", calcDeadlineMs(now, options.timeoutMs()),new LeastLoadedNodeProvider()) {@OverrideAbstractRequest.Builder createRequest(int timeoutMs) {return MetadataRequest.Builder.allTopics();}@Overridevoid handleResponse(AbstractResponse abstractResponse) {MetadataResponse response = (MetadataResponse) abstractResponse;Cluster cluster = response.cluster();Map<String, TopicListing> topicListing = new HashMap<>();for (String topicName : cluster.topics()) {boolean internal = cluster.internalTopics().contains(topicName);if (!internal || options.shouldListInternal())topicListing.put(topicName, new TopicListing(topicName, internal));}topicListingFuture.complete(topicListing);}@Overridevoid handleFailure(Throwable throwable) {topicListingFuture.completeExceptionally(throwable);}}, now);return new ListTopicsResult(topicListingFuture); }

    listTopics()方法接收一個ListTopicsOptions類型的參數,KafkaAdminClient中基本所有的應用類方法都有一個類似XXXOptions類型的參數,這個類型一般只包含timeoutMs這個成員變量,用來設定請求的超時時間,如果沒有指定則使用默認的request.timeout.ms參數值,即30000ms。就拿查詢Topic信息所對應的DescribeTopicsOptions來說,其就包含一個timeoutMs參數,具體如【代碼清單3】所示:

    public class DescribeTopicsOptions extends AbstractOptions<DescribeTopicsOptions> {} public abstract class AbstractOptions<T extends AbstractOptions> {private Integer timeoutMs = null;@SuppressWarnings("unchecked")public T timeoutMs(Integer timeoutMs) {this.timeoutMs = timeoutMs;return (T) this;}public Integer timeoutMs() {return timeoutMs;} }

    不過ListTopicsOptions擴展了一個成員變量listInternal,用來指明是否需要羅列內部Topic,比如在Kafka解析之topic創建(1)中提及的“__consumer_offsets”和“transaction_state”就是兩個內部Topic。ListTopicsOptions的代碼如【代碼清單4】所示:

    public class ListTopicsOptions extends AbstractOptions<ListTopicsOptions> {private boolean listInternal = false;public ListTopicsOptions listInternal(boolean listInternal) {this.listInternal = listInternal;return this;}public boolean shouldListInternal() {return listInternal;} }

    listInternal的值默認為false,如果同時要羅列出目前的內部Topic的話就需要將這個listInternal設置為true,示例代碼如【代碼清單5】所示:

    @Test public void listTopicsIncludeInternal() throws ExecutionException, InterruptedException {ListTopicsOptions listTopicsOptions = new ListTopicsOptions();listTopicsOptions.listInternal(true);ListTopicsResult result = adminClient.listTopics(listTopicsOptions);Collection<TopicListing> list = result.listings().get();System.out.println(list); }

    接下去繼續講解listTopics()方法,其返回值為ListTopicResult類型。與ListTopicsOptions對應,KafkaAdminClient中基本所有的應用類方法都有一個類似XXXResult類型的返回值,其內部一般包含一個KafkaFuture,用于異步發送請求之后等待操作結果。KafkaFuture實現了Java中的Future接口,用來支持鏈式調用以及其他異步編程模型,可以看成是Java8中CompletableFuture的一個小型版本,其中也有類似thenApply、complete、completeExceptionally的方法。

    再來看代碼清單2中的 runnable.call(new Call(“listTopics”, calcDeadlineMs(now, options.timeoutMs()),new LeastLoadedNodeProvider()) 這行代碼,runnable的類型是AdminClientRunnable,其是KafkaAdminClient負責處理與服務端交互請求的服務線程。AdminClientRunnable中的call方法用作入隊一個Call請求,進而對其處理。Call請求代表與服務端的一次請求交互,比如listTopics和createTopics都是一次Call請求,AdminClientRunnable線程負責處理這些Call請求。

    Call類是一個抽象類,構造方法接收三個參數:本次請求的名稱callName、超時時間deadlineMs、以及節點提供器nodeProvider。nodeProvider是NodeProvider類型,用來提供本次請求所交互的Broker節點。Call類中還有3個抽象方法:createRequest()、handleResponse()、handleFailure(),分別用來創建請求、處理回執和處理失敗。在代碼清單2中,對于listTopics()方法而言,其內部原理就是發送MetadataRequest請求然后處理MetadataResponse,其處理邏輯峰封裝在createRequest()、handleResponse()、handleFailure()這三個方法之中了。

    綜上,如果要自定義實現一個功能,只需要三個步驟:

  • 自定義XXXOptions;
  • 自定義XXXResult返回值;
  • 自定義Call,然后挑選合適的XXXRequest和XXXResponse來實現Call類中的3個抽象方法。
  • KafkaAdminClient目前而言尚未形成一個完全體,里面還可以擴展很多功能,就拿上一篇文章《如何獲取Kafka的消費者詳情——從Scala到Java的切換》中介紹的而言,目前KafkaAdminClient尚未實現describeConsumerGroup和listGroupOffsets的功能,所以需要進一步的升級改造。篇幅限制,這部分內容將在下一篇文章進行介紹,如果想要先睹為快,可以參考下代碼實現,詳細的邏輯解析敬請期待….

    歡迎跳轉到本文的原文鏈接:https://honeypps.com/mq/kafka-admin-client-1-principles-and-demos/


    歡迎支持筆者新作:《深入理解Kafka:核心設計與實踐原理》和《RabbitMQ實戰指南》,同時歡迎關注筆者的微信公眾號:朱小廝的博客。


    總結

    以上是生活随笔為你收集整理的集群管理工具KafkaAdminClient——原理与示例的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。