kafka權限控制-Acl

介紹

Acl表示對一個資源的訪問權限。它由Resource和Acl組成。

Resource表示一個具體的資源。

Acl表示權限,由主體principal、是否允許permissionType、主機host、操作operation組成。

Resource

/**
 * A concrete resource that ACLs can be attached to.
 *
 * @param resourceType the kind of resource
 * @param name         the identifier of the resource within its type
 */
case class Resource(resourceType: ResourceType, name: String) {

  /** Renders the resource as the type name, `Resource.Separator`, then the resource name. */
  override def toString: String =
    s"${resourceType.name}${Resource.Separator}$name"
}

以一個名爲test的Topic爲例,用Resource表示這個資源:

// `Topic` is a top-level case object, not a member of the `ResourceType` companion object,
// so it is referenced unqualified.
new Resource(Topic, "test")

ResourceType

/** Lookup helpers for the built-in [[ResourceType]] values. */
object ResourceType {

  /** All built-in resource types. */
  def values: Seq[ResourceType] = List(Topic, Group, Cluster, TransactionalId)

  /**
   * Resolves a resource type from its name, ignoring case.
   *
   * @param resourceType the name to look up among `values`
   * @throws KafkaException if no built-in resource type has that name
   */
  def fromString(resourceType: String): ResourceType =
    values.find(_.name.equalsIgnoreCase(resourceType)) match {
      case Some(found) => found
      case None =>
        throw new KafkaException(s"$resourceType not a valid resourceType name. The valid names are ${values.mkString(",")}")
    }

  /** Converts the Java enum value by stripping underscores from its string form and looking the result up by name. */
  def fromJava(operation: JResourceType): ResourceType =
    fromString(operation.toString.replaceAll("_", ""))
}

ResourceType只有四種內置的類型:

/** Resource type for topics: its name, its associated `Errors` value and its Java-API counterpart. */
case object Topic extends ResourceType {
  val name: String = "Topic"
  val error: Errors = Errors.TOPIC_AUTHORIZATION_FAILED
  val toJava: JResourceType = JResourceType.TOPIC
}

/** Resource type for groups: its name, its associated `Errors` value and its Java-API counterpart. */
case object Group extends ResourceType {
  val name: String = "Group"
  val error: Errors = Errors.GROUP_AUTHORIZATION_FAILED
  val toJava: JResourceType = JResourceType.GROUP
}

/** Resource type for the cluster: its name, its associated `Errors` value and its Java-API counterpart. */
case object Cluster extends ResourceType {
  val name: String = "Cluster"
  val error: Errors = Errors.CLUSTER_AUTHORIZATION_FAILED
  val toJava: JResourceType = JResourceType.CLUSTER
}

/** Resource type for transactional ids: its name, its associated `Errors` value and its Java-API counterpart. */
case object TransactionalId extends ResourceType {
  val name: String = "TransactionalId"
  val error: Errors = Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED
  val toJava: JResourceType = JResourceType.TRANSACTIONAL_ID
}

Operation

/** Lookup helpers for the built-in [[Operation]] values. */
object Operation {

  /** All built-in operations. */
  def values: Seq[Operation] =
    List(Read, Write, Create, Delete, Alter, Describe, ClusterAction, AlterConfigs, DescribeConfigs, IdempotentWrite, All)

  /**
   * Resolves an operation from its name, ignoring case.
   *
   * @param operation the name to look up among `values`
   * @throws KafkaException if no built-in operation has that name
   */
  def fromString(operation: String): Operation =
    values.find(_.name.equalsIgnoreCase(operation)) match {
      case Some(found) => found
      case None =>
        throw new KafkaException(s"$operation not a valid operation name. The valid names are ${values.mkString(",")}")
    }

  /** Converts the Java enum value by stripping underscores from its string form and looking the result up by name. */
  def fromJava(operation: AclOperation): Operation =
    fromString(operation.toString.replaceAll("_", ""))
}

Operation只有下面幾種內置的類型:

/** Read operation. */
case object Read extends Operation {
  val name: String = "Read"
  val toJava: AclOperation = AclOperation.READ
}
/** Write operation. */
case object Write extends Operation {
  val name: String = "Write"
  val toJava: AclOperation = AclOperation.WRITE
}
/** Create operation. */
case object Create extends Operation {
  val name: String = "Create"
  val toJava: AclOperation = AclOperation.CREATE
}
/** Delete operation. */
case object Delete extends Operation {
  val name: String = "Delete"
  val toJava: AclOperation = AclOperation.DELETE
}
/** Alter operation. */
case object Alter extends Operation {
  val name: String = "Alter"
  val toJava: AclOperation = AclOperation.ALTER
}
/** Describe operation. */
case object Describe extends Operation {
  val name: String = "Describe"
  val toJava: AclOperation = AclOperation.DESCRIBE
}
/** Cluster-action operation. */
case object ClusterAction extends Operation {
  val name: String = "ClusterAction"
  val toJava: AclOperation = AclOperation.CLUSTER_ACTION
}
/** Describe-configs operation. */
case object DescribeConfigs extends Operation {
  val name: String = "DescribeConfigs"
  val toJava: AclOperation = AclOperation.DESCRIBE_CONFIGS
}
/** Alter-configs operation. */
case object AlterConfigs extends Operation {
  val name: String = "AlterConfigs"
  val toJava: AclOperation = AclOperation.ALTER_CONFIGS
}
/** Idempotent-write operation. */
case object IdempotentWrite extends Operation {
  val name: String = "IdempotentWrite"
  val toJava: AclOperation = AclOperation.IDEMPOTENT_WRITE
}
/** Stands for every operation. */
case object All extends Operation {
  val name: String = "All"
  val toJava: AclOperation = AclOperation.ALL
}

PermissionType

/** Lookup helpers for the built-in [[PermissionType]] values. */
object PermissionType {

  /** All built-in permission types. */
  def values: Seq[PermissionType] = List(Allow, Deny)

  /**
   * Resolves a permission type from its name, ignoring case.
   *
   * @param permissionType the name to look up among `values`
   * @throws KafkaException if no built-in permission type has that name
   */
  def fromString(permissionType: String): PermissionType =
    values.find(_.name.equalsIgnoreCase(permissionType)) match {
      case Some(found) => found
      case None =>
        throw new KafkaException(s"$permissionType not a valid permissionType name. The valid names are ${values.mkString(",")}")
    }

  /** Converts the Java enum value by looking up its string form by name. */
  def fromJava(permissionType: AclPermissionType): PermissionType =
    fromString(permissionType.toString)
}

內置的PermissionType只有兩種:Allow表示允許,Deny表示拒絕。

/** Permission type that grants access. */
case object Allow extends PermissionType {
  val name: String = "Allow"
  val toJava: AclPermissionType = AclPermissionType.ALLOW
}

/** Permission type that refuses access. */
case object Deny extends PermissionType {
  val name: String = "Deny"
  val toJava: AclPermissionType = AclPermissionType.DENY
}

KafkaPrincipal

KafkaPrincipal默認以User類型來區分。

/**
 * A principal identified by a type and a name, rendered as {@code principalType:name}.
 *
 * Instances are immutable and compared by value (type and name), so they can be
 * looked up in sets and compared with {@code equals}.
 */
public class KafkaPrincipal implements Principal {
    public static final String SEPARATOR = ":";
    public static final String USER_TYPE = "User";
    public final static KafkaPrincipal ANONYMOUS = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "ANONYMOUS");
    // Principal type, e.g. USER_TYPE
    private final String principalType;
    // Identifier of the principal within its type
    private final String name;

    /**
     * @param principalType the type of the principal
     * @param name          the identifier of the principal
     * @throws IllegalArgumentException if either argument is null
     */
    public KafkaPrincipal(String principalType, String name) {
        if (principalType == null || name == null) {
            throw new IllegalArgumentException("principalType and name can not be null");
        }
        this.principalType = principalType;
        this.name = name;
    }

    /**
     * Parses a string of the form {@code principalType:principalName}.
     * Only the first separator splits the string, so the name itself may contain ':'.
     *
     * @throws IllegalArgumentException if the string is null, empty, or has no separator
     */
    public static KafkaPrincipal fromString(String str) {
        if (str == null || str.isEmpty()) {
            throw new IllegalArgumentException("expected a string in format principalType:principalName but got " + str);
        }
        // Split on the first ':' only
        String[] split = str.split(SEPARATOR, 2);

        if (split.length != 2) {
            throw new IllegalArgumentException("expected a string in format principalType:principalName but got " + str);
        }

        return new KafkaPrincipal(split[0], split[1]);
    }

    @Override
    public String toString() {
        return principalType + SEPARATOR + name;
    }

    /** Returns the identifier part of the principal (required by {@link Principal}). */
    @Override
    public String getName() {
        return name;
    }

    /** Returns the type part of the principal. */
    public String getPrincipalType() {
        return principalType;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        KafkaPrincipal that = (KafkaPrincipal) o;
        return principalType.equals(that.principalType) && name.equals(that.name);
    }

    @Override
    public int hashCode() {
        return 31 * principalType.hashCode() + name.hashCode();
    }
}

Acl

/**
 * A single access rule.
 *
 * @param principal      the principal the rule applies to
 * @param permissionType whether the rule allows or denies
 * @param host           the host the rule applies to
 * @param operation      the operation the rule applies to
 */
case class Acl(principal: KafkaPrincipal, permissionType: PermissionType, host: String, operation: Operation) {

  /**
   * Flattens this rule into a map keyed by the `Acl.*Key` constants.
   * The map is later turned into JSON and stored in a ZooKeeper node.
   */
  def toMap(): Map[String, Any] =
    Map(
      Acl.PrincipalKey      -> principal.toString,
      Acl.PermissionTypeKey -> permissionType.name,
      Acl.OperationKey      -> operation.name,
      Acl.HostsKey          -> host
    )
}

/** Constants and JSON decoding for [[Acl]]. */
object Acl {
  // Wildcards: a principal / host of "*" matches every principal / host.
  val WildCardPrincipal: KafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "*")
  val WildCardHost: String = "*"
  // Rule that allows every operation for every principal from every host.
  val AllowAllAcl = new Acl(WildCardPrincipal, Allow, WildCardHost, All)
  // Keys used in the map / JSON representation.
  val PrincipalKey = "principal"
  val PermissionTypeKey = "permissionType"
  val OperationKey = "operation"
  val HostsKey = "host"
  val VersionKey = "version"
  val CurrentVersion = 1
  val AclsKey = "acls"

  /**
   * Decodes the ACL JSON stored in ZooKeeper. Expected format:
   * {{{
   *   {
   *     "version": 1,
   *     "acls": [
   *       {
   *         "host": "host1",
   *         "permissionType": "Deny",
   *         "operation": "Read",
   *         "principal": "User:alice"
   *       }
   *     ]
   *   }
   * }}}
   *
   * @param aclJson the JSON text; may be null or empty
   * @return the decoded rules; empty when the input is null, empty, or `Json.parseFull` yields `None`
   * @throws IllegalArgumentException if the "version" field differs from `CurrentVersion` (via `require`)
   */
  def fromJson(aclJson: String): Set[Acl] =
    if (aclJson == null || aclJson.isEmpty) Set.empty[Acl]
    else Json.parseFull(aclJson) match {
      case Some(parsed) =>
        val aclMap = parsed.asInstanceOf[Map[String, Any]]
        // Reject any JSON layout version other than the current one.
        require(aclMap(VersionKey) == CurrentVersion)
        // The value under "acls" is the list of encoded rules.
        val entries = aclMap(AclsKey).asInstanceOf[List[Map[String, Any]]]
        entries.map { item =>
          val principal: KafkaPrincipal = KafkaPrincipal.fromString(item(PrincipalKey).asInstanceOf[String])
          val permissionType: PermissionType = PermissionType.fromString(item(PermissionTypeKey).asInstanceOf[String])
          val operation: Operation = Operation.fromString(item(OperationKey).asInstanceOf[String])
          val host: String = item(HostsKey).asInstanceOf[String]
          new Acl(principal, permissionType, host, operation)
        }.toSet
      case None => Set.empty[Acl]
    }
}

SimpleAclAuthorizer

SimpleAclAuthorizer實現了Authorizer接口,主要提供了Acl的管理。

/**
 * Authorizer whose ACLs are stored in ZooKeeper and mirrored in an in-memory cache
 * (excerpt: configuration and cache loading).
 */
class SimpleAclAuthorizer extends Authorizer with Logging {

  // In-memory copy of each resource's ACLs, paired with the version of the ZooKeeper node they were read from.
  private val aclCache = new scala.collection.mutable.HashMap[Resource, VersionedAcls]

  /**
   * Reads the authorizer settings, connects to ZooKeeper, loads the cache and
   * starts listening for ACL change notifications.
   */
  override def configure(javaConfigs: util.Map[String, _]): Unit = {
    val configs = javaConfigs.asScala
    val props = new java.util.Properties()
    configs.foreach { case (k, v) => props.put(k, v.toString) }

    // The super-users setting is a single string such as "User:user1;User:user2":
    // entries are separated by ';' and each entry is parsed by KafkaPrincipal.fromString.
    superUsers = configs.get(SimpleAclAuthorizer.SuperUsersProp) match {
      case Some(users: String) if users.nonEmpty =>
        users.split(";").map(entry => KafkaPrincipal.fromString(entry.trim)).toSet
      case _ =>
        Set.empty[KafkaPrincipal]
    }

    // Default decision when no ACL is found for a resource; false when the setting is absent.
    shouldAllowEveryoneIfNoAclIsFound =
      configs.get(SimpleAclAuthorizer.AllowEveryoneIfNoAclIsFoundProp).exists(flag => flag.toString.toBoolean)

    // ZooKeeper connection: authorizer-specific settings win, otherwise fall back to the broker config.
    val kafkaConfig = KafkaConfig.fromProps(props, doLog = false)
    val zkUrl = configs.get(SimpleAclAuthorizer.ZkUrlProp)
      .map(_.toString)
      .getOrElse(kafkaConfig.zkConnect)
    val zkConnectionTimeoutMs = configs.get(SimpleAclAuthorizer.ZkConnectionTimeOutProp)
      .map(_.toString.toInt)
      .getOrElse(kafkaConfig.zkConnectionTimeoutMs)
    val zkSessionTimeOutMs = configs.get(SimpleAclAuthorizer.ZkSessionTimeOutProp)
      .map(_.toString.toInt)
      .getOrElse(kafkaConfig.zkSessionTimeoutMs)
    zkUtils = ZkUtils(zkUrl,
                      sessionTimeout = zkSessionTimeOutMs,
                      connectionTimeout = zkConnectionTimeoutMs,
                      kafkaConfig.zkEnableSecureAcls)

    // Make sure the ACL root node exists, then populate aclCache from ZooKeeper.
    zkUtils.makeSurePersistentPathExists(SimpleAclAuthorizer.AclZkPath)
    loadCache()

    // Make sure the ACL-change notification node exists.
    zkUtils.makeSurePersistentPathExists(SimpleAclAuthorizer.AclChangedZkPath)

    // Register the change listener; AclChangedNotificationHandler is handed each notification
    // and refreshes aclCache.
    aclChangeListener = new ZkNodeChangeNotificationListener(
      zkUtils,
      SimpleAclAuthorizer.AclChangedZkPath,
      SimpleAclAuthorizer.AclChangedPrefix,
      AclChangedNotificationHandler)
    aclChangeListener.init()
  }

  /** Walks the ACL root node (resource type, then resource name) and loads every resource's ACLs into aclCache. */
  private def loadCache(): Unit = {
    inWriteLock(lock) {
      // Children of the ACL root are resource-type names.
      zkUtils.getChildren(SimpleAclAuthorizer.AclZkPath).foreach { typeName =>
        val resourceType = ResourceType.fromString(typeName)
        // Children of "<ACL root>/<resource type>" are resource names.
        val typePath = SimpleAclAuthorizer.AclZkPath + "/" + resourceType.name
        zkUtils.getChildren(typePath).foreach { resourceName =>
          val resource = Resource(resourceType, resourceName)
          updateCache(resource, getAclsFromZk(resource))
        }
      }
    }
  }

  /** Returns the ZooKeeper path of the node that stores the given resource's ACLs. */
  def toResourcePath(resource: Resource): String =
    s"${SimpleAclAuthorizer.AclZkPath}/${resource.resourceType}/${resource.name}"

  /** Reads the resource's node and decodes its content with Acl.fromJson; no data yields an empty ACL set. */
  private def getAclsFromZk(resource: Resource): VersionedAcls = {
    val (aclJson, stat) = zkUtils.readDataMaybeNull(toResourcePath(resource))
    val acls = aclJson.fold(Set.empty[Acl])(Acl.fromJson)
    VersionedAcls(acls, stat.getVersion)
  }

  /** Stores the ACLs for the resource, or drops the resource from the cache when it has none. */
  private def updateCache(resource: Resource, versionedAcls: VersionedAcls): Unit = {
    if (versionedAcls.acls.isEmpty)
      aclCache.remove(resource)
    else
      aclCache.put(resource, versionedAcls)
  }

}

VersionedAcls的定義:

object SimpleAclAuthorizer {

  /**
   * A set of ACLs together with a ZooKeeper version number.
   *
   * @param acls      the ACL rules
   * @param zkVersion the ZooKeeper version number associated with them
   */
  private case class VersionedAcls(
    acls: Set[Acl],
    zkVersion: Int
  )
}

SimpleAclAuthorizer還有一個重要的方法authorize,用於檢查權限

/** Authorizer excerpt: the permission check itself. */
class SimpleAclAuthorizer extends Authorizer with Logging {

  /**
   * Decides whether the session's principal may perform `operation` on `resource`.
   *
   * Access is granted when any of the following holds:
   *  - the principal is a super user;
   *  - no ACL applies to the resource and the configured default allows everyone;
   *  - no Deny ACL matches and at least one Allow ACL matches.
   */
  override def authorize(session: Session, operation: Operation, resource: Resource): Boolean = {
    val principal = session.principal
    val host = session.clientAddress.getHostAddress

    // ACLs attached to the resource itself plus those attached to the wildcard resource of the same type.
    val wildcardResource = new Resource(resource.resourceType, Resource.WildCardResource)
    val acls = getAcls(resource) ++ getAcls(wildcardResource)

    // Is there a matching Deny rule for the requested operation?
    val denyMatch = aclMatch(operation, resource, principal, host, Deny, acls)

    // An Allow rule for a stronger operation also grants the weaker one:
    // Read/Write/Delete/Alter imply Describe, and AlterConfigs implies DescribeConfigs.
    val allowOps = operation match {
      case Describe        => Set[Operation](Describe, Read, Write, Delete, Alter)
      case DescribeConfigs => Set[Operation](DescribeConfigs, AlterConfigs)
      case other           => Set[Operation](other)
    }
    val allowMatch = allowOps.exists(allowedOp => aclMatch(allowedOp, resource, principal, host, Allow, acls))

    val authorized =
      isSuperUser(operation, resource, principal, host) ||
        isEmptyAclAndAuthorized(operation, resource, principal, host, acls) ||
        (!denyMatch && allowMatch)

    logAuditMessage(principal, authorized, operation, resource, host)
    authorized
  }

  /**
   * Returns true when some ACL in `acls` has the given permission type and matches the
   * principal, operation and host, either exactly or through the corresponding wildcard
   * (`Acl.WildCardPrincipal`, `All`, `Acl.WildCardHost`). The first match is logged at debug level.
   */
  private def aclMatch(operations: Operation, resource: Resource, principal: KafkaPrincipal, host:
            String, permissionType: PermissionType, acls: Set[Acl]): Boolean = {
    val firstMatch = acls.find { acl =>
      val samePermission = acl.permissionType == permissionType
      samePermission &&
        (acl.principal == principal || acl.principal == Acl.WildCardPrincipal) &&
        (operations == acl.operation || acl.operation == All) &&
        (acl.host == host || acl.host == Acl.WildCardHost)
    }
    firstMatch.foreach { acl =>
      authorizerLogger.debug(s"operation = $operations on resource = $resource from host = $host is $permissionType based on acl = $acl")
    }
    firstMatch.isDefined
  }

  /** Returns true when `principal` is one of the configured super users. */
  def isSuperUser(operation: Operation, resource: Resource, principal: KafkaPrincipal, host: String): Boolean = {
    val superUser = superUsers.contains(principal)
    if (superUser)
      authorizerLogger.debug(s"principal = $principal is a super user, allowing operation without checking acls.")
    superUser
  }

  /** When no ACL applies, returns the configured default decision; otherwise returns false. */
  def isEmptyAclAndAuthorized(operation: Operation, resource: Resource, principal: KafkaPrincipal,
        host: String, acls: Set[Acl]): Boolean =
    acls.isEmpty && {
      authorizerLogger.debug(s"No acl found for resource $resource, authorized = $shouldAllowEveryoneIfNoAclIsFound")
      shouldAllowEveryoneIfNoAclIsFound
    }
}

由於acl數據持久化到zookeeper中,因此當zookeeper中的數據發生改變時,還需要有監聽的機制。這是通過zookeeper的watch來實現的。

acl的更新涉及到zookeeper的兩個地方。一個是Resource節點,存儲acls。另外一個是持久順序節點,它的子節點記錄了每次Resource的更新。

// Create the listener on the ACL-change path, passing AclChangedNotificationHandler as its
// notification handler, then initialise it.
aclChangeListener = new ZkNodeChangeNotificationListener(
  zkUtils,
  SimpleAclAuthorizer.AclChangedZkPath,
  SimpleAclAuthorizer.AclChangedPrefix,
  AclChangedNotificationHandler)
aclChangeListener.init()

/**
 * Watches the children of a ZooKeeper node for change notifications and hands the data of
 * each new notification to a handler.
 *
 * @param zkUtils             ZooKeeper access
 * @param seqNodeRoot         node whose children are the notifications
 * @param seqNodePrefix       prefix of each child name; the remainder of the name is its sequence number
 * @param notificationHandler receives the data of every notification not yet processed
 * @param changeExpirationMs  defaults to 15 minutes; NOTE(review): presumably the age after which
 *                            processed notifications are purged — the purge code is not shown here
 * @param time                clock used to timestamp each processing pass
 */
class ZkNodeChangeNotificationListener(private val zkUtils: ZkUtils,
                                       private val seqNodeRoot: String,
                                       private val seqNodePrefix: String,
                                       private val notificationHandler: NotificationHandler,
                                       private val changeExpirationMs: Long = 15 * 60 * 1000,
                                       private val time: Time = Time.SYSTEM) extends Logging {
  // Highest sequence number handed to the handler so far; -1 means none yet.
  private var lastExecutedChange = -1L
  private val isClosed = new AtomicBoolean(false)

  /** Ensures the root node exists, subscribes to child and state changes, then processes existing notifications. */
  def init(): Unit = {
    zkUtils.makeSurePersistentPathExists(seqNodeRoot)
    // Watch the children of seqNodeRoot: each child is one notification.
    zkUtils.zkClient.subscribeChildChanges(seqNodeRoot, NodeChangeListener)
    zkUtils.zkClient.subscribeStateChanges(ZkStateChangeListener)
    processAllNotifications()
  }

  /** Processes every notification currently under the root node, in ascending name order. */
  def processAllNotifications(): Unit = {
    val changes = zkUtils.zkClient.getChildren(seqNodeRoot)
    processNotifications(changes.asScala.sorted)
  }

  /**
   * Hands each notification newer than `lastExecutedChange` to the handler, then purges obsolete ones.
   *
   * The watermark advances only when a notification is actually processed. Updating it
   * unconditionally would reset it to the smallest pending id at the start of every pass,
   * so every notification not yet purged would be processed again on each call.
   */
  private def processNotifications(notifications: Seq[String]): Unit = {
    if (notifications.nonEmpty) {
      info(s"Processing notification(s) to $seqNodeRoot")
      try {
        val now = time.milliseconds
        for (notification <- notifications) {
          val changeId = changeNumber(notification)
          if (changeId > lastExecutedChange) {
            val changeZnode = seqNodeRoot + "/" + notification
            // The node's data is the payload passed to the handler.
            val (data, _) = zkUtils.readDataMaybeNull(changeZnode)
            data match {
              case Some(payload) =>
                notificationHandler.processNotification(payload)
              case None =>
                logger.warn(s"read null data from $changeZnode when processing notification $notification")
            }
            lastExecutedChange = changeId
          }
        }
        purgeObsoleteNotifications(now, notifications)
      } catch {
        // An interrupt is only expected once the listener has been closed; otherwise propagate it.
        case e: ZkInterruptedException =>
          if (!isClosed.get)
            throw e
      }
    }
  }

  // Child names are the prefix followed by an increasing number,
  // e.g. acl_changes_0000000001, acl_changes_0000000002.
  private def changeNumber(name: String): Long = name.substring(seqNodePrefix.length).toLong

  /** Child listener: re-processes the (sorted) child list whenever it changes. */
  object NodeChangeListener extends IZkChildListener {

    override def handleChildChange(path: String, notifications: java.util.List[String]): Unit = {
      try {
        import scala.collection.JavaConverters._
        if (notifications != null)
          processNotifications(notifications.asScala.sorted)
      } catch {
        case e: Exception => error(s"Error processing notification change for path = $path and notification= $notifications :", e)
      }
    }
  }
}

上面processNotifications方法,調用了AclChangedNotificationHandler 的processNotification方法。

/** Notification handler that refreshes the cached ACLs of the resource named in a change notification. */
object AclChangedNotificationHandler extends NotificationHandler {

  /**
   * @param notificationMessage string form of the changed resource, parsed with `Resource.fromString`
   */
  override def processNotification(notificationMessage: String): Unit = {
    val resource: Resource = Resource.fromString(notificationMessage)
    inWriteLock(lock) {
      // Re-read the resource's ACLs from ZooKeeper and replace the cached entry.
      val versionedAcls = getAclsFromZk(resource)
      updateCache(resource, versionedAcls)
    }
  }
}
歸納

本章先介紹了與acl相關的數據結構。Resource表明資源,Acl表示訪問規則。

然後介紹了acl的管理,即SimpleAclAuthorizer類。其中涉及到了zookeeper的數據持久化和aclCache的更新。兩者之間的同步,通過zookeeper的watch機制來實現。

相關文章
相關標籤/搜索