本文主要研究一下rocketmq的AccessValidator
rocketmq/acl/src/main/java/org/apache/rocketmq/acl/AccessValidator.java
/**
 * Contract for parsing a remoting request into an {@link AccessResource}, validating it,
 * and maintaining the underlying access configuration.
 */
public interface AccessValidator {

    /**
     * Parse to get the AccessResource (user, resource, needed permission).
     *
     * @param request the remoting command to extract the access information from
     * @param remoteAddr the remote address associated with the request
     * @return Plain access resource result, including access key, signature and some other access attributes.
     */
    AccessResource parse(RemotingCommand request, String remoteAddr);

    /**
     * Validate the access resource.
     *
     * @param accessResource the resource to validate, typically the result of {@link #parse}
     */
    void validate(AccessResource accessResource);

    /**
     * Update the access resource config.
     *
     * @param plainAccessConfig the access config to apply
     * @return NOTE(review): presumably {@code true} when the update succeeded - confirm against the implementation
     */
    boolean updateAccessConfig(PlainAccessConfig plainAccessConfig);

    /**
     * Delete the access resource config.
     *
     * @param accesskey the access key whose config should be deleted
     * @return NOTE(review): presumably {@code true} when the deletion succeeded - confirm against the implementation
     */
    boolean deleteAccessConfig(String accesskey);

    /**
     * Get the access resource config version information.
     *
     * @return the version information of the ACL config, as a string
     */
    String getAclConfigVersion();

    /**
     * Update globalWhiteRemoteAddresses in acl yaml config file.
     *
     * @param globalWhiteAddrsList the global white remote addresses to write
     * @return NOTE(review): presumably {@code true} when the update succeeded - confirm against the implementation
     */
    boolean updateGlobalWhiteAddrsConfig(List<String> globalWhiteAddrsList);
}
rocketmq/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainAccessValidator.java
public class PlainAccessValidator implements AccessValidator { private PlainPermissionManager aclPlugEngine; public PlainAccessValidator() { aclPlugEngine = new PlainPermissionManager(); } @Override public AccessResource parse(RemotingCommand request, String remoteAddr) { PlainAccessResource accessResource = new PlainAccessResource(); if (remoteAddr != null && remoteAddr.contains(":")) { accessResource.setWhiteRemoteAddress(remoteAddr.split(":")[0]); } else { accessResource.setWhiteRemoteAddress(remoteAddr); } accessResource.setRequestCode(request.getCode()); if (request.getExtFields() == null) { // If request's extFields is null,then return accessResource directly(users can use whiteAddress pattern) // The following logic codes depend on the request's extFields not to be null. return accessResource; } accessResource.setAccessKey(request.getExtFields().get(SessionCredentials.ACCESS_KEY)); accessResource.setSignature(request.getExtFields().get(SessionCredentials.SIGNATURE)); accessResource.setSecretToken(request.getExtFields().get(SessionCredentials.SECURITY_TOKEN)); try { switch (request.getCode()) { case RequestCode.SEND_MESSAGE: accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.PUB); break; case RequestCode.SEND_MESSAGE_V2: accessResource.addResourceAndPerm(request.getExtFields().get("b"), Permission.PUB); break; case RequestCode.CONSUMER_SEND_MSG_BACK: accessResource.addResourceAndPerm(request.getExtFields().get("originTopic"), Permission.PUB); accessResource.addResourceAndPerm(getRetryTopic(request.getExtFields().get("group")), Permission.SUB); break; case RequestCode.PULL_MESSAGE: accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.SUB); accessResource.addResourceAndPerm(getRetryTopic(request.getExtFields().get("consumerGroup")), Permission.SUB); break; case RequestCode.QUERY_MESSAGE: accessResource.addResourceAndPerm(request.getExtFields().get("topic"), Permission.SUB); break; case 
RequestCode.HEART_BEAT: HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class); for (ConsumerData data : heartbeatData.getConsumerDataSet()) { accessResource.addResourceAndPerm(getRetryTopic(data.getGroupName()), Permission.SUB); for (SubscriptionData subscriptionData : data.getSubscriptionDataSet()) { accessResource.addResourceAndPerm(subscriptionData.getTopic(), Permission.SUB); } } break; case RequestCode.UNREGISTER_CLIENT: final UnregisterClientRequestHeader unregisterClientRequestHeader = (UnregisterClientRequestHeader) request .decodeCommandCustomHeader(UnregisterClientRequestHeader.class); accessResource.addResourceAndPerm(getRetryTopic(unregisterClientRequestHeader.getConsumerGroup()), Permission.SUB); break; case RequestCode.GET_CONSUMER_LIST_BY_GROUP: final GetConsumerListByGroupRequestHeader getConsumerListByGroupRequestHeader = (GetConsumerListByGroupRequestHeader) request .decodeCommandCustomHeader(GetConsumerListByGroupRequestHeader.class); accessResource.addResourceAndPerm(getRetryTopic(getConsumerListByGroupRequestHeader.getConsumerGroup()), Permission.SUB); break; case RequestCode.UPDATE_CONSUMER_OFFSET: final UpdateConsumerOffsetRequestHeader updateConsumerOffsetRequestHeader = (UpdateConsumerOffsetRequestHeader) request .decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class); accessResource.addResourceAndPerm(getRetryTopic(updateConsumerOffsetRequestHeader.getConsumerGroup()), Permission.SUB); accessResource.addResourceAndPerm(updateConsumerOffsetRequestHeader.getTopic(), Permission.SUB); break; default: break; } } catch (Throwable t) { throw new AclException(t.getMessage(), t); } // Content SortedMap<String, String> map = new TreeMap<String, String>(); for (Map.Entry<String, String> entry : request.getExtFields().entrySet()) { if (!SessionCredentials.SIGNATURE.equals(entry.getKey())) { map.put(entry.getKey(), entry.getValue()); } } accessResource.setContent(AclUtils.combineRequestContent(request, 
map)); return accessResource; } @Override public void validate(AccessResource accessResource) { aclPlugEngine.validate((PlainAccessResource) accessResource); } @Override public boolean updateAccessConfig(PlainAccessConfig plainAccessConfig) { return aclPlugEngine.updateAccessConfig(plainAccessConfig); } @Override public boolean deleteAccessConfig(String accesskey) { return aclPlugEngine.deleteAccessConfig(accesskey); } @Override public String getAclConfigVersion() { return aclPlugEngine.getAclConfigDataVersion(); } @Override public boolean updateGlobalWhiteAddrsConfig(List<String> globalWhiteAddrsList) { return aclPlugEngine.updateGlobalWhiteAddrsConfig(globalWhiteAddrsList); } }
rocketmq/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainPermissionManager.java
public class PlainPermissionManager { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME); private static final String DEFAULT_PLAIN_ACL_FILE = "/conf/plain_acl.yml"; private String fileHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV)); private String fileName = System.getProperty("rocketmq.acl.plain.file", DEFAULT_PLAIN_ACL_FILE); private Map<String/** AccessKey **/, PlainAccessResource> plainAccessResourceMap = new HashMap<>(); private List<RemoteAddressStrategy> globalWhiteRemoteAddressStrategy = new ArrayList<>(); private RemoteAddressStrategyFactory remoteAddressStrategyFactory = new RemoteAddressStrategyFactory(); private boolean isWatchStart; private final DataVersion dataVersion = new DataVersion(); public PlainPermissionManager() { load(); watch(); } //...... public void validate(PlainAccessResource plainAccessResource) { // Check the global white remote addr for (RemoteAddressStrategy remoteAddressStrategy : globalWhiteRemoteAddressStrategy) { if (remoteAddressStrategy.match(plainAccessResource)) { return; } } if (plainAccessResource.getAccessKey() == null) { throw new AclException(String.format("No accessKey is configured")); } if (!plainAccessResourceMap.containsKey(plainAccessResource.getAccessKey())) { throw new AclException(String.format("No acl config for %s", plainAccessResource.getAccessKey())); } // Check the white addr for accesskey PlainAccessResource ownedAccess = plainAccessResourceMap.get(plainAccessResource.getAccessKey()); if (ownedAccess.getRemoteAddressStrategy().match(plainAccessResource)) { return; } // Check the signature String signature = AclUtils.calSignature(plainAccessResource.getContent(), ownedAccess.getSecretKey()); if (!signature.equals(plainAccessResource.getSignature())) { throw new AclException(String.format("Check signature failed for accessKey=%s", plainAccessResource.getAccessKey())); } // Check perm of each resource 
checkPerm(plainAccessResource, ownedAccess); } void checkPerm(PlainAccessResource needCheckedAccess, PlainAccessResource ownedAccess) { if (Permission.needAdminPerm(needCheckedAccess.getRequestCode()) && !ownedAccess.isAdmin()) { throw new AclException(String.format("Need admin permission for request code=%d, but accessKey=%s is not", needCheckedAccess.getRequestCode(), ownedAccess.getAccessKey())); } Map<String, Byte> needCheckedPermMap = needCheckedAccess.getResourcePermMap(); Map<String, Byte> ownedPermMap = ownedAccess.getResourcePermMap(); if (needCheckedPermMap == null) { // If the needCheckedPermMap is null,then return return; } if (ownedPermMap == null && ownedAccess.isAdmin()) { // If the ownedPermMap is null and it is an admin user, then return return; } for (Map.Entry<String, Byte> needCheckedEntry : needCheckedPermMap.entrySet()) { String resource = needCheckedEntry.getKey(); Byte neededPerm = needCheckedEntry.getValue(); boolean isGroup = PlainAccessResource.isRetryTopic(resource); if (ownedPermMap == null || !ownedPermMap.containsKey(resource)) { // Check the default perm byte ownedPerm = isGroup ? ownedAccess.getDefaultGroupPerm() : ownedAccess.getDefaultTopicPerm(); if (!Permission.checkPermission(neededPerm, ownedPerm)) { throw new AclException(String.format("No default permission for %s", PlainAccessResource.printStr(resource, isGroup))); } continue; } if (!Permission.checkPermission(neededPerm, ownedPermMap.get(resource))) { throw new AclException(String.format("No default permission for %s", PlainAccessResource.printStr(resource, isGroup))); } } } //...... }
rocketmq/acl/src/main/java/org/apache/rocketmq/acl/common/Permission.java
public class Permission { public static final byte DENY = 1; public static final byte ANY = 1 << 1; public static final byte PUB = 1 << 2; public static final byte SUB = 1 << 3; public static final Set<Integer> ADMIN_CODE = new HashSet<Integer>(); static { // UPDATE_AND_CREATE_TOPIC ADMIN_CODE.add(RequestCode.UPDATE_AND_CREATE_TOPIC); // UPDATE_BROKER_CONFIG ADMIN_CODE.add(RequestCode.UPDATE_BROKER_CONFIG); // DELETE_TOPIC_IN_BROKER ADMIN_CODE.add(RequestCode.DELETE_TOPIC_IN_BROKER); // UPDATE_AND_CREATE_SUBSCRIPTIONGROUP ADMIN_CODE.add(RequestCode.UPDATE_AND_CREATE_SUBSCRIPTIONGROUP); // DELETE_SUBSCRIPTIONGROUP ADMIN_CODE.add(RequestCode.DELETE_SUBSCRIPTIONGROUP); } public static boolean checkPermission(byte neededPerm, byte ownedPerm) { if ((ownedPerm & DENY) > 0) { return false; } if ((neededPerm & ANY) > 0) { return ((ownedPerm & PUB) > 0) || ((ownedPerm & SUB) > 0); } return (neededPerm & ownedPerm) > 0; } public static byte parsePermFromString(String permString) { if (permString == null) { return Permission.DENY; } switch (permString.trim()) { case "PUB": return Permission.PUB; case "SUB": return Permission.SUB; case "PUB|SUB": return Permission.PUB | Permission.SUB; case "SUB|PUB": return Permission.PUB | Permission.SUB; case "DENY": return Permission.DENY; default: return Permission.DENY; } } public static void parseResourcePerms(PlainAccessResource plainAccessResource, Boolean isTopic, List<String> resources) { if (resources == null || resources.isEmpty()) { return; } for (String resource : resources) { String[] items = StringUtils.split(resource, "="); if (items.length == 2) { plainAccessResource.addResourceAndPerm(isTopic ? items[0].trim() : PlainAccessResource.getRetryTopic(items[0].trim()), parsePermFromString(items[1].trim())); } else { throw new AclException(String.format("Parse resource permission failed for %s:%s", isTopic ? 
"topic" : "group", resource)); } } } public static boolean needAdminPerm(Integer code) { return ADMIN_CODE.contains(code); } }
AccessValidator接口定義了parse、validate、updateAccessConfig、deleteAccessConfig、getAclConfigVersion、updateGlobalWhiteAddrsConfig方法;PlainAccessValidator實現了AccessValidator接口,其構造器創建了PlainPermissionManager;其parse方法解析remotingCommand及remoteAddr,構造plainAccessResource,具體根據不同的RequestCode來設置resourceAndPerm,之後根據request及fieldsMap設置content;其validate、updateAccessConfig、deleteAccessConfig、getAclConfigVersion、updateGlobalWhiteAddrsConfig方法都委託給了PlainPermissionManager。