源码分析RocketMQ ACL实现机制
有關RocketMQ ACL的使用請查看上一篇《RocketMQ ACL使用指南》,本文從源碼的角度,分析一下RocketMQ ACL的實現原理。
備注:RocketMQ在4.4.0時引入了ACL機制,本文代碼基于RocketMQ4.5.0版本。
根據RocketMQ ACL使用手冊,我們應該首先看一下Broker服務器在開啟ACL機制時如何加載配置文件,并如何工作的。
1、BrokerController#initialAcl
Broker端ACL的入口代碼為:BrokerController#initialAcl
private void initialAcl() {if (!this.brokerConfig.isAclEnable()) { // @1log.info("The broker dose not enable acl");return;}List<AccessValidator> accessValidators = ServiceProvider.load(ServiceProvider.ACL_VALIDATOR_ID, AccessValidator.class); // @2if (accessValidators == null || accessValidators.isEmpty()) {log.info("The broker dose not load the AccessValidator");return;}for (AccessValidator accessValidator: accessValidators) { // @3final AccessValidator validator = accessValidator;this.registerServerRPCHook(new RPCHook() {@Overridepublic void doBeforeRequest(String remoteAddr, RemotingCommand request) {//Do not catch the exceptionvalidator.validate(validator.parse(request, remoteAddr)); // @4}@Overridepublic void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) {}});} }本方法的實現共4個關鍵點。
代碼@1:首先判斷Broker是否開啟了acl,通過配置參數aclEnable指定,默認為false。
代碼@2:使用類似SPI機制,加載配置的AccessValidator,該方法返回一個列表,其實現邏輯時讀取META-INF/service/org.apache.rocketmq.acl.AccessValidator文件中配置的訪問驗證器,默認配置內容如下:
代碼@3:遍歷配置的訪問驗證器(AccessValidator),并向Broker處理服務器注冊鉤子函數,RPCHook的doBeforeRequest方法會在服務端接收到請求,將其請求解碼后,執行處理請求之前被調用;RPCHook的doAfterResponse方法會在處理完請求后,將結果返回之前被調用,其調用如圖所示:
代碼@4:在RPCHook#doBeforeRequest方法中調用AccessValidator#validate, 在真實處理命令之前,先執行ACL的驗證邏輯,如果擁有該操作的執行權限,則放行,否則拋出AclException。
接下來,我們將重點放到Broker默認實現的訪問驗證器:PlainAccessValidator。
2、PlainAccessValidator
2.1 類圖
- AccessValidator
- 訪問驗證器接口,主要定義兩個接口。
1)AccessResource parse(RemotingCommand request, String remoteAddr)
從請求頭中解析本次請求對應的訪問資源,即本次請求需要的訪問權限。
2)void validate(AccessResource accessResource)
根據本次需要訪問的權限,與請求用戶擁有的權限進行對比驗證,判斷是擁有權限,如果沒有訪問該操作的權限,則拋出異常,否則放行。
- PlainAccessValidator
RocketMQ默認提供的基于yml配置格式的訪問驗證器。
接下來我們重點看一下PlainAccessValidator的parse方法與validate方法的實現細節。在講解該方法之前,我們首先認識一下RocketMQ封裝訪問資源的PlainAccessResource。
2.1.2 PlainAccessResource類圖
我們對其屬性一一做個介紹:
- private String accessKey
訪問Key,用戶名。 - private String secretKey
用戶密碼。 - private String whiteRemoteAddress
遠程IP地址白名單。 - private boolean admin
是否是管理員角色。 - private byte defaultTopicPerm = 1
默認topic訪問權限,即如果沒有配置topic的權限,則Topic默認的訪問權限為1,表示為DENY。 - private byte defaultGroupPerm = 1
默認的消費組訪問權限,默認為DENY。 - private Map resourcePermMap
資源需要的訪問權限映射表。 - private RemoteAddressStrategy remoteAddressStrategy
遠程IP地址驗證策略。 - private int requestCode
當前請求的requestCode。 - private byte[] content
請求頭與請求體的內容。 - private String signature
簽名字符串,這是通常的套路,在客戶端時,首先將請求參數排序,然后使用secretKey生成簽名字符串,服務端重復這個步驟,然后對比簽名字符串,如果相同,則認為登錄成功,否則失敗。 - private String secretToken
密鑰token。 - private String recognition
目前作用未知,代碼中目前未被使用。
2.2 構造方法
public PlainAccessValidator() {aclPlugEngine = new PlainPermissionLoader(); }構造函數,直接創建PlainPermissionLoader對象,從命名上來看,應該是觸發acl規則的加載,即解析plain_acl.yml,接下來會重點探討,即acl啟動流程之配置文件的解析。
2.3 parse方法
該方法的作用就是從請求命令中解析出本次訪問所需要的訪問權限,最終構建AccessResource對象,為后續的校驗權限做準備。
PlainAccessResource accessResource = new PlainAccessResource(); if (remoteAddr != null && remoteAddr.contains(":")) {accessResource.setWhiteRemoteAddress(remoteAddr.split(":")[0]); } else {accessResource.setWhiteRemoteAddress(remoteAddr); }Step1:首先創建PlainAccessResource,從遠程地址中提取出遠程訪問IP地址。
if (request.getExtFields() == null) {throw new AclException("request's extFields value is null"); } accessResource.setRequestCode(request.getCode()); accessResource.setAccessKey(request.getExtFields().get(SessionCredentials.ACCESS_KEY)); accessResource.setSignature(request.getExtFields().get(SessionCredentials.SIGNATURE)); accessResource.setSecretToken(request.getExtFields().get(SessionCredentials.SECURITY_TOKEN));Step2:如果請求頭中的擴展字段為空,則拋出異常,如果不為空,則從請求頭中讀取requestCode、accessKey(請求用戶名)、簽名字符串(signature)、secretToken。
try {switch (request.getCode()) {case RequestCode.SEND_MESSAGE:accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.PUB);break;case RequestCode.SEND_MESSAGE_V2:accessResource.addResourceAndPerm(request.getExtFields().get("b"), Permission.PUB);break;case RequestCode.CONSUMER_SEND_MSG_BACK:accessResource.addResourceAndPerm(request.getExtFields().get("originTopic"), Permission.PUB);accessResource.addResourceAndPerm(getRetryTopic(request.getExtFields().get("group")), Permission.SUB);break;case RequestCode.PULL_MESSAGE:accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.SUB);accessResource.addResourceAndPerm(getRetryTopic(request.getExtFields().get("consumerGroup")), Permission.SUB);break;case RequestCode.QUERY_MESSAGE:accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.SUB);break;case RequestCode.HEART_BEAT:HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);for (ConsumerData data : heartbeatData.getConsumerDataSet()) {accessResource.addResourceAndPerm(getRetryTopic(data.getGroupName()), Permission.SUB);for (SubscriptionData subscriptionData : data.getSubscriptionDataSet()) {accessResource.addResourceAndPerm(subscriptionData.getTopic(), Permission.SUB);}}break;case RequestCode.UNREGISTER_CLIENT:final UnregisterClientRequestHeader unregisterClientRequestHeader =(UnregisterClientRequestHeader) request.decodeCommandCustomHeader(UnregisterClientRequestHeader.class);accessResource.addResourceAndPerm(getRetryTopic(unregisterClientRequestHeader.getConsumerGroup()), Permission.SUB);break;case RequestCode.GET_CONSUMER_LIST_BY_GROUP:final GetConsumerListByGroupRequestHeader getConsumerListByGroupRequestHeader =(GetConsumerListByGroupRequestHeader) request.decodeCommandCustomHeader(GetConsumerListByGroupRequestHeader.class);accessResource.addResourceAndPerm(getRetryTopic(getConsumerListByGroupRequestHeader.getConsumerGroup()), Permission.SUB);break;case RequestCode.UPDATE_CONSUMER_OFFSET:final UpdateConsumerOffsetRequestHeader updateConsumerOffsetRequestHeader =(UpdateConsumerOffsetRequestHeader) request.decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class);accessResource.addResourceAndPerm(getRetryTopic(updateConsumerOffsetRequestHeader.getConsumerGroup()), Permission.SUB);accessResource.addResourceAndPerm(updateConsumerOffsetRequestHeader.getTopic(), Permission.SUB);break;default:break;}} catch (Throwable t) {throw new AclException(t.getMessage(), t);}Step3:根據請求命令,設置本次請求需要擁有的權限,上述代碼比較簡單,就是從請求中得出本次操作的Topic、消息組名稱,為了方便區分topic與消費組,消費組使用消費者對應的重試主題,當成資源的Key,從這里也可以看出,當前版本需要進行ACL權限驗證的請求命令如下:
- SEND_MESSAGE
- SEND_MESSAGE_V2
- CONSUMER_SEND_MSG_BACK
- PULL_MESSAGE
- QUERY_MESSAGE
- HEART_BEAT
- UNREGISTER_CLIENT
- GET_CONSUMER_LIST_BY_GROUP
- UPDATE_CONSUMER_OFFSET
Step4:對擴展字段進行排序,便于生成簽名字符串,然后將擴展字段與請求體(body)寫入content字段。完成從請求頭中解析出本次請求需要驗證的權限。
2.4 validate 方法
public void validate(AccessResource accessResource) {aclPlugEngine.validate((PlainAccessResource) accessResource); }驗證權限,即根據本次請求需要的權限與當前用戶所擁有的權限進行對比,如果符合,則正常執行;否則拋出AclException。
為了揭開配置文件的解析與驗證,我們將目光投入到PlainPermissionLoader。
3、PlainPermissionLoader
該類的主要職責:加載權限,即解析acl主要配置文件plain_acl.yml。
3.1 類圖
下面對其核心屬性與核心方法一一介紹:
- DEFAULT_PLAIN_ACL_FILE?
默認acl配置文件名稱,默認值為conf/plain_acl.yml。 - String fileName
acl配置文件名稱,默認為DEFAULT_PLAIN_ACL_FILE ,可以通過系統參數-Drocketmq.acl.plain.file=fileName指定。 - Map plainAccessResourceMap
解析出來的權限配置映射表,以用戶名為鍵。 - RemoteAddressStrategyFactory remoteAddressStrategyFactory
遠程IP解析策略工廠,用于解析白名單IP地址。 - boolean isWatchStart
是否開啟了文件監聽,即自動監聽plain_acl.yml文件,一旦該文件改變,可在不重啟服務器的情況下自動生效。 - public PlainPermissionLoader()
構造方法。 - public void load()
加載配置文件。 - public void validate(PlainAccessResource plainAccessResource)
驗證是否有權限訪問待訪問資源。
3.2 PlainPermissionLoader構造方法
public PlainPermissionLoader() {load();watch(); }在構造方法中調用load與watch方法。
3.3 load
Map<String, PlainAccessResource> plainAccessResourceMap = new HashMap<>(); List<RemoteAddressStrategy> globalWhiteRemoteAddressStrategy = new ArrayList<>(); String path = fileHome + File.separator + fileName; JSONObject plainAclConfData = AclUtils.getYamlDataObject(path,JSONObject.class);Step1:初始化plainAccessResourceMap(用戶配置的訪問資源,即權限容器)、globalWhiteRemoteAddressStrategy:全局IP白名單訪問策略。配置文件,默認為${ROCKETMQ_HOME}/conf/plain_acl.yml。
JSONArray globalWhiteRemoteAddressesList = plainAclConfData.getJSONArray("globalWhiteRemoteAddresses"); if (globalWhiteRemoteAddressesList != null && !globalWhiteRemoteAddressesList.isEmpty()) {for (int i = 0; i < globalWhiteRemoteAddressesList.size(); i++) {globalWhiteRemoteAddressStrategy.add(remoteAddressStrategyFactory.getRemoteAddressStrategy(globalWhiteRemoteAddressesList.getString(i)));} }Step2:globalWhiteRemoteAddresses:全局白名單,類型為數組。根據配置的規則,使用remoteAddressStrategyFactory獲取一個訪問策略,下文會重點介紹其配置規則。
JSONArray accounts = plainAclConfData.getJSONArray("accounts"); if (accounts != null && !accounts.isEmpty()) {List<PlainAccessConfig> plainAccessConfigList = accounts.toJavaList(PlainAccessConfig.class);for (PlainAccessConfig plainAccessConfig : plainAccessConfigList) {PlainAccessResource plainAccessResource = buildPlainAccessResource(plainAccessConfig);plainAccessResourceMap.put(plainAccessResource.getAccessKey(),plainAccessResource);} } this.globalWhiteRemoteAddressStrategy = globalWhiteRemoteAddressStrategy; this.plainAccessResourceMap = plainAccessResourceMap;Step3:解析plain_acl.yml文件中的另外一個根元素accounts,用戶定義的權限信息。從PlainAccessConfig的定義來看,accounts標簽下支持如下標簽:
- accessKey
- secretKey
- whiteRemoteAddress
- admin
- defaultTopicPerm
- defaultGroupPerm
- topicPerms
- groupPerms
上述標簽的說明,請參考::《RocketMQ ACL使用指南》?。具體的解析過程比較容易,就不再細說。
load方法主要完成acl配置文件的解析,將用戶定義的權限加載到內存中。
3.4 watch
private void watch() {try {String watchFilePath = fileHome + fileName;FileWatchService fileWatchService = new FileWatchService(new String[] {watchFilePath}, new FileWatchService.Listener() {@Overridepublic void onChanged(String path) { log.info("The plain acl yml changed, reload the context");load();}});fileWatchService.start();log.info("Succeed to start AclWatcherService");this.isWatchStart = true;} catch (Exception e) {log.error("Failed to start AclWatcherService", e);} }監聽器,默認以500ms的頻率判斷文件的內容是否變化。在文件內容發生變化后調用load()方法,重新加載配置文件。那FileWatchService是如何判斷兩個文件的內容發生了變化呢?
FileWatchService#hash private String hash(String filePath) throws IOException, NoSuchAlgorithmException {Path path = Paths.get(filePath);md.update(Files.readAllBytes(path));byte[] hash = md.digest();return UtilAll.bytes2string(hash); }獲取文件md5簽名來做對比,這里為什么不在啟動時先記錄上一次文件的修改時間,然后先判斷其修改時間是否變化,再判斷其內容是否真正發生變化。
3.5 validate
// Check the global white remote addr for (RemoteAddressStrategy remoteAddressStrategy : globalWhiteRemoteAddressStrategy) {if (remoteAddressStrategy.match(plainAccessResource)) {return;} }Step1:首先使用全局白名單對資源進行驗證,只要一個規則匹配,則返回,表示認證成功。
if (plainAccessResource.getAccessKey() == null) {throw new AclException(String.format("No accessKey is configured")); } if (!plainAccessResourceMap.containsKey(plainAccessResource.getAccessKey())) {throw new AclException(String.format("No acl config for %s", plainAccessResource.getAccessKey())); }Step2:如果請求信息中,沒有設置用戶名,則拋出未配置AccessKey異常;如果Broker中并為配置該用戶的配置信息,則拋出AclException。
// Check the white addr for accesskey PlainAccessResource ownedAccess = plainAccessResourceMap.get(plainAccessResource.getAccessKey()); if (ownedAccess.getRemoteAddressStrategy().match(plainAccessResource)) {return; }Step3:如果用戶配置的白名單與待訪問資源規則匹配的話,則直接發認證通過。
// Check the signature String signature = AclUtils.calSignature(plainAccessResource.getContent(), ownedAccess.getSecretKey()); if (!signature.equals(plainAccessResource.getSignature())) {throw new AclException(String.format("Check signature failed for accessKey=%s", plainAccessResource.getAccessKey())); }Step4:驗證簽名。
checkPerm(plainAccessResource, ownedAccess);Step5:調用checkPerm方法,驗證需要的權限與擁有的權限是否匹配。
3.5.1 checkPerm
if (Permission.needAdminPerm(needCheckedAccess.getRequestCode()) && !ownedAccess.isAdmin()) {throw new AclException(String.format("Need admin permission for request code=%d, but accessKey=%s is not", needCheckedAccess.getRequestCode(), ownedAccess.getAccessKey())); }Step6:如果當前的請求命令屬于必須是Admin用戶才能訪問的權限,并且當前用戶并不是管理員角色,則拋出異常,如下命令需要admin角色才能進行的操作:
Map<String, Byte> needCheckedPermMap = needCheckedAccess.getResourcePermMap(); Map<String, Byte> ownedPermMap = ownedAccess.getResourcePermMap(); if (needCheckedPermMap == null) {// If the needCheckedPermMap is null,then returnreturn; } if (ownedPermMap == null && ownedAccess.isAdmin()) {// If the ownedPermMap is null and it is an admin user, then returnreturn; }Step7:如果該請求不需要進行權限驗證,則通過認證,如果當前用戶的角色是管理員,并且沒有配置用戶權限,則認證通過,返回。
for (Map.Entry<String, Byte> needCheckedEntry : needCheckedPermMap.entrySet()) {String resource = needCheckedEntry.getKey();Byte neededPerm = needCheckedEntry.getValue();boolean isGroup = PlainAccessResource.isRetryTopic(resource);if (ownedPermMap == null || !ownedPermMap.containsKey(resource)) {// Check the default permbyte ownedPerm = isGroup ? ownedAccess.getDefaultGroupPerm() : ownedAccess.getDefaultTopicPerm();if (!Permission.checkPermission(neededPerm, ownedPerm)) {throw new AclException(String.format("No default permission for %s", PlainAccessResource.printStr(resource, isGroup)));}continue;}if (!Permission.checkPermission(neededPerm, ownedPermMap.get(resource))) {throw new AclException(String.format("No default permission for %s", PlainAccessResource.printStr(resource, isGroup)));} }Step8:遍歷需要權限與擁有的權限進行對比,如果配置對應的權限,則判斷是否匹配;如果未配置權限,則判斷默認權限時是否允許,不允許,則拋出AclException。
驗證邏輯就介紹到這里了,下面給出其匹配流程圖:
上述闡述了從Broker服務器啟動、加載acl配置文件流程、動態監聽配置文件、服務端權限驗證流程,接下來我們看一下客戶端關于ACL需要處理的事情。
4、AclClientRPCHook
回顧一下,我們引入ACL機制后,客戶端的代碼示例如下:
其在創建DefaultMQProducer時,注冊AclClientRPCHook鉤子,會在向服務端發送遠程命令前后執行其鉤子函數,接下來我們重點分析一下AclClientRPCHook。
4.1 doBeforeRequest
public void doBeforeRequest(String remoteAddr, RemotingCommand request) {byte[] total = AclUtils.combineRequestContent(request,parseRequestContent(request, sessionCredentials.getAccessKey(), sessionCredentials.getSecurityToken())); // @1String signature = AclUtils.calSignature(total, sessionCredentials.getSecretKey()); // @2request.addExtField(SIGNATURE, signature); // @3request.addExtField(ACCESS_KEY, sessionCredentials.getAccessKey()); // The SecurityToken value is unneccessary,user can choose this one.if (sessionCredentials.getSecurityToken() != null) {request.addExtField(SECURITY_TOKEN, sessionCredentials.getSecurityToken());} }代碼@1:將Request請求參數進行排序,并加入accessKey。
代碼@2:對排好序的請參數,使用用戶配置的密碼生成簽名,并最近到擴展字段Signature,然后服務端也會按照相同的算法生成Signature,如果相同,則表示簽名驗證成功(類似于實現登錄的效果)。
代碼@3:將Signature、AccessKey等加入到請求頭的擴展字段中,服務端拿到這些元數據,結合請求頭中的信息,根據配置的權限,進行權限校驗。
關于ACL客戶端生成簽名是一種通用套路,就不在細講了。
源碼分析ACL的實現就介紹到這里了,下文將介紹RocketMQ 消息軌跡的使用與實現原理分析。如果大家覺得文章寫的還不錯的話,期待幫忙點贊,謝謝。
原文鏈接
本文為云棲社區原創內容,未經允許不得轉載。
總結
以上是生活随笔為你收集整理的源码分析RocketMQ ACL实现机制的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 3D 真的很难吗,瞧瞧支付宝怎么做?
- 下一篇: 一个周内上线50个增长策略,竟然能这么高