Acl表示對一個資源的訪問權限。它由Resource和Acl組成。java
Resource表示一個具體的資源。node
Acl表示權限,由主體principal,是否容許permissionType,主機host,操做operation組成。json
// ResourceType表示資源類型,name則表示資源標識符 case class Resource(resourceType: ResourceType, name: String) { override def toString: String = { resourceType.name + Resource.Separator + name } }
以一個名爲test的Topic爲例,用Resource表示這個資源session
new Resource(ResourceType.Topic, "test")
object ResourceType { def fromString(resourceType: String): ResourceType = { // 從values找到name相等的type val rType = values.find(rType => rType.name.equalsIgnoreCase(resourceType)) rType.getOrElse(throw new KafkaException(resourceType + " not a valid resourceType name. The valid names are " + values.mkString(","))) } // 取值序列 def values: Seq[ResourceType] = List(Topic, Group, Cluster, TransactionalId) def fromJava(operation: JResourceType): ResourceType = fromString(operation.toString.replaceAll("_", "")) }
ResourceType只有四種內置的類型數據結構
case object Topic extends ResourceType { val name = "Topic" val error = Errors.TOPIC_AUTHORIZATION_FAILED val toJava = JResourceType.TOPIC } case object Group extends ResourceType { val name = "Group" val error = Errors.GROUP_AUTHORIZATION_FAILED val toJava = JResourceType.GROUP } case object Cluster extends ResourceType { val name = "Cluster" val error = Errors.CLUSTER_AUTHORIZATION_FAILED val toJava = JResourceType.CLUSTER } case object TransactionalId extends ResourceType { val name = "TransactionalId" val error = Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED val toJava = JResourceType.TRANSACTIONAL_ID }
object Operation { def fromString(operation: String): Operation = { // 從values找到name相等的值 val op = values.find(op => op.name.equalsIgnoreCase(operation)) op.getOrElse(throw new KafkaException(operation + " not a valid operation name. The valid names are " + values.mkString(","))) } def fromJava(operation: AclOperation): Operation = fromString(operation.toString.replaceAll("_", "")) // 取值集合 def values: Seq[Operation] = List(Read, Write, Create, Delete, Alter, Describe, ClusterAction, AlterConfigs, DescribeConfigs, IdempotentWrite, All) }
Opearation只有下面幾種內置的類型ide
// 讀操做 case object Read extends Operation { val name = "Read" val toJava = AclOperation.READ } // 寫操做 case object Write extends Operation { val name = "Write" val toJava = AclOperation.WRITE } // 新建操做 case object Create extends Operation { val name = "Create" val toJava = AclOperation.CREATE } // 刪除操做 case object Delete extends Operation { val name = "Delete" val toJava = AclOperation.DELETE } // 修改操做 case object Alter extends Operation { val name = "Alter" val toJava = AclOperation.ALTER } // 描述操做 case object Describe extends Operation { val name = "Describe" val toJava = AclOperation.DESCRIBE } // 集羣操做 case object ClusterAction extends Operation { val name = "ClusterAction" val toJava = AclOperation.CLUSTER_ACTION } // 描述配置操做 case object DescribeConfigs extends Operation { val name = "DescribeConfigs" val toJava = AclOperation.DESCRIBE_CONFIGS } // 修改配置操做 case object AlterConfigs extends Operation { val name = "AlterConfigs" val toJava = AclOperation.ALTER_CONFIGS } // case object IdempotentWrite extends Operation { val name = "IdempotentWrite" val toJava = AclOperation.IDEMPOTENT_WRITE } // 表示全部的操做 case object All extends Operation { val name = "All" val toJava = AclOperation.ALL }
object PermissionType { def fromString(permissionType: String): PermissionType = { val pType = values.find(pType => pType.name.equalsIgnoreCase(permissionType)) pType.getOrElse(throw new KafkaException(permissionType + " not a valid permissionType name. The valid names are " + values.mkString(","))) } // 從values找到name相等的值 def fromJava(permissionType: AclPermissionType): PermissionType = fromString(permissionType.toString) // 取值集合 def values: Seq[PermissionType] = List(Allow, Deny) }
內置的PermissionType,只有兩種,Allow表示容許,Deny表示拒絕。函數
case object Allow extends PermissionType { val name = "Allow" val toJava = AclPermissionType.ALLOW } case object Deny extends PermissionType { val name = "Deny" val toJava = AclPermissionType.DENY }
KafkaPrincipal默認是以User類型,來區分的。ui
public class KafkaPrincipal implements Principal { public static final String SEPARATOR = ":"; public static final String USER_TYPE = "User"; public final static KafkaPrincipal ANONYMOUS = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "ANONYMOUS"); // 主體類型 private String principalType; // 標識符 private String name; public KafkaPrincipal(String principalType, String name) { if (principalType == null || name == null) { throw new IllegalArgumentException("principalType and name can not be null"); } this.principalType = principalType; this.name = name; } public static KafkaPrincipal fromString(String str) { if (str == null || str.isEmpty()) { throw new IllegalArgumentException("expected a string in format principalType:principalName but got " + str); } // 以:字符切割 String[] split = str.split(SEPARATOR, 2); if (split == null || split.length != 2) { throw new IllegalArgumentException("expected a string in format principalType:principalName but got " + str); } return new KafkaPrincipal(split[0], split[1]); public String toString() { return principalType + SEPARATOR + name; } }
case class Acl(principal: KafkaPrincipal, permissionType: PermissionType, host: String, operation: Operation) { // 轉爲map類型。後面會再轉爲json類型,存到zookeeper的節點中 def toMap(): Map[String, Any] = { Map(Acl.PrincipalKey -> principal.toString, Acl.PermissionTypeKey -> permissionType.name, Acl.OperationKey -> operation.name, Acl.HostsKey -> host) } } object Acl { val WildCardPrincipal: KafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "*") val WildCardHost: String = "*" val AllowAllAcl = new Acl(WildCardPrincipal, Allow, WildCardHost, All) val PrincipalKey = "principal" val PermissionTypeKey = "permissionType" val OperationKey = "operation" val HostsKey = "host" val VersionKey = "version" val CurrentVersion = 1 val AclsKey = "acls" /** aclJson數據存儲在zookeeper中,它的格式以下 { "version": 1, "acls": [ { "host":"host1", "permissionType": "Deny", "operation": "Read", "principal": "User:alice" } ] } */ def fromJson(aclJson: String): Set[Acl] = { if (aclJson == null || aclJson.isEmpty) return collection.immutable.Set.empty[Acl] var acls: collection.mutable.HashSet[Acl] = new collection.mutable.HashSet[Acl]() Json.parseFull(aclJson) match { case Some(m) => val aclMap = m.asInstanceOf[Map[String, Any]] //the acl json version. require(aclMap(VersionKey) == CurrentVersion) // 獲取aclJson的acls的值 val aclSet: List[Map[String, Any]] = aclMap(AclsKey).asInstanceOf[List[Map[String, Any]]] aclSet.foreach(item => { val principal: KafkaPrincipal = KafkaPrincipal.fromString(item(PrincipalKey).asInstanceOf[String]) val permissionType: PermissionType = PermissionType.fromString(item(PermissionTypeKey).asInstanceOf[String]) val operation: Operation = Operation.fromString(item(OperationKey).asInstanceOf[String]) val host: String = item(HostsKey).asInstanceOf[String] // 構建Acl,而且添加到acls裏 acls += new Acl(principal, permissionType, host, operation) }) case None => } acls.toSet } }
實現了Authorizer接口,主要提供了Acl的管理this
class SimpleAclAuthorizer extends Authorizer with Logging { private val aclCache = new scala.collection.mutable.HashMap[Resource, VersionedAcls] // 初始化配置 override def configure(javaConfigs: util.Map[String, _]) { val configs = javaConfigs.asScala val props = new java.util.Properties() configs.foreach { case (key, value) => props.put(key, value.toString) } // 從配置中獲取super.users的值,這是一個字符串。 // 格式爲User:user1;User:user2,用戶之間用;隔開,一個用戶是User:username的格式。 superUsers = configs.get(SimpleAclAuthorizer.SuperUsersProp).collect { case str: String if str.nonEmpty => str.split(";").map(s => KafkaPrincipal.fromString(s.trim)).toSet }.getOrElse(Set.empty[KafkaPrincipal]) // 這個配置表示,當沒有找到對應的Acl規則時,默認是否容許 shouldAllowEveryoneIfNoAclIsFound = configs.get(SimpleAclAuthorizer.AllowEveryoneIfNoAclIsFoundProp).exists(_.toString.toBoolean) // 初始化zookeeper鏈接 val kafkaConfig = KafkaConfig.fromProps(props, doLog = false) val zkUrl = configs.get(SimpleAclAuthorizer.ZkUrlProp).map(_.toString).getOrElse(kafkaConfig.zkConnect) val zkConnectionTimeoutMs = configs.get(SimpleAclAuthorizer.ZkConnectionTimeOutProp).map(_.toString.toInt).getOrElse(kafkaConfig.zkConnectionTimeoutMs) val zkSessionTimeOutMs = configs.get(SimpleAclAuthorizer.ZkSessionTimeOutProp).map(_.toString.toInt).getOrElse(kafkaConfig.zkSessionTimeoutMs) zkUtils = ZkUtils(zkUrl, sessionTimeout = zkSessionTimeOutMs, connectionTimeout = zkConnectionTimeoutMs, kafkaConfig.zkEnableSecureAcls) // 保證Acl節點存在 zkUtils.makeSurePersistentPathExists(SimpleAclAuthorizer.AclZkPath) // 從zookeeper中讀取數據,初始化 loadCache() // 保證Acl節點存在aclCache zkUtils.makeSurePersistentPathExists(SimpleAclAuthorizer.AclChangedZkPath) // 註冊監聽時間,當節點有變更時,會自行調用AclChangedNotificationHandler回調函數,更新aclCache aclChangeListener = new ZkNodeChangeNotificationListener(zkUtils, SimpleAclAuthorizer.AclChangedZkPath, SimpleAclAuthorizer.AclChangedPrefix, AclChangedNotificationHandler) aclChangeListener.init() } private def loadCache() { inWriteLock(lock) { // zkUtils.getChildren 返回子節點列表,子節點的數據類型爲String // 返回"/acls"節點的子節點 val resourceTypes = zkUtils.getChildren(SimpleAclAuthorizer.AclZkPath) for (rType <- resourceTypes) { // 根據string實例化ResourceType val resourceType = ResourceType.fromString(rType) // 返回"/acls/resourceName"節點的子節點 val resourceTypePath = SimpleAclAuthorizer.AclZkPath + "/" + resourceType.name val resourceNames = zkUtils.getChildren(resourceTypePath) for (resourceName <- resourceNames) { // 根據type和name實例化Resource,而後從zookeeper中讀取到對應的Acl列表 val versionedAcls = getAclsFromZk(Resource(resourceType, resourceName.toString)) // 更新aclCache updateCache(new Resource(resourceType, resourceName), versionedAcls) } } } } def toResourcePath(resource: Resource): String = { // 根據Resource找到zookeeper中對應的節點路徑 SimpleAclAuthorizer.AclZkPath + "/" + resource.resourceType + "/" + resource.name } private def getAclsFromZk(resource: Resource): VersionedAcls = { // 讀取Resource對應節點的數據 val (aclJson, stat) = zkUtils.readDataMaybeNull(toResourcePath(resource)) // 調用Acl.fromJson解析數據,返回VersionedAcls。VersionedAcls定義在下面 VersionedAcls(aclJson.map(Acl.fromJson).getOrElse(Set()), stat.getVersion) } // 更新aclCache private def updateCache(resource: Resource, versionedAcls: VersionedAcls) { if (versionedAcls.acls.nonEmpty) { aclCache.put(resource, versionedAcls) } else { aclCache.remove(resource) } } }
VersionedAcls的定義scala
object SimpleAclAuthorizer { // VersionedAcls只是Acl的列表和zkVersion的版本號 private case class VersionedAcls(acls: Set[Acl], zkVersion: Int) }
SimpleAclAuthorizer還有一個重要的方法authorize,用於檢查權限
class SimpleAclAuthorizer extends Authorizer with Logging { override def authorize(session: Session, operation: Operation, resource: Resource): Boolean = { val principal = session.principal val host = session.clientAddress.getHostAddress // 獲取resource對應的Acl列表和該resource的type的默認Acl列表 // WildCardResource表示匹配全部 val acls = getAcls(resource) ++ getAcls(new Resource(resource.resourceType, Resource.WildCardResource)) // 從上面resource找到全部的Acls中,查找是否有deny的acl val denyMatch = aclMatch(operation, resource, principal, host, Deny, acls) val allowOps = operation match { // Read, Write, Delete, Alter動做包含了Describe case Describe => Set[Operation](Describe, Read, Write, Delete, Alter) // AlterConfigs包含了DescribeConfigs case DescribeConfigs => Set[Operation](DescribeConfigs, AlterConfigs) // 其他的不修改 case _ => Set[Operation](operation) } // 遍歷allowOps列表,查找是否有明確指定Allow的acl val allowMatch = allowOps.exists(operation => aclMatch(operation, resource, principal, host, Allow, acls)) // 有如下三種條件,知足其一,則認爲有權限 // 是不是super user val authorized = isSuperUser(operation, resource, principal, host) || // 若是acls沒有找到,查看默認配置 isEmptyAclAndAuthorized(operation, resource, principal, host, acls) || // 若是沒有找到deny的acl,而且還有allow的acl (!denyMatch && allowMatch) logAuditMessage(principal, authorized, operation, resource, host) authorized } private def aclMatch(operations: Operation, resource: Resource, principal: KafkaPrincipal, host: String, permissionType: PermissionType, acls: Set[Acl]): Boolean = { acls.find { acl => // // permissionType相等 acl.permissionType == permissionType && // principal相等,或者principal爲WildCardPrincipal,表示匹配全部 (acl.principal == principal || acl.principal == Acl.WildCardPrincipal) && // operation相等,或者operation爲All,表示匹配全部 (operations == acl.operation || acl.operation == All) && // host相等,或者host爲WildCardHost,表示匹配全部 (acl.host == host || acl.host == Acl.WildCardHost) }.exists { acl => authorizerLogger.debug(s"operation = $operations on resource = $resource from host = $host is $permissionType based on acl = $acl") // 找到後,則返回true。沒有,則返回false true } } // 是否principal爲super user def isSuperUser(operation: Operation, resource: Resource, principal: KafkaPrincipal, host: String): Boolean = { // superUsers是否包含principal if (superUsers.contains(principal)) { authorizerLogger.debug(s"principal = $principal is a super user, allowing operation without checking acls.") true } else false } def isEmptyAclAndAuthorized(operation: Operation, resource: Resource, principal: KafkaPrincipal, host: String, acls: Set[Acl]): Boolean = { if (acls.isEmpty) { authorizerLogger.debug(s"No acl found for resource $resource, authorized = $shouldAllowEveryoneIfNoAclIsFound") // 返回配置的值 shouldAllowEveryoneIfNoAclIsFound } else false } }
由於acl數據持久化到zookeeper中,因此當zookeeper中的數據發生改變時,應該還有監聽的做用。這個是經過zookeeper的watch來實現的。
acl的更新涉及到zookeeper的兩個地方。一個是Resource節點,存儲acls。另外一個是持久順序節點,它的子節點記錄了每次Resource的更新。
aclChangeListener = new ZkNodeChangeNotificationListener(zkUtils, SimpleAclAuthorizer.AclChangedZkPath, SimpleAclAuthorizer.AclChangedPrefix, AclChangedNotificationHandler) aclChangeListener.init() class ZkNodeChangeNotificationListener(private val zkUtils: ZkUtils, private val seqNodeRoot: String, private val seqNodePrefix: String, private val notificationHandler: NotificationHandler, private val changeExpirationMs: Long = 15 * 60 * 1000, private val time: Time = Time.SYSTEM) extends Logging { private var lastExecutedChange = -1L private val isClosed = new AtomicBoolean(false) // 初始化 def init() { zkUtils.makeSurePersistentPathExists(seqNodeRoot) // 監聽seqNodeRoot節點的子節點變化。 // seqNodeRoot就是上面所說的順序節點 zkUtils.zkClient.subscribeChildChanges(seqNodeRoot, NodeChangeListener) zkUtils.zkClient.subscribeStateChanges(ZkStateChangeListener) processAllNotifications() } def processAllNotifications() { // 獲取seqNodeRoot的子節點。子節點存儲了resource val changes = zkUtils.zkClient.getChildren(seqNodeRoot) // 而且從小到大排序 processNotifications(changes.asScala.sorted) } private def processNotifications(notifications: Seq[String]) { if (notifications.nonEmpty) { info(s"Processing notification(s) to $seqNodeRoot") try { val now = time.milliseconds for (notification <- notifications) { // 獲取當前節點的順序號 val changeId = changeNumber(notification) if (changeId > lastExecutedChange) { // 若是changeId比上次更新的id大,則表示這是新的紀錄 val changeZnode = seqNodeRoot + "/" + notification // 讀取當前節點的數據,表示Resource的字符串 val (data, _) = zkUtils.readDataMaybeNull(changeZnode) // 調用notificationHandler的processNotification方法 data.map(notificationHandler.processNotification(_)).getOrElse { logger.warn(s"read null data from $changeZnode when processing notification $notification") } } // 更新lastExecutedChange lastExecutedChange = changeId } purgeObsoleteNotifications(now, notifications) } catch { case e: ZkInterruptedException => if (!isClosed.get) throw e } } } // 由於它是順序節點的子節點,因此名稱後綴會有自增數字 // 相似於acl_changes_0000000001, acl_changes_0000000002 private def changeNumber(name: String): Long = name.substring(seqNodePrefix.length).toLong // 監聽子節點變化 object NodeChangeListener extends IZkChildListener { override def handleChildChange(path: String, notifications: java.util.List[String]) { try { import scala.collection.JavaConverters._ if (notifications != null) // 調用processNotifications方法 processNotifications(notifications.asScala.sorted) } catch { case e: Exception => error(s"Error processing notification change for path = $path and notification= $notifications :", e) } } } }
上面processNotifications方法,調用了AclChangedNotificationHandler 的processNotification方法。
object AclChangedNotificationHandler extends NotificationHandler { override def processNotification(notificationMessage: String) { // 經過字符串,實例化Resource val resource: Resource = Resource.fromString(notificationMessage) inWriteLock(lock) { // 從zookeeper中讀取該resource的acls val versionedAcls = getAclsFromZk(resource) // 更新aclCache updateCache(resource, versionedAcls) } }
本章先介紹了與acl相關的數據結構。Resource表明資源,Acl表示訪問規則。
而後介紹了acl的管理,SimpleAclAuthorizer類。其中涉及到了zookeeper的數據持久化,aclCache的更新。二者之間的同步,經過了zookeeper的watch機制來實現。