日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Kafka解析之topic创建(3)——合法性验证

發布時間:2024/4/11 编程问答 37 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Kafka解析之topic创建(3)——合法性验证 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

歡迎支持筆者新作:《深入理解Kafka:核心設計與實踐原理》和《RabbitMQ實戰指南》,同時歡迎關注筆者的微信公眾號:朱小廝的博客。

歡迎跳轉到本文的原文鏈接:https://honeypps.com/mq/kafka-analysis-of-topic-creation-3-validity-verification/


前文摘要

在《Kafka解析之Topic創建(1)》這篇文章中,我們講述了創建Topic的方式有兩種:

  • 如果kafka broker中的config/server.properties配置文件中配置了auto.create.topics.enable參數為true(默認值就是true),那么當生產者向一個尚未創建的topic發送消息時,會自動創建一個num.partitions(默認值為1)個分區和default.replication.factor(默認值為1)個副本的對應topic。
  • 通過kafka提供的kafka-topics.sh腳本來創建,或者相關的變種方式(比如在zookeeper中的/config/topics/路徑下直接創建)。
  • 在學習了KafkaAdminClient之后我們發現它也可以用來創建Topic,即通過發送CreateTopicsRequest請求的方式來創建。KafkaAdminClient的詳細內容可以參考:《集群管理工具KafkaAdminClient——原理與示例》和《集群管理工具KafkaAdminClient——改造》。


    一般情況下,Kafka生產環境中的 auto.create.topics.enable參數會被修改為false,即自動創建Topic這條路會被堵住。kafka-topics.sh腳本創建的方式一般由運維人員操作,普通用戶無權過問。那么KafkaAdminClient就為普通用戶提供了一個口子,或者將其集成到公司內部的資源申請、審核系統中更加的方便。普通用戶在創建Topic的時候,有可能由于誤操作或者其他原因而創建了不符合運維規范的Topic,比如命名不規范,副本因子數太低等,這些都會影響后期的系統運維。如果創建Topic的操作是封裝在資源申請、審核系統中的話,那么可以在前端就可以根據規則過濾掉不符合規范的申請操作。然而如果用戶就是用了KafkaAdminClient或者類似的工具來創建了一個錯誤的Topic,我們有什么辦法可以做相應的規范處理呢?

    在Kafka服務端中提供了這樣一個參數:create.topic.policy.class.name,其提供了一個入口用來驗證Topic創建的合法性。使用方式是自定義實現org.apache.kafka.server.policy.CreateTopicPolicy接口,比如下面的PolicyDemo,然后在kafka broker中的config/server.properties配置文件中配置參數create.topic.policy.class.name=org.apache.kafka.server.policy.PolicyDemo,然后啟動Kafka服務即可。PolicyDemo的代碼參考如下,主要實現接口中的configure、close以及validate方法,configure方法會在Kafka服務啟動的時候執行,validate方法用來鑒定Topic參數的合法性,其在創建Topic的時候執行,close方法在關閉Kafka服務的時候執行。

    public class PolicyDemo implements CreateTopicPolicy{public void configure(Map<String, ?> configs) {}public void close() throws Exception {}public void validate(RequestMetadata requestMetadata)throws PolicyViolationException {if(requestMetadata.numPartitions()!=null || requestMetadata.replicationFactor()!=null){if(requestMetadata.numPartitions()< 5){throw new PolicyViolationException("Topic should have at least 5 partitions, received: "+ requestMetadata.numPartitions());}if(requestMetadata.replicationFactor()<= 1){throw new PolicyViolationException("Topic should have at least 2 replication factor, recevied: "+ requestMetadata.replicationFactor());}}}}

    采用文章《集群管理工具KafkaAdminClient——原理與示例》中的所提及的關于KafkaAdminClient來創建Topic,測試代碼如下,創建一個分區數為4,副本數為1的Topic:

    @Test public void createTopics() {NewTopic newTopic = new NewTopic(NEW_TOPIC,4, (short) 1);Collection<NewTopic> newTopicList = new ArrayList<>();newTopicList.add(newTopic);CreateTopicsResult result = adminClient.createTopics(newTopicList);try {result.all().get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();} }

    測試結果如期報錯:

    java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.PolicyViolationException: Topic should have at least 5 partitions, received: 4

    相應的Kafka服務端的日志如下:

    CreateTopicPolicy.RequestMetadata(topic=topic-test2, numPartitions=4, replicationFactor=1, replicasAssignments=null, configs={}) [2018-04-18 19:52:02,747] INFO [Admin Manager on Broker 0]: Error processing create topic request for topic topic-test2 with arguments (numPartitions=4, replicationFactor=1, replicasAssignments={}, configs={}) (kafka.server.AdminManager) org.apache.kafka.common.errors.PolicyViolationException: Topic should have at least 5 partitions, received: 4

    客戶端向Kafka服務端發送了CreateTopicsRequest請求之后,會經過KafkaApis:

    case ApiKeys.CREATE_TOPICS => handleCreateTopicsRequest(request)

    然后調用handleCreateTopicsRequest()方法,Topic最終在服務端的創建是在AdminManager中的createTopics方法中實現的。而CreateTopicPolicy的作用域也限定在這個createTopics方法之內,故只有通過CreateTopicsRequest請求的方式才能促使CreateTopicPolicy有效,而對于類似于kafka-topics.sh腳本的創建方式無效。不過在正文開頭就提及了在運維規范的情況下,一般是通過KafkaAdminClient進行操作,或者更加規范的話直接通過申請頁面來創建,這樣就可以在前端規避風險,這樣顯得更加的專業。

    歡迎跳轉到本文的原文鏈接:https://honeypps.com/mq/kafka-analysis-of-topic-creation-3-validity-verification/


    歡迎支持筆者新作:《深入理解Kafka:核心設計與實踐原理》和《RabbitMQ實戰指南》,同時歡迎關注筆者的微信公眾號:朱小廝的博客。


    超強干貨來襲 云風專訪:近40年碼齡,通宵達旦的技術人生

    總結

    以上是生活随笔為你收集整理的Kafka解析之topic创建(3)——合法性验证的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。