以前寫過一篇Kafka ACL使用實戰,裏面演示瞭如何配置SASL PLAINTEXT + ACL來爲Kafka集羣提供認證/權限安全保障,但有一個問題常常被問到:這種方案下是否支持動態增長/移除認證用戶——這裏給出明確的答案:不能夠!由於全部認證用戶信息所有配置在靜態的jaas文件中,故沒法在不重啓broker的狀況下實現用戶增減。這一次我以Kafka 2.1.0版本爲例演示配置SASL SCRAM + ACL來實現如何動態增減用戶,另外也想完善和優化上一篇中的一些不足之處(好比說不用再修改初始的.sh腳本,改用環境變量的方式來使設置生效)。html
1. 環境準備java
Kafka服務器:一臺雲主機,4 core,8GB RAM,1Gbps帶寬node
Kafka客戶端:另外一臺雲主機ios
客戶端與服務器經過內網交互算法
2. 集羣拓撲apache
啓動兩臺Kafka服務器,因爲我只在一臺雲主機上演示,故上面啓動兩個broker實例。客戶端這邊使用console-producer和console-consumer腳原本模擬客戶端程序。bootstrap
3. 建立用戶安全
咱們使用kafka-configs.sh來建立用戶,Kafka的SCRAM實現機制會把用戶認證信息保存在Zookeeper中。假設我要建立3個用戶admin, writer, reader分別用於實現Kafka broker間通信、生產消息和消費消息。下面咱們開始具體的配置:首先啓動Zookeeper,但不要啓動Kafka broker,ZK啓動成功後執行如下命令去建立3個用戶:服務器
建立writer用戶,密碼是writer-pwd: 測試
$ bin/kafka-configs.sh --zookeeper 172.21.0.9:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=writer-pwd],SCRAM-SHA-512=[password=writer-pwd]' --entity-type users --entity-name writer
Completed Updating config for entity: user-principal 'writer'.
建立reader用戶,密碼是reader-pwd:
$ bin/kafka-configs.sh --zookeeper 172.21.0.9:2181 --alter --add-config 'SCRAM-SHA-256=[password=reader-pwd],SCRAM-SHA-512=[password=reader-pwd]' --entity-type users --entity-name reader
Completed Updating config for entity: user-principal 'reader'.
建立admin用戶,密碼是admin:
$ bin/kafka-configs.sh --zookeeper 172.21.0.9:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin],SCRAM-SHA-512=[password=admin]' --entity-type users --entity-name admin
Completed Updating config for entity: user-principal 'admin'.
3個測試用戶都建立好了,下面咱們使用kafka-configs.sh查看一下writer用戶的信息:
$ bin/kafka-configs.sh --zookeeper 172.21.0.9:2181 --describe --entity-type users --entity-name writer
Configs for user-principal 'writer' are SCRAM-SHA-512=salt=dTlvNzl4Y3BvZ3BuMmx5ODY0aWlzN2RsZg==,stored_key=Yc02SwxDkAKDQH01W98bkJLJcVO24q9vR5tS0nWaq5Jg2Z7DtzwrOt6J2Cr8Oib+dHq7TUIeG+NLiCAMnRlfVg==,server_key=Tu+iiosvJrDemOvjaDdzrh2GhLRg6r9zoTRDdvXZCMA7n7+D8DYsUz6Gnugcczsnz5Ut/jkkklEOXYRXIqOLCg==,iterations=4096,SCRAM-SHA-256=salt=Y2dpcnB4aTU5NWNwMDZjNmFvbHluMWJpOQ==,stored_key=GGMhtO1PhxZFpEHOaDiqA4AM16Ma19nky1UV/gFoC1s=,server_key=L0R1xkcULaWcGMu6TdtWi5mf5lu1VTS8imWvKPlM3i4=,iterations=8192
裏面包含了writer用戶加密算法SCRAM-SHA-256以及SCRAM-SHA-512對應的鹽值(salt)、ServerKey和StoreKey等,總之都是SCRAM機制的術語了。
4. Broker端配置
和SASL PLAINTEXT同樣,咱們依然須要爲每一個broker建立一個對應的jaas文件。注:因爲本例中個人兩個broker實例都是在同一臺雲主機上啓動的,故我只建立一份jaas文件便可。實際使用中須要爲每臺單獨的物理broker機器建立一份jaas文件。
KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin";
};
將上面內容保存成kafka-broker-jaas.conf文件。注意末尾的兩個分號,另外不要任何空白鍵。這裏配置admin用戶用於實現broker間的通信。接下來是配置broker端的server.properties,配置項以下:
# 啓用ACL
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer# 設置本例中admin爲超級用戶
super.users=User:admin# 啓用SCRAM機制,採用SCRAM-SHA-512算法
sasl.enabled.mechanisms=SCRAM-SHA-512
# 爲broker間通信開啓SCRAM機制,採用SCRAM-SHA-512算法
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
# broker間通信使用PLAINTEXT,本例中不演示SSL配置
security.inter.broker.protocol=SASL_PLAINTEXT# 配置listeners使用SASL_PLAINTEXT
listeners=SASL_PLAINTEXT://172.21.0.9:9092
# 配置advertised.listeners
advertised.listeners=SASL_PLAINTEXT://172.21.0.9:9092
另外一臺broker的配置和它基本相似,只是要使用不一樣的端口(好比9093)、broker.id和log.dirs。如今分別啓動兩個broker實例,若是一切配置正常,這兩個broker實例應該可以正常啓動——注意引入jaas文件的方式,將-Djava.security.auth.login.config做爲KAKFA_OPTS環境變量的方式進行設置。
$ KAFKA_OPTS=-Djava.security.auth.login.config=/xfs/bigdata/kafka_2.12-2.1.0/kafka-broker-jaas.conf bin/kafka-server-start.sh /xfs/bigdata/kafka_2.12-2.1.0/config/server1.properties
......[2019-02-05 17:12:08,365] INFO Kafka version : 2.1.0 (org.apache.kafka.common.utils.AppInfoParser)
[2019-01-05 17:12:08,365] INFO Kafka commitId : 809be928f1ae004e (org.apache.kafka.common.utils.AppInfoParser)
[2019-02-05 17:12:08,367] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
$ KAFKA_OPTS=-Djava.security.auth.login.config=/xfs/bigdata/kafka_2.12-2.1.0/kafka-broker-jaas.conf bin/kafka-server-start.sh /xfs/bigdata/kafka_2.12-2.1.0/config/server2.properties
......[2019-02-05 17:22:12,970] INFO Kafka version : 2.1.0 (org.apache.kafka.common.utils.AppInfoParser)
[2019-02-05 17:22:12,970] INFO Kafka commitId : 809be928f1ae004e (org.apache.kafka.common.utils.AppInfoParser)
[2019-02-05 17:22:12,971] INFO [KafkaServer id=1] started (kafka.server.KafkaServer)
如今建立測試topic,本例只建立一個單分區單副本的topic便可:
$ bin/kafka-topics.sh --create --zookeeper 172.21.0.9:2181 --topic test --partitions 1 --replication-factor 1
Created topic "test".
5. Client端配置
Okay,一切準備就緒了。咱們先來使用console-producer腳原本嘗試發送消息:
$ bin/kafka-console-producer.sh --broker-list 172.21.0.9:9092,172.21.0.9:9093 --topic test
>hello, world
[2019-02-05 18:17:19,005] ERROR Error when sending message to topic test with key: null, value: 12 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
消息發送失敗了,緣由是沒有指定合法的認證用戶,如今我改用writer用戶發送——爲此我須要建立一個名爲producer.conf的配置文件給producer程序使用,其內容以下:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="writer" password="writer-pwd";
以後運行console-producer腳本:
$ bin/kafka-console-producer.sh --broker-list 172.21.0.9:9092,172.21.0.9:9093 --topic test --producer.config /opt/data/kafka_2.12-2.1.0/producer.conf
>hello
[2019-02-05 18:25:40,272] WARN [Producer clientId=console-producer] Bootstrap broker 172.21.0.9:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
異常發生變化了,如今報的是「沒法建立鏈接」的錯誤,這是由於writer用戶沒有對test topic的寫權限所致,故須要給writer用戶增長該topic的寫權限:
$ bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=172.21.0.9:2181 --add --allow-principal User:writer --operation Write --topic test
Adding ACLs for resource `Topic:LITERAL:test`:
User:writer has Allow permission for operations: Write from hosts: *Current ACLs for resource `Topic:LITERAL:test`:
User:writer has Allow permission for operations: Write from hosts: *
再次執行console-producer腳本:
$ bin/kafka-console-producer.sh --broker-list 172.21.0.9:9092,172.21.0.9:9093 --topic test --producer.config /opt/data/kafka_2.12-2.1.0/producer.conf
>hello
>Kafka
發送成功!
下面是配置consumer程序,和producer同樣,爲reader用戶建立consumer.conf,同時設置對topic的讀權限:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="reader" password="reader-pwd";
$ bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=172.21.0.9:2181 --add --allow-principal User:reader --operation Read --topic test
Adding ACLs for resource `Topic:LITERAL:test`:
User:reader has Allow permission for operations: Read from hosts: *Current ACLs for resource `Topic:LITERAL:test`:
User:writer has Allow permission for operations: Write from hosts: *
User:reader has Allow permission for operations: Read from hosts: *
執行console-consumer腳本:
$ bin/kafka-console-consumer.sh --bootstrap-server 172.21.0.9:9092,172.21.0.9:9093 --topic test --from-beginning --consumer.config /opt/data/kafka2-2.1.0/consumer.conf --group test-group
[2019-02-05 18:55:57,272] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: test-group
報錯提示reader用戶沒有訪問consumer group的權限,加之:
$ bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=172.21.0.9:2181 --add --allow-principal User:reader --operation Read --group test-group
Adding ACLs for resource `Group:LITERAL:test-group`:
User:reader has Allow permission for operations: Read from hosts: *Current ACLs for resource `Group:LITERAL:test-group`:
User:reader has Allow permission for operations: Read from hosts: *
再次執行console-consumer腳本:
$ bin/kafka-console-consumer.sh --bootstrap-server 172.21.0.9:9092,172.21.0.9:9093 --topic test --from-beginning --consumer.config /opt/data/kafka_2.12-2.1.0/consumer.conf --group test-group
hello
Kafka
6. 動態增長/刪除用戶
如今咱們在不重啓broker的狀況下增長新用戶writer1和reader1,分別爲它們賦予test topic的寫權限和讀權限:
$ bin/kafka-configs.sh --zookeeper 172.21.0.9:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=writer1-pwd],SCRAM-SHA-512=[password=writer1-pwd]' --entity-type users --entity-name writer1
$ bin/kafka-configs.sh --zookeeper 172.21.0.9:2181 --alter --add-config 'SCRAM-SHA-256=[password=reader1-pwd],SCRAM-SHA-512=[password=reader1-pwd]' --entity-type users --entity-name reader1
$ bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=172.21.0.9:2181 --add --allow-principal User:writer1 --operation Write --topic test$ bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=172.21.0.9:2181 --add --allow-principal User:reader1 --operation Read --topic test
$ bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=172.21.0.9:2181 --add --allow-principal User:reader1 --operation Read --group test-group1
同時刪除原來的用戶writer:
bin/kafka-configs.sh --zookeeper 172.21.0.9:2181 --alter --delete-config 'SCRAM-SHA-256' --entity-type users --entity-name writer
bin/kafka-configs.sh --zookeeper 172.21.0.9:2181 --alter --delete-config 'SCRAM-SHA-512' --entity-type users --entity-name writer
如今檢驗writer用戶不能寫入消息:
$ bin/kafka-console-producer.sh --broker-list 172.21.0.9:9092,172.21.0.9:9093 --topic test --producer.config /opt/data/kafka_2.12-2.1.0/producer.conf
>hello by writer
[2019-02-06 09:30:54,492] ERROR [Producer clientId=console-producer] Connection to node -2 (172.21.0.9/172.21.0.9:9093) failed authentication due to: Authentication failed due to invalid credentials with SASL mechanism SCRAM-SHA-512 (org.apache.kafka.clients.NetworkClient)
[2019-02-06 09:30:54,492] ERROR Error when sending message to topic test with key: null, value: 15 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed due to invalid credentials with SASL mechanism SCRAM-SHA-512
[2019-02-06 09:30:54,493] ERROR [Producer clientId=console-producer] Connection to node -1 (172.21.0.9/172.21.0.9:9092) failed authentication due to: Authentication failed due to invalid credentials with SASL mechanism SCRAM-SHA-512 (org.apache.kafka.clients.NetworkClient)
最後修改producer.conf中的writer爲writer1,驗證writer1用戶有權限生產消息:
$ bin/kafka-console-producer.sh --broker-list 172.21.0.9:9092,172.21.0.9:9093 --topic test --producer.config /opt/data/kafka_2.12-2.1.0/producer.conf
>hello by writer1
>successful
>
至此,一個支持動態增長/刪除用戶的Kafka安全配置就作好了。