Kafka 0.10.0 SASL/PLAIN Authentication and Authorization

This article describes how to set up the SASL/PLAIN authentication mechanism and access control with the official Kafka 0.10.0 release.

Kafka Security Mechanisms

Kafka security consists of two main parts:

  • Authentication: verifying the identity of clients and brokers when they connect to the cluster.
  • Authorization: controlling which read/write operations an authenticated client may perform on topics and other resources.

In release 0.9.0.0, the Kafka community added a number of features that, used either separately or together, increases security in a Kafka cluster.
These features are considered to be of beta quality. The following security measures are currently supported:

  1. Authentication of connections to brokers from clients (producers and consumers), other brokers and tools, using either SSL or SASL (Kerberos). SASL/PLAIN can also be used from release 0.10.0.0 onwards.
  2. Authentication of connections from brokers to ZooKeeper
  3. Encryption of data transferred between brokers and clients, between brokers, or between brokers and tools using SSL (Note that there is a performance degradation when SSL is enabled, the magnitude of which depends on the CPU type and the JVM implementation.)
  4. Authorization of read / write operations by clients
  5. Authorization is pluggable and integration with external authorization services is supported

In short: client (producer and consumer), inter-broker, and tools-to-broker connections can be authenticated with SSL or SASL, with SASL/PLAIN supported from 0.10.0.0 onwards; broker-to-ZooKeeper connections can be authenticated; traffic can be encrypted with SSL at some performance cost; and client read/write operations are authorized through a pluggable interface that can integrate with external authorization services.

Kafka Authentication

Kafka currently supports three authentication mechanisms: SSL, SASL/Kerberos, and SASL/PLAIN.

SASL/PLAIN Authentication

For reference, see kafka使用SASL驗證, a Chinese translation of the official documentation.

Kafka Server Configuration

Add the following to config/server.properties under the Kafka installation directory:

# Accept SASL-authenticated plaintext connections; replace ip:port with the broker's address and port
listeners=SASL_PLAINTEXT://ip:port
# Brokers also talk to each other over SASL_PLAINTEXT using the PLAIN mechanism
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
# Enable the built-in ACL authorizer and make admin a superuser
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:admin

Note that SASL/PLAIN transmits passwords in cleartext, so SASL_PLAINTEXT should only be used on a trusted network; in production it is better to pair PLAIN with SSL encryption (SASL_SSL).

You also need to create a JAAS file named kafka_server_jaas.conf and place it in the config directory:

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin"
    user_admin="admin"
    user_alice="alice";
};

Here we configure two users, admin and alice, whose passwords are admin and alice respectively. In this file, username and password are the credentials the broker itself uses when connecting to other brokers, while each user_<name>="<password>" entry defines an account that clients (or other brokers) are allowed to authenticate as.
Finally, Kafka needs the java.security.auth.login.config JVM system property to point at this file. Add the following to bin/kafka-run-class.sh:

KAFKA_SASL_OPTS='-Djava.security.auth.login.config=/opt/meituan/kafka_2.10-0.10.0.0/config/kafka_server_jaas.conf'
# Launch mode
if [ "x$DAEMON_MODE" = "xtrue" ]; then
  nohup $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_SASL_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &
else
  exec $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_SASL_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@"
fi

Note: only the first line is new; apart from that, we just inserted the $KAFKA_SASL_OPTS variable into the two java launch commands on lines 4 and 6. Since those launch commands already include $KAFKA_OPTS, an alternative that avoids editing the script is to export KAFKA_OPTS="-Djava.security.auth.login.config=/path/to/kafka_server_jaas.conf" before starting the broker.

KafkaClient Configuration

First, create a kafka_client_jaas.conf file on the client side:

KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="alice"
  password="alice";
};

Then, in the producer and consumer programs, set the system property and add the following configuration:

System.setProperty("java.security.auth.login.config", ".../kafka_client_jaas.conf"); // set the JAAS system property; fill in the actual path to the file
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "PLAIN");

Once all of the above is configured, the producer and consumer programs run normally. If the username or password is wrong, the program cannot make progress, but no error is reported; improving this behavior is left for later.
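To put the client settings together, here is a minimal producer sketch. It assumes the kafka-clients 0.10.0 dependency, a broker at localhost:9092, and the kafka_client_jaas.conf shown above; the broker address and file path are placeholders to replace with your own.

import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SaslPlainProducerDemo {
    public static void main(String[] args) throws Exception {
        // Point the JVM at the client JAAS file (placeholder path)
        System.setProperty("java.security.auth.login.config",
                "/path/to/kafka_client_jaas.conf");

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // SASL/PLAIN over the SASL_PLAINTEXT listener configured on the broker
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        try {
            // Blocking on the send future with a timeout at least surfaces bad
            // credentials as a TimeoutException instead of silently hanging
            // (0.10.x gives the client no explicit authentication error).
            producer.send(new ProducerRecord<>("test", "key", "hello from alice"))
                    .get(10, TimeUnit.SECONDS);
        } finally {
            producer.close();
        }
    }
}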

Kafka Authorization

This section gives a brief introduction to Kafka ACLs.

Available Permissions

Permission | Description
READ | read from a topic
WRITE | write to a topic
DELETE | delete a topic
CREATE | create a topic
ALTER | alter a topic
DESCRIBE | fetch a topic's metadata
ClusterAction | cluster-level (inter-broker) actions
ALL | all of the above

The access control lists are stored in ZooKeeper under the path /kafka-acl; each resource gets its own node, e.g. the ACLs for a topic named test live under /kafka-acl/Topic/test.
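As a quick way to see what the authorizer has stored, the following sketch walks that subtree with the plain ZooKeeper client. It assumes the org.apache.zookeeper:zookeeper dependency and ZooKeeper at localhost:2181 (a placeholder address), and that ACL entries are stored as readable JSON, which may vary by version.

import org.apache.zookeeper.ZooKeeper;

public class AclZkInspect {
    public static void main(String[] args) throws Exception {
        // Connect to ZooKeeper (placeholder address) with a no-op watcher
        ZooKeeper zk = new ZooKeeper("localhost:2181", 10000, event -> { });
        // Resource types are children of /kafka-acl, e.g. Topic, Group, Cluster
        for (String type : zk.getChildren("/kafka-acl", false)) {
            for (String name : zk.getChildren("/kafka-acl/" + type, false)) {
                byte[] data = zk.getData("/kafka-acl/" + type + "/" + name, false, null);
                System.out.println(type + "/" + name + " -> " + new String(data, "UTF-8"));
            }
        }
        zk.close();
    }
}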

ACL Command Options

The options accepted by the bin/kafka-acls.sh script are shown in the table below.

Option | Description | Default | Option type
--add | Indicates to the script that the user is trying to add an ACL. | | Action
--remove | Indicates to the script that the user is trying to remove an ACL. | | Action
--list | Indicates to the script that the user is trying to list ACLs. | | Action
--authorizer | Fully qualified class name of the authorizer. | kafka.security.auth.SimpleAclAuthorizer | Configuration
--authorizer-properties | key=val pairs passed to the authorizer for initialization. For the default authorizer, an example value is zookeeper.connect=localhost:2181. | | Configuration
--cluster | Specifies the cluster as resource. | | Resource
--topic [topic-name] | Specifies the topic as resource. | | Resource
--group [group-name] | Specifies the consumer group as resource. | | Resource
--allow-principal | Principal in PrincipalType:name format that will be added to the ACL with Allow permission. Multiple --allow-principal options may be given in a single command. | | Principal
--deny-principal | Principal in PrincipalType:name format that will be added to the ACL with Deny permission. Multiple --deny-principal options may be given in a single command. | | Principal
--allow-host | IP address from which the principals listed in --allow-principal will have access. | If --allow-principal is specified, defaults to *, which translates to "all hosts". | Host
--deny-host | IP address from which the principals listed in --deny-principal will be denied access. | If --deny-principal is specified, defaults to *, which translates to "all hosts". | Host
--operation | Operation that will be allowed or denied. Valid values are: Read, Write, Create, Delete, Alter, Describe, ClusterAction, All. | All | Operation
--producer | Convenience option to add/remove ACLs for the producer role. Generates ACLs that allow WRITE and DESCRIBE on the topic and CREATE on the cluster. | | Convenience
--consumer | Convenience option to add/remove ACLs for the consumer role. Generates ACLs that allow READ and DESCRIBE on the topic and READ on the consumer group. | | Convenience

Setting ACLs

A few examples show how permissions are set. ACL changes made with kafka-acls.sh are written to ZooKeeper and take effect without restarting the brokers.

add 操做

# Grant user alice read and write access to the topic test
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=data-rt-dev02:2181/kafka_test10 --add --allow-principal User:alice --operation Read --operation Write --topic test
# For the topic test, deny user BadBob read access from IP 198.51.100.3 while allowing all other users from any host
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=data-rt-dev02:2181/kafka_test10 --add --allow-principal User:* --allow-host * --deny-principal User:BadBob --deny-host 198.51.100.3 --operation Read --topic test
# Allow users bob and alice to read and write the topic test from IP 198.51.100.0 or 198.51.100.1
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=data-rt-dev02:2181/kafka_test10 --add --allow-principal User:bob --allow-principal User:alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic test

list 操做

# List all ACLs set on the topic test
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=data-rt-dev02:2181/kafka_test10 --list --topic test

The output is:

Current ACLs for resource `Topic:test`:
    User:alice has Allow permission for operations: Describe from hosts: *
    User:alice has Allow permission for operations: Read from hosts: *
    User:alice has Allow permission for operations: Write from hosts: *

remove 操做

# Remove the ACLs that were added above
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=data-rt-dev02:2181/kafka_test10 --remove --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic test

Producer and consumer operations

# producer: grants WRITE and DESCRIBE on the topic, plus CREATE on the cluster
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=data-rt-dev02:2181/kafka_test10 --add --allow-principal User:alice --producer --topic test
# consumer: grants READ and DESCRIBE on the topic, plus READ on the consumer group
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=data-rt-dev02:2181/kafka_test10 --add --allow-principal User:alice --consumer --topic test --group test-group
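Matching the --consumer ACL above, here is a minimal consumer sketch. As with the producer sketch earlier, the broker address and JAAS file path are placeholders; the group.id must be the group the ACL grants (test-group).

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SaslPlainConsumerDemo {
    public static void main(String[] args) {
        // Point the JVM at the client JAAS file (placeholder path)
        System.setProperty("java.security.auth.login.config",
                "/path/to/kafka_client_jaas.conf");

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("group.id", "test-group"); // must match the group granted by the --consumer ACL
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("test"));
        while (true) { // poll until interrupted (Ctrl-C)
            ConsumerRecords<String, String> records = consumer.poll(1000); // 0.10.x poll(long)
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset=%d, value=%s%n", record.offset(), record.value());
        }
    }
}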

Pitfalls

This section records some of the pitfalls encountered while using SASL/PLAIN.

Controller fails to connect to broker

The error message was as follows:

[2016-07-27 17:45:46,047] WARN [Controller-1-to-broker-1-send-thread], Controller 1's connection to broker XXXX:9092 (id: 1 rack: null) was unsuccessful (kafka.controller.RequestSendThread)
java.io.IOException: Connection to XXXX:9092 (id: 1 rack: null) failed
    at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$extension$2.apply(NetworkClientBlockingOps.scala:63)
    at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$extension$2.apply(NetworkClientBlockingOps.scala:59)
    at kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(NetworkClientBlockingOps.scala:112)
    at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollUntil$extension(NetworkClientBlockingOps.scala:120)
    at kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:59)
    at kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:232)
    at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:181)
    at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:180)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
[2016-07-27 17:45:46,056] INFO [delete-topics-thread-1], Starting  (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2016-07-27 17:45:46,057] DEBUG [Topic Deletion Manager 1], Waiting for signal to start or continue topic deletion (kafka.controller.TopicDeletionManager)
[2016-07-27 17:45:46,351] WARN [Controller-1-to-broker-1-send-thread], Controller 1's connection to broker XXXX:9092 (id: 1 rack: null) was unsuccessful (kafka.controller.RequestSendThread)
java.io.IOException: Connection to XXXX:9092 (id: 1 rack: null) failed
    at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$extension$2.apply(NetworkClientBlockingOps.scala:63)
    at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$extension$2.apply(NetworkClientBlockingOps.scala:59)
    at kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(NetworkClientBlockingOps.scala:112)
    at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollUntil$extension(NetworkClientBlockingOps.scala:120)
    at kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:59)
    at kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:232)
    at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:181)
    at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:180)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)

It took a long while to find the cause. At first I suspected that the format of the kafka_server_jaas.conf file was wrong, but after changing it Kafka would sometimes start normally and sometimes not. Before the fix, the conf file was:

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin"
    user_matt="33"
    user_alice="alice";
};

The eventual conclusion was that the admin account was missing from the user list: because inter-broker connections also authenticate (as admin, per the username/password entries), the file needs a matching user_admin entry. The corrected configuration file is shown below.

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin"
    user_admin="admin"
    user_alice="alice";
};

After this change, Kafka ran normally.
