有關RocketMQ ACL的使用請查看上一篇《RocketMQ ACL使用指南》,本文從源碼的角度,分析一下RocketMQ ACL的實現原理。java
備註:RocketMQ在4.4.0時引入了ACL機制,本文代碼基於RocketMQ4.5.0版本。算法
根據RocketMQ ACL使用手冊,咱們應該首先看一下Broker服務器在開啓ACL機制時如何加載配置文件,並如何工做的。apache
Broker端ACL的入口代碼爲:BrokerController#initialAcl數組
private void initialAcl() {
if (!this.brokerConfig.isAclEnable()) { // @1
log.info("The broker dose not enable acl");
return;
}
List<AccessValidator> accessValidators = ServiceProvider.load(ServiceProvider.ACL_VALIDATOR_ID, AccessValidator.class); // @2
if (accessValidators == null || accessValidators.isEmpty()) {
log.info("The broker dose not load the AccessValidator");
return;
}
for (AccessValidator accessValidator: accessValidators) { // @3
final AccessValidator validator = accessValidator;
this.registerServerRPCHook(new RPCHook() {
@Override
public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
//Do not catch the exception
validator.validate(validator.parse(request, remoteAddr)); // @4
}
@Override
public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) {
}
});
}
}
複製代碼
本方法的實現共4個關鍵點。 代碼@1:首先判斷Broker是否開啓了acl,經過配置參數aclEnable指定,默認爲false。服務器
代碼@2:使用相似SPI機制,加載配置的AccessValidator,該方法返回一個列表,其實現邏輯時讀取META-INF/service/org.apache.rocketmq.acl.AccessValidator文件中配置的訪問驗證器,默認配置內容以下: session
代碼@3:遍歷配置的訪問驗證器(AccessValidator),並向Broker處理服務器註冊鉤子函數,RPCHook的doBeforeRequest方法會在服務端接收到請求,將其請求解碼後,執行處理請求以前被調用;RPCHook的doAfterResponse方法會在處理完請求後,將結果返回以前被調用,其調用如圖所示: 代碼@4:在RPCHook#doBeforeRequest方法中調用AccessValidator#validate, 在真實處理命令以前,先執行ACL的驗證邏輯,若是擁有該操做的執行權限,則放行,不然拋出AclException。接下來,咱們將重點放到Broker默認實現的訪問驗證器:PlainAccessValidator。ide
接下來咱們重點看一下PlainAccessValidator的parse方法與validate方法的實現細節。在講解該方法以前,咱們首先認識一下RocketMQ封裝訪問資源的PlainAccessResource。函數
public PlainAccessValidator() {
aclPlugEngine = new PlainPermissionLoader();
}
複製代碼
構造函數,直接建立PlainPermissionLoader對象,從命名上來看,應該是觸發acl規則的加載,即解析plain_acl.yml,接下來會重點探討,即acl啓動流程之配置文件的解析。源碼分析
該方法的做用就是從請求命令中解析出本次訪問所須要的訪問權限,最終構建AccessResource對象,爲後續的校驗權限作準備。ui
PlainAccessResource accessResource = new PlainAccessResource();
if (remoteAddr != null && remoteAddr.contains(":")) {
accessResource.setWhiteRemoteAddress(remoteAddr.split(":")[0]);
} else {
accessResource.setWhiteRemoteAddress(remoteAddr);
}
複製代碼
Step1:首先建立PlainAccessResource,從遠程地址中提取出遠程訪問IP地址。
if (request.getExtFields() == null) {
throw new AclException("request's extFields value is null");
}
accessResource.setRequestCode(request.getCode());
accessResource.setAccessKey(request.getExtFields().get(SessionCredentials.ACCESS_KEY));
accessResource.setSignature(request.getExtFields().get(SessionCredentials.SIGNATURE));
accessResource.setSecretToken(request.getExtFields().get(SessionCredentials.SECURITY_TOKEN));
複製代碼
Step2:若是請求頭中的擴展字段爲空,則拋出異常,若是不爲空,則從請求頭中讀取requestCode、accessKey(請求用戶名)、簽名字符串(signature)、secretToken。
try {
switch (request.getCode()) {
case RequestCode.SEND_MESSAGE:
accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.PUB);
break;
case RequestCode.SEND_MESSAGE_V2:
accessResource.addResourceAndPerm(request.getExtFields().get("b"), Permission.PUB);
break;
case RequestCode.CONSUMER_SEND_MSG_BACK:
accessResource.addResourceAndPerm(request.getExtFields().get("originTopic"), Permission.PUB);
accessResource.addResourceAndPerm(getRetryTopic(request.getExtFields().get("group")), Permission.SUB);
break;
case RequestCode.PULL_MESSAGE:
accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.SUB);
accessResource.addResourceAndPerm(getRetryTopic(request.getExtFields().get("consumerGroup")), Permission.SUB);
break;
case RequestCode.QUERY_MESSAGE:
accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.SUB);
break;
case RequestCode.HEART_BEAT:
HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
accessResource.addResourceAndPerm(getRetryTopic(data.getGroupName()), Permission.SUB);
for (SubscriptionData subscriptionData : data.getSubscriptionDataSet()) {
accessResource.addResourceAndPerm(subscriptionData.getTopic(), Permission.SUB);
}
}
break;
case RequestCode.UNREGISTER_CLIENT:
final UnregisterClientRequestHeader unregisterClientRequestHeader =
(UnregisterClientRequestHeader) request
.decodeCommandCustomHeader(UnregisterClientRequestHeader.class);
accessResource.addResourceAndPerm(getRetryTopic(unregisterClientRequestHeader.getConsumerGroup()), Permission.SUB);
break;
case RequestCode.GET_CONSUMER_LIST_BY_GROUP:
final GetConsumerListByGroupRequestHeader getConsumerListByGroupRequestHeader =
(GetConsumerListByGroupRequestHeader) request
.decodeCommandCustomHeader(GetConsumerListByGroupRequestHeader.class);
accessResource.addResourceAndPerm(getRetryTopic(getConsumerListByGroupRequestHeader.getConsumerGroup()), Permission.SUB);
break;
case RequestCode.UPDATE_CONSUMER_OFFSET:
final UpdateConsumerOffsetRequestHeader updateConsumerOffsetRequestHeader =
(UpdateConsumerOffsetRequestHeader) request
.decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class);
accessResource.addResourceAndPerm(getRetryTopic(updateConsumerOffsetRequestHeader.getConsumerGroup()), Permission.SUB);
accessResource.addResourceAndPerm(updateConsumerOffsetRequestHeader.getTopic(), Permission.SUB);
break;
default:
break;
}
} catch (Throwable t) {
throw new AclException(t.getMessage(), t);
}
複製代碼
Step3:根據請求命令,設置本次請求須要擁有的權限,上述代碼比較簡單,就是從請求中得出本次操做的Topic、消息組名稱,爲了方便區分topic與消費組,消費組使用消費者對應的重試主題,當成資源的Key,從這裏也能夠看出,當前版本須要進行ACL權限驗證的請求命令以下:
// Content
SortedMap<String, String> map = new TreeMap<String, String>();
for (Map.Entry<String, String> entry : request.getExtFields().entrySet()) {
if (!SessionCredentials.SIGNATURE.equals(entry.getKey())) {
map.put(entry.getKey(), entry.getValue());
}
}
accessResource.setContent(AclUtils.combineRequestContent(request, map));
return accessResource;
複製代碼
Step4:對擴展字段進行排序,便於生成簽名字符串,而後將擴展字段與請求體(body)寫入content字段。完成從請求頭中解析出本次請求須要驗證的權限。
public void validate(AccessResource accessResource) {
aclPlugEngine.validate((PlainAccessResource) accessResource);
}
複製代碼
驗證權限,即根據本次請求須要的權限與當前用戶所擁有的權限進行對比,若是符合,則正常執行;不然拋出AclException。
爲了揭開配置文件的解析與驗證,咱們將目光投入到PlainPermissionLoader。
該類的主要職責:加載權限,即解析acl主要配置文件plain_acl.yml。
public PlainPermissionLoader() {
load();
watch();
}
複製代碼
在構造方法中調用load與watch方法。
Map<String, PlainAccessResource> plainAccessResourceMap = new HashMap<>();
List<RemoteAddressStrategy> globalWhiteRemoteAddressStrategy = new ArrayList<>();
String path = fileHome + File.separator + fileName;
JSONObject plainAclConfData = AclUtils.getYamlDataObject(path,JSONObject.class);
複製代碼
Step1:初始化plainAccessResourceMap(用戶配置的訪問資源,即權限容器)、globalWhiteRemoteAddressStrategy:全局IP白名單訪問策略。配置文件,默認爲${ROCKETMQ_HOME}/conf/plain_acl.yml。
JSONArray globalWhiteRemoteAddressesList = plainAclConfData.getJSONArray("globalWhiteRemoteAddresses");
if (globalWhiteRemoteAddressesList != null && !globalWhiteRemoteAddressesList.isEmpty()) {
for (int i = 0; i < globalWhiteRemoteAddressesList.size(); i++) {
globalWhiteRemoteAddressStrategy.add(remoteAddressStrategyFactory.
getRemoteAddressStrategy(globalWhiteRemoteAddressesList.getString(i)));
}
}
複製代碼
Step2:globalWhiteRemoteAddresses:全局白名單,類型爲數組。根據配置的規則,使用remoteAddressStrategyFactory獲取一個訪問策略,下文會重點介紹其配置規則。
JSONArray accounts = plainAclConfData.getJSONArray("accounts");
if (accounts != null && !accounts.isEmpty()) {
List<PlainAccessConfig> plainAccessConfigList = accounts.toJavaList(PlainAccessConfig.class);
for (PlainAccessConfig plainAccessConfig : plainAccessConfigList) {
PlainAccessResource plainAccessResource = buildPlainAccessResource(plainAccessConfig);
plainAccessResourceMap.put(plainAccessResource.getAccessKey(),plainAccessResource);
}
}
this.globalWhiteRemoteAddressStrategy = globalWhiteRemoteAddressStrategy;
this.plainAccessResourceMap = plainAccessResourceMap;
複製代碼
Step3:解析plain_acl.yml文件中的另一個根元素accounts,用戶定義的權限信息。從PlainAccessConfig的定義來看,accounts標籤下支持以下標籤:
load方法主要完成acl配置文件的解析,將用戶定義的權限加載到內存中。
private void watch() {
try {
String watchFilePath = fileHome + fileName;
FileWatchService fileWatchService = new FileWatchService(new String[] {watchFilePath}, new FileWatchService.Listener() {
@Override
public void onChanged(String path) {
log.info("The plain acl yml changed, reload the context");
load();
}
});
fileWatchService.start();
log.info("Succeed to start AclWatcherService");
this.isWatchStart = true;
} catch (Exception e) {
log.error("Failed to start AclWatcherService", e);
}
}
複製代碼
監聽器,默認以500ms的頻率判斷文件的內容是否變化。在文件內容發生變化後調用load()方法,從新加載配置文件。那FileWatchService是如何判斷兩個文件的內容發生了變化呢?
FileWatchService#hash
private String hash(String filePath) throws IOException, NoSuchAlgorithmException {
Path path = Paths.get(filePath);
md.update(Files.readAllBytes(path));
byte[] hash = md.digest();
return UtilAll.bytes2string(hash);
}
複製代碼
獲取文件md5簽名來作對比,這裏爲何不在啓動時先記錄上一次文件的修改時間,而後先判斷其修改時間是否變化,再判斷其內容是否真正發生變化。
// Check the global white remote addr
for (RemoteAddressStrategy remoteAddressStrategy : globalWhiteRemoteAddressStrategy) {
if (remoteAddressStrategy.match(plainAccessResource)) {
return;
}
}
複製代碼
Step1:首先使用全局白名單對資源進行驗證,只要一個規則匹配,則返回,表示認證成功。
if (plainAccessResource.getAccessKey() == null) {
throw new AclException(String.format("No accessKey is configured"));
}
if (!plainAccessResourceMap.containsKey(plainAccessResource.getAccessKey())) {
throw new AclException(String.format("No acl config for %s", plainAccessResource.getAccessKey()));
}
Step2:若是請求信息中,沒有設置用戶名,則拋出未配置AccessKey異常;若是Broker中併爲配置該用戶的配置信息,則拋出AclException。
// Check the white addr for accesskey
PlainAccessResource ownedAccess = plainAccessResourceMap.get(plainAccessResource.getAccessKey());
if (ownedAccess.getRemoteAddressStrategy().match(plainAccessResource)) {
return;
}
複製代碼
Step3:若是用戶配置的白名單與待訪問資源規則匹配的話,則直接發認證經過。
// Check the signature
String signature = AclUtils.calSignature(plainAccessResource.getContent(), ownedAccess.getSecretKey());
if (!signature.equals(plainAccessResource.getSignature())) {
throw new AclException(String.format("Check signature failed for accessKey=%s", plainAccessResource.getAccessKey()));
}
複製代碼
Step4:驗證簽名。
checkPerm(plainAccessResource, ownedAccess);
複製代碼
Step5:調用checkPerm方法,驗證須要的權限與擁有的權限是否匹配。
if (Permission.needAdminPerm(needCheckedAccess.getRequestCode()) && !ownedAccess.isAdmin()) {
throw new AclException(String.format("Need admin permission for request code=%d, but accessKey=%s is not", needCheckedAccess.getRequestCode(), ownedAccess.getAccessKey()));
}
複製代碼
Step6:若是當前的請求命令屬於必須是Admin用戶才能訪問的權限,而且當前用戶並非管理員角色,則拋出異常,以下命令須要admin角色才能進行的操做:
Map<String, Byte> needCheckedPermMap = needCheckedAccess.getResourcePermMap();
Map<String, Byte> ownedPermMap = ownedAccess.getResourcePermMap();
if (needCheckedPermMap == null) {
// If the needCheckedPermMap is null,then return
return;
}
if (ownedPermMap == null && ownedAccess.isAdmin()) {
// If the ownedPermMap is null and it is an admin user, then return
return;
}
複製代碼
Step7:若是該請求不須要進行權限驗證,則經過認證,若是當前用戶的角色是管理員,而且沒有配置用戶權限,則認證經過,返回。
for (Map.Entry<String, Byte> needCheckedEntry : needCheckedPermMap.entrySet()) {
String resource = needCheckedEntry.getKey();
Byte neededPerm = needCheckedEntry.getValue();
boolean isGroup = PlainAccessResource.isRetryTopic(resource);
if (ownedPermMap == null || !ownedPermMap.containsKey(resource)) {
// Check the default perm
byte ownedPerm = isGroup ? ownedAccess.getDefaultGroupPerm() : ownedAccess.getDefaultTopicPerm();
if (!Permission.checkPermission(neededPerm, ownedPerm)) {
throw new AclException(String.format("No default permission for %s", PlainAccessResource.printStr(resource, isGroup)));
}
continue;
}
if (!Permission.checkPermission(neededPerm, ownedPermMap.get(resource))) {
throw new AclException(String.format("No default permission for %s", PlainAccessResource.printStr(resource, isGroup)));
}
}
複製代碼
Step8:遍歷須要權限與擁有的權限進行對比,若是配置對應的權限,則判斷是否匹配;若是未配置權限,則判斷默認權限時是否容許,不容許,則拋出AclException。
驗證邏輯就介紹到這裏了,下面給出其匹配流程圖:
上述闡述了從Broker服務器啓動、加載acl配置文件流程、動態監聽配置文件、服務端權限驗證流程,接下來咱們看一下客戶端關於ACL須要處理的事情。回顧一下,咱們引入ACL機制後,客戶端的代碼示例以下:
其在建立DefaultMQProducer時,註冊AclClientRPCHook鉤子,會在向服務端發送遠程命令先後執行其鉤子函數,接下來咱們重點分析一下AclClientRPCHook。public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
byte[] total = AclUtils.combineRequestContent(request,
parseRequestContent(request, sessionCredentials.getAccessKey(), sessionCredentials.getSecurityToken())); // @1
String signature = AclUtils.calSignature(total, sessionCredentials.getSecretKey()); // @2
request.addExtField(SIGNATURE, signature); // @3
request.addExtField(ACCESS_KEY, sessionCredentials.getAccessKey());
// The SecurityToken value is unneccessary,user can choose this one.
if (sessionCredentials.getSecurityToken() != null) {
request.addExtField(SECURITY_TOKEN, sessionCredentials.getSecurityToken());
}
}
複製代碼
代碼@1:將Request請求參數進行排序,並加入accessKey。
代碼@2:對排好序的請參數,使用用戶配置的密碼生成簽名,並最近到擴展字段Signature,而後服務端也會按照相同的算法生成Signature,若是相同,則表示簽名驗證成功(相似於實現登陸的效果)。
代碼@3:將Signature、AccessKey等加入到請求頭的擴展字段中,服務端拿到這些元數據,結合請求頭中的信息,根據配置的權限,進行權限校驗。
關於ACL客戶端生成簽名是一種通用套路,就不在細講了。
源碼分析ACL的實現就介紹到這裏了,下文將介紹RocketMQ 消息軌跡的使用與實現原理分析。若是你們以爲文章寫的還不錯的話,期待幫忙點贊,謝謝。
做者簡介:《RocketMQ技術內幕》做者,RocketMQ 社區佈道師,維護公衆號:中間件興趣圈,可掃描以下二維碼與做者進行互動。