本文主要介紹一下使用官方發佈的 Kafka 0.10.0 版如何實現 SASL/PLAIN 認證機制以及權限控制。html
Kafka 的安全機制主要分爲兩部分:java
In release 0.9.0.0, the Kafka community added a number of features that, used either separately or together, increases security in a Kafka cluster.
These features are considered to be of beta quality. The following security measures are currently supported:apache
- Authentication of connections to brokers from clients (producers and consumers), other brokers and tools, using either SSL or SASL (Kerberos). SASL/PLAIN can also be used from release 0.10.0.0 onwards.
- Authentication of connections from brokers to ZooKeeper
- Encryption of data transferred between brokers and clients, between brokers, or between brokers and tools using SSL (Note that there is a performance degradation when SSL is enabled, the magnitude of which depends on the CPU type and the JVM implementation.)
- Authorization of read / write operations by clients
- Authorization is pluggable and integration with external authorization services is supported
這段話的中文意思也就是說json
Kafka 目前支持SSL、SASL/Kerberos、SASL/PLAIN三種認證機制,關於這些認證機制的介紹能夠參考一下三篇文章。安全
能夠參考kafka使用SASL驗證,這個官方文檔的中文版。bash
須要在 Kafka 安裝目錄下的config/server.properties文件中配置如下信息服務器
在 kafka 安裝目錄下的config/server.properties
配置一下信息app
listeners=SASL_PLAINTEXT://ip:pot security.inter.broker.protocol=SASL_PLAINTEXT sasl.mechanism.inter.broker.protocol=PLAIN sasl.enabled.mechanisms=PLAIN authorizer.class.name = kafka.security.auth.SimpleAclAuthorizer super.users=User:admin
還須要配置一個名 kafka_server_jaas.conf
的配置文件,將配置文件放置在conf
目錄下。 ide
KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin" user_admin="admin" user_alice="alice"; };
這裏,咱們配置了兩個用戶:admin 和 alice,密碼分別爲 admin 和 alice。
最後須要爲 Kafka 添加 java.security.auth.login.config
環境變量。在 bin/kafka-run-class.sh
中添加如下內容性能
KAFKA_SASL_OPTS='-Djava.security.auth.login.config=/opt/meituan/kafka_2.10-0.10.0.0/config/kafka_server_jaas.conf' # Launch mode if [ "x$DAEMON_MODE" = "xtrue" ]; then nohup $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_SASL_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null & else exec $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_SASL_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@" fi
注:實際上,咱們只是添加了第一行,並在第4和第6行中添加了 $KAFKA_SASL_OPTS 這個環境變量。
KafkaClient 配置
首先須要在客戶端配置 kafka_client_jaas.conf
文件
KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule required username="alice" password="alice"; };
而後在(producer 和 consumer)程序中添加環境變量和配置,以下所示
System.setProperty("java.security.auth.login.config", ".../kafka_client_jaas.conf"); // 環境變量添加,須要輸入配置文件的路徑 props.put("security.protocol", "SASL_PLAINTEXT"); props.put("sasl.mechanism", "PLAIN");
配置完以上內容後,就能夠正常運行 producer 和 consumer 程序,若是帳戶密碼錯誤的話,程序就不能正常進行,可是不會有任何提示,這方面後面會進行一些改進。
這個小節介紹一下 Kafka 的 ACL 。
權限 | 說明 |
---|---|
READ | 讀取topic |
WRITE | 寫入topic |
DELETE | 刪除topic |
CREATE | 建立topic |
ALTER | 修改topic |
DESCRIBE | 獲取topic的信息 |
ClusterAction | |
ALL | 全部權限 |
訪問控制列表ACL存儲在zk上,路徑爲/kafka-acl
。
Kafka 提供的命令以下表所示
Option | Description | Default | Option type |
---|---|---|---|
–add | Indicates to the script that user is trying to add an acl. | Action | |
–remove | Indicates to the script that user is trying to remove an acl. | Action | |
–list | Indicates to the script that user is trying to list acts. | Action | |
–authorizer | Fully qualified class name of the authorizer. | kafka.security.auth.SimpleAclAuthorizer | Configuration |
–authorizer-properties | key=val pairs that will be passed to authorizer for initialization. For the default authorizer the example values are: zookeeper.connect=localhost:2181 | Configuration | |
–cluster | Specifies cluster as resource. | Resource | |
–topic [topic-name] | Specifies the topic as resource. | Resource | |
–group [group-name] | Specifies the consumer-group as resource. | Resource | |
–allow-principal | Principal is in PrincipalType:name format that will be added to ACL with Allowpermission. You can specify multiple –allow-principal in a single command. | Principal | |
–deny-principal | Principal is in PrincipalType:name format that will be added to ACL with Denypermission. You can specify multiple –deny-principal in a single command. | Principal | |
–allow-host | IP address from which principals listed in –allow-principal will have access. | if –allow-principal is specified defaults to * which translates to 「all hosts」 | Host |
–deny-host | IP address from which principals listed in –deny-principal will be denied access. | if –deny-principal is specified defaults to * which translates to 「all hosts」 | Host |
–operation | Operation that will be allowed or denied. Valid values are : Read, Write, Create, Delete, Alter, Describe, ClusterAction, All | All | Operation |
–producer | Convenience option to add/remove acls for producer role. This will generate acls that allows WRITE, DESCRIBE on topic and CREATE on cluster. | Convenience | |
–consumer | Convenience option to add/remove acls for consumer role. This will generate acls that allows READ, DESCRIBE on topic and READ on consumer-group. | Convenience |
經過幾個例子介紹一下如何進行權限設置。
# 爲用戶 alice 在 test(topic)上添加讀寫的權限 bin/kafka-acls.sh --authorizer-properties zookeeper.connect=data-rt-dev02:2181/kafka_test10 --add --allow-principal User:alice --operation Read --operation Write --topic test # 對於 topic 爲 test 的消息隊列,拒絕來自 ip 爲198.51.100.3帳戶爲 BadBob 進行 read 操做,其餘用戶都容許 bin/kafka-acls.sh --authorizer-properties zookeeper.connect=data-rt-dev02:2181/kafka_test10 --add --allow-principal User:* --allow-host * --deny-principal User:BadBob --deny-host 198.51.100.3 --operation Read --topic test # 爲bob 和 alice 添加all,以容許來自 ip 爲198.51.100.0或者198.51.100.1的讀寫請求 bin/kafka-acls.sh --authorizer-properties zookeeper.connect=data-rt-dev02:2181/kafka_test10 --add --allow-principal User:bob --allow-principal User:alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic test
# 列出 topic 爲 test 的全部權限帳戶 bin/kafka-acls.sh --authorizer-properties zookeeper.connect=data-rt-dev02:2181/kafka_test10 --list --topic test
輸出信息爲:
Current ACLs for resource `Topic:test`: User:alice has Allow permission for operations: Describe from hosts: * User:alice has Allow permission for operations: Read from hosts: * User:alice has Allow permission for operations: Write from hosts: *
# 移除 acl bin/kafka-acls.sh --authorizer-properties zookeeper.connect=data-rt-dev02:2181/kafka_test10 --remove --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic test
# producer bin/kafka-acls.sh --authorizer-properties zookeeper.connect=data-rt-dev02:2181/kafka_test10 --add --allow-principal User:alice --producer --topic test #consumer bin/kafka-acls.sh --authorizer-properties zookeeper.connect=data-rt-dev02:2181/kafka_test10 --add --allow-principal User:alice --consumer --topic test —group test-group
本小節記錄了在使用 SASL/PLAIN 時遇到的一些坑。
錯誤信息以下:
[2016-07-27 17:45:46,047] WARN [Controller-1-to-broker-1-send-thread], Controller 1's connection to broker XXXX:9092 (id: 1 rack: null) was unsuccessful (kafka.controller.RequestSendThread) java.io.IOException: Connection to XXXX:9092 (id: 1 rack: null) failed at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$extension$2.apply(NetworkClientBlockingOps.scala:63) at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$extension$2.apply(NetworkClientBlockingOps.scala:59) at kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(NetworkClientBlockingOps.scala:112) at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollUntil$extension(NetworkClientBlockingOps.scala:120) at kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:59) at kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:232) at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:181) at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:180) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63) [2016-07-27 17:45:46,056] INFO [delete-topics-thread-1], Starting (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2016-07-27 17:45:46,057] DEBUG [Topic Deletion Manager 1], Waiting for signal to start or continue topic deletion (kafka.controller.TopicDeletionManager) [2016-07-27 17:45:46,351] WARN [Controller-1-to-broker-1-send-thread], Controller 1's connection to broker XXXX:9092 (id: 1 rack: null) was unsuccessful (kafka.controller.RequestSendThread) java.io.IOException: Connection to XXXX:9092 (id: 1 rack: null) failed at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$extension$2.apply(NetworkClientBlockingOps.scala:63) at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$extension$2.apply(NetworkClientBlockingOps.scala:59) at kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(NetworkClientBlockingOps.scala:112) at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollUntil$extension(NetworkClientBlockingOps.scala:120) at kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:59) at kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:232) at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:181) at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:180) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
查找緣由查找了半天,以前覺得是kafka_server_jaas.conf
文件的格式有問題,改了以後發現 Kafka 有時啓動正常,有時不能正常啓動,修改以前 conf 文件爲:
KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin" user_matt=「33" user_alice="alice"; };
最後分析多是由於沒有在 user 中配置 admin 帳戶,由於 broker 之間也開啓了身份認證,修改以後的配置文件以下。
KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin" user_admin="admin" user_alice="alice"; };
修改完以後,Kafka 就能夠正常運行了。