日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

RocketMQ:NameServer架构设计以及启动关闭流程源码分析

發布時間:2025/3/21 编程问答 26 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RocketMQ:NameServer架构设计以及启动关闭流程源码分析 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

文章目錄

    • NameServer
      • 1.架構設計
      • 2.核心類與配置
      • 3.啟動與關閉流程
        • 3.1.步驟一
        • 3.2.步驟二
        • 3.3.步驟三

NameServer

1.架構設計

消息中間件的設計思路一般都是基于主題訂閱與發布的機制,RocketMQ也不例外。RocketMQ中,消息生產者(Producer)發送某主題的消息到消息服務器,消息服務器對消息進行持久化存儲,而消息消費者(Consumer)訂閱所需要的主題,消息服務器根據訂閱信息(路由信息)將消息推送至消息消費者(Push模式)或者消息消費者主動向消息服務器進行拉取(Pull模式),從而實現消息生產者與消息消費者之間解耦。

為了避免消息服務器單點故障而導致的系統癱瘓,消息服務器常常會集群分布,部署多臺服務器共同處理消息并且承擔消息的存儲,消息生產者如何知道要將消息發送至哪臺服務器和消息消費者如何知道要從哪臺消息服務器進行消息的拉取等等問題,都要由NameServer來處理,其實NameServer充當的角色與Zookeeper十分相似。

Broker消息服務器啟動時,需要向NameServer集群進行信息注冊,消息生產者Producer發送消息之前主動向NameServer獲取Broker服務器地址列表,然后根據負載均衡算法從列表中選出一臺服務器進行消息的發送。NameServer與每臺Broker保持長連接,并每隔30s對Broker存活狀態進行檢測,如果檢測到Broker宕機并且長時間沒有進行連接重試,則會將該Broker從路由注冊表中刪除,以此保證Broker集群的高可用,但是路由變化不會立馬對生產者進行通知,需要Producer一段時間之后重新向NameServer進行獲取并更新路由信息。這也是NameServer與Zookeeper的不同,NameServer這樣的設計降低了整個NameServer實現的復雜度,整個NameServer代碼實現不超過一千行,簡單而高效!

以下是NameServer整個項目預覽:

可以看到NameServer主要有以下幾個作用:

  • 配置信息管理
  • 請求處理
  • 路由信息管理

2.核心類與配置

NamesrvController

NameserController 是 NameServer 模塊的核心控制類。

private final NamesrvConfig namesrvConfig;//主要指定 nameserver 的相關配置屬性 private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("NSScheduledThread"));//NameServer定時任務執行線程池-->每隔10s掃描broker,對存活的Broker信息進行維護并且打印KVConfig private final KVConfigManager kvConfigManager;//讀取或變更NameServer的配置屬性,加載 NamesrvConfig中配置到內存 private final RouteInfoManager routeInfoManager;//NameServer 數據的載體,記錄 Broker、Topic 等信息。private final NettyServerConfig nettyServerConfig;//與網絡通訊相關的配置 private RemotingServer remotingServer;//網絡通信服務 private ExecutorService remotingExecutor;//網絡通信服務

NamesrvConfig

private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV)); private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json"; private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties"; private String productEnvName = "center"; private boolean clusterTest = false; private boolean orderMessageEnable = false;

rocketmqHome:rocketmq主目錄

kvConfigPath:NameServer存儲KV配置屬性的持久化路徑

configStorePath:nameServer默認配置文件路徑

orderMessageEnable:是否支持順序消息


NettyServerConfig

private int listenPort = 8888; private int serverWorkerThreads = 8; private int serverCallbackExecutorThreads = 0; private int serverSelectorThreads = 3; private int serverOnewaySemaphoreValue = 256; private int serverAsyncSemaphoreValue = 64; private int serverChannelMaxIdleTimeSeconds = 120;private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize; private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize; private boolean serverPooledByteBufAllocatorEnable = true;

listenPort:NameServer監聽端口,該值默認會被初始化為9876
serverWorkerThreads:Netty業務線程池線程個數
serverCallbackExecutorThreads:Netty public任務線程池線程個數,Netty網絡設計,根據業務類型會創建不同的線程池,比如處理消息發送、消息消費、心跳檢測等。
serverSelectorThreads:IO線程池個數,主要是NameServer、Broker端解析請求、返回相應的線程個數,這類線程主要是處理網路請求的,解析請求包,然后轉發到各個業務線程池完成具體的操作,然后將結果返回給調用方;
serverOnewaySemaphoreValue:send oneway消息請求;
serverAsyncSemaphoreValue:異步消息發送最大并發數;
serverChannelMaxIdleTimeSeconds :網絡連接最大的空閑時間,默認120s。
serverSocketSndBufSize:網絡socket發送端緩沖區大小。
serverSocketRcvBufSize: 網絡socket接收端緩存區大小。
serverPooledByteBufAllocatorEnable:ByteBuffer是否開啟緩存;
useEpollNativeSelector:是否啟用Epoll IO模型。


RouteInfoManager

private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2; private final ReadWriteLock lock = new ReentrantReadWriteLock(); private final HashMap<String/* topic */, List<QueueData>> topicQueueTable; private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable; private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable; private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable; private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

BROKER_CHANNEL_EXPIRED_TIME:NameServer與Broker空閑連接時長,在2 minNameServer之內沒有收到Broker的心跳包,則NameServer會關閉與該Broker的連接并刪除Broker的路由信息。

lock:讀寫鎖,用來保護以下用于存儲關鍵信息的非線程安全容器HashMap。

topicQueueTable:用于存儲主題與隊列的映射關系,記錄一個主題topic的隊列分布在哪些Broker上。以下是QueueData屬性值:

private String brokerName; //broker名稱 private int readQueueNums; //讀隊列個數 private int writeQueueNums; //寫隊列個數 private int perm; //操作權限 private int topicSysFlag; //同步復制還是異步復制的標識

brokerAddrTable:用于記錄所有Broker信息。以下是BrokerData屬性值:

private String cluster; //當前Broker所屬集群 private String brokerName; //Broker名稱 //BrokerId=0表示主節點,BrokerId>0表示從節點 //記錄BrokerId與對應節點地址的映射信息 private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;

clusterAddrTable:用于記錄Broker集群信息

brokerLiveTable:用于記錄活躍狀態的Broker,NameServer每隔10s對所有Broker進行掃描,如果有Broker宕機,會將該Broker從該表中刪去,以此維護可用的Broker列表信息。以下是BrokerLiveInfo的屬性值:

private long lastUpdateTimestamp; //上次發送心跳包的時間戳 private DataVersion dataVersion; //記錄數據版本信息 private Channel channel; private String haServerAddr; //Master節點地址

3.啟動與關閉流程

NameServer啟動時序圖:

啟動類:org.apache.rocketmq.namesrv.NamesrvStartup.java

3.1.步驟一

解析配置文件,填充NamesrvConfig、NettyServerConfig并創建NamesrvController:

啟動類:

public static void main(String[] args) {main0(args); }public static NamesrvController main0(String[] args) {try {//創建NamesrvController的入口NamesrvController controller = createNamesrvController(args);start(controller);String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();log.info(tip);System.out.printf("%s%n", tip);return controller;} catch (Throwable e) {e.printStackTrace();System.exit(-1);}return null; }

NamesrvController#createNamesrvController:

public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {//....//創建namesrvConfigfinal NamesrvConfig namesrvConfig = new NamesrvConfig();//創建nettyServerConfigfinal NettyServerConfig nettyServerConfig = new NettyServerConfig();//設置默認端口9876nettyServerConfig.setListenPort(9876);//-c 指定屬性配置文件的位置if (commandLine.hasOption('c')) {String file = commandLine.getOptionValue('c');if (file != null) {InputStream in = new BufferedInputStream(new FileInputStream(file));properties = new Properties();properties.load(in);MixAll.properties2Object(properties, namesrvConfig);MixAll.properties2Object(properties, nettyServerConfig);namesrvConfig.setConfigStorePath(file);System.out.printf("load config properties file OK, %s%n", file);in.close();}}//-p 屬性名=屬性值if (commandLine.hasOption('p')) {InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);MixAll.printObjectProperties(console, namesrvConfig);MixAll.printObjectProperties(console, nettyServerConfig);System.exit(0);}//將啟動參數填充到namesrvConfig中MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);//如果未指定'ROCKETMQ_HOME'環境變量if (null == namesrvConfig.getRocketmqHome()) {System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);System.exit(-2);}//....//打印配置信息日志MixAll.printObjectProperties(log, namesrvConfig);MixAll.printObjectProperties(log, nettyServerConfig);//根據namesrvConfig和nettyServerConfig創建NamesrvControllerfinal NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);// 將配置存入controller.configuration以防止配置丟失controller.getConfiguration().registerConfig(properties);return controller; }

3.2.步驟二

根據配置創建好NamesrvController之后,對其進行初始化:

//NamesrvStartup#start public static NamesrvController start(final NamesrvController controller) throws Exception {//進行簡單的檢查if (null == controller) {throw new IllegalArgumentException("NamesrvController is null");}//controller初始化boolean initResult = controller.initialize();if (!initResult) {controller.shutdown();System.exit(-3);}//....controller.start(); //開啟遠程服務-this.remotingServer.start();return controller; }//NamesrvController#initialize public boolean initialize() {//加載配置管理器this.kvConfigManager.load();//創建Netty遠程服務this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);//創建遠程服務線程池this.remotingExecutor =Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));//注冊線程池this.registerProcessor();//定時任務線程池--->每隔十秒掃描活躍狀態異常的Broker信息this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {/*** 對Not Active Broker 進行掃描*/@Overridepublic void run() {NamesrvController.this.routeInfoManager.scanNotActiveBroker();}}, 5, 10, TimeUnit.SECONDS);//定時任務線程池--->每隔十秒打印KVConfig信息this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {NamesrvController.this.kvConfigManager.printAllPeriodically();}}, 1, 10, TimeUnit.MINUTES);//....return true;}

3.3.步驟三

在JVM進程關閉之前,先將線程池關閉,及時釋放資源。

public static NamesrvController start(final NamesrvController controller) throws Exception {//....//JVM進程關閉之前,將線程池關閉,資源釋放Runtime.getRuntime().addShutdownHook/*注冊JVM鉤子函數*/(new ShutdownHookThread(log, new Callable<Void>() {@Overridepublic Void call() throws Exception {controller.shutdown();return null;}}));//.... }

以上僅供個人學習使用,如有不足請指正!

總結

以上是生活随笔為你收集整理的RocketMQ:NameServer架构设计以及启动关闭流程源码分析的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。