Kafka ACL使用實戰

自0.9.0.0.版本引入Security以後,Kafka一直在完善security的功能。當前Kafka security主要包含3大功能:認證(authentication)、信道加密(encryption)和受權(authorization)。信道加密就是爲client到broker、broker到broker以及工具腳本與broker之間的數據傳輸配置SSL;認證機制主要是指配置SASL,而受權是經過ACL接口命令來完成的。html

生產環境中,用戶若要使用SASL則必須配置Kerberos,但對於一些小公司而言,他們的用戶系統並不複雜(特別是專門爲Kafka集羣服務的用戶可能不是不少),顯然使用Kerberos有些大材小用,並且因爲運行在內網環境,SSL加密也不是很必要。所以一個SASL+PLAINTEXT的集羣環境足以應付通常的使用場景。本文給出一個可運行的實例來演示一下如何在不使用Kerberos的狀況下配置SASL + ACL來構建secured Kafka集羣。java

在開始以前,咱們簡單學習下Kafka ACL的格式。根據官網的介紹,Kafka中一條ACL的格式以下:「Principal P is [Allowed/Denied] Operation O From Host H On Resource R」。它們的含義描述以下:apache

  • principal:表示一個Kafka user
  • operation:表示一個具體的操做類型,如WRITE, READ, DESCRIBE等。完整的操做列表詳見:http://docs.confluent.io/current/kafka/authorization.html#overview
  • Host:表示連向Kafka集羣的client的IP地址,若是是‘*’則表示全部IP。注意:當前Kafka不支持主機名,只能指定IP地址
  • Resource:表示一種Kafka資源類型。當前共有4種類型:TOPIC、CLUSTER、GROUP、TRANSACTIONID

下面我使用Kafka 0.11.0.0版原本演示下如何構建支持SASL + PLAINTEXT + ACL的Kafka集羣環境。bootstrap

1. Broker端配置

要配置SASL和ACL,咱們須要在broker端進行兩個方面的設置。首先是建立包含全部認證用戶信息的JAAS文件。本例中,咱們假設有3個用戶:admin, reader和writer,其中admin是管理員,reader用戶讀取Kafka集羣中topic數據,而writer用戶則負責向Kafka集羣寫入消息。咱們假設這3個用戶的密碼分別與用戶名相同(在實際場景中,管理員須要單獨把密碼發給各自的用戶),所以咱們能夠這樣編寫JAAS文件:安全

KafkaServer {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="admin"
  password="admin"
  user_admin="admin"
  user_reader="reader"
  user_writer="writer";
};工具

保存該文件爲kafka_cluster_jaas.conf(本例中的完整路徑是/Users/huxi/SourceCode/newenv/kafka_cluster_jaas.conf),以後咱們須要把該文件的完整路徑做爲一個JVM參數傳遞給Kafka的啓動腳本。不過因爲bin/kafka-server-start.sh只接收server.properties的位置,再也不接收其餘任何參數,故咱們須要修改該啓動腳本。具體作法以下:學習

$ pwd
/Users/huxi/SourceCode/newenv/kafka_0.11
$ cp bin/kafka-server-start.sh bin/secured-kafka-server-start.sh測試

$ vi bin/secured-kafka-server-start.shfetch

把該文件中的這行:
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
修改成下面這行,而後保存退出
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=/Users/huxi/SourceCode/newenv/kafka_cluster_jaas.conf kafka.Kafka "$@"ui

作完上面的步驟後,咱們就在bin/目錄下新建了一個secured-kafka-server-start.sh啓動腳本。

配置好JAAS文件後,咱們開始修改broker啓動所需的server.properties文件,你至少須要配置(或修改)如下這些參數:

# 配置ACL入口類
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
# 本例使用SASL_PLAINTEXT
listeners=SASL_PLAINTEXT://:9092
security.inter.broker.protocol= SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
# 設置本例中admin爲超級用戶
super.users=User:admin

Okay,如今咱們能夠啓動broker了(當前確定要先啓動Zookeeper):

bin/secured-kafka-server-start.sh ../config_files/server.properties

.......

[2017-08-17 16:07:30,417] INFO [Kafka Server 0], started (kafka.server.KafkaServer) 

可見,Kafka broker已經成功啓動了。不過當前該broker只會接收已認證client發來的請求。下面咱們繼續clients端的配置。

2. Client端配置

咱們先來建立一個測試topic,名爲test,單分區,副本因子是1。

$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic test --partitions 1 --replication-factor 1
Created topic "test".

咦?稍等下,咱們不是啓用ACL了嗎?這裏爲何會建立成功呢?這裏我簡要說明一下: 首先咱們啓用ACL了嗎?固然!因爲咱們沒有顯式地設置allow.everyone.if.no.acl.found,故這個參數默認是false,也就是說對於目前的這個Kafka集羣而言,除了超級用戶以外的其餘任何用戶都沒法執行任何操做。其次,那爲何建立topic成功了呢?這是由於當前kafka-topics.sh腳本直接鏈接Zookeeper,故不受ACL的限制。因此不管是否配置了security,用戶老是可使用kafka-topics來管理topic。

言歸正傳,本例中咱們的目的是要測試:

  • 用戶writer向test topic寫入消息
  • 用戶reader從test topic讀取消息

下面咱們先來啓動一個console-consumer和一個console-producer來看下當前是個什麼情況:

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>hello, kafka
[2017-08-17 16:16:09,626] WARN Bootstrap broker localhost:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2017-08-17 16:16:09,685] WARN Bootstrap broker localhost:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2017-08-17 16:16:09,740] WARN Bootstrap broker localhost:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2017-08-17 16:16:09,799] WARN Bootstrap broker localhost:9092 disconnected (org.apache.kafka.clients.NetworkClient)

......

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
[2017-08-17 16:16:47,936] WARN Bootstrap broker localhost:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2017-08-17 16:16:47,992] WARN Bootstrap broker localhost:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2017-08-17 16:16:48,052] WARN Bootstrap broker localhost:9092 disconnected (org.apache.kafka.clients.NetworkClient)
[2017-08-17 16:16:48,163] WARN Bootstrap broker localhost:9092 disconnected (org.apache.kafka.clients.NetworkClient)
......

看到了吧,當前的console producer/consumer都沒法工做,報的錯誤就是沒法鏈接broker(localhost:9092)。這就是由於咱們設置了security的緣故。下面咱們爲writer用戶配置安全,首先須要建立一個屬於writer用戶的JAAS文件,該文件中指定了writer用戶的credentials,以下所示:

KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="writer"
password="writer";
};

把上述內容保存到writer_jaas.conf文件(/Users/huxi/SourceCode/newenv/writer_jaas.conf)中。和broker相似,咱們須要拷貝一份新的bin/kafka-console-producer.sh將該JAAS文件做爲一個JVM參數傳給console producer:

$ cp bin/kafka-console-producer.sh bin/writer-kafka-console-producer.sh
$ vi bin/writer-kafka-console-producer.sh

把該文件中的這行:
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"
修改成下面這行,而後保存退出
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/Users/huxi/SourceCode/newenv/writer_jaas.conf kafka.tools.ConsoleProducer "$@"

而後,咱們建立一個producer.config爲該console producer指定如下兩個屬性:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN

如今咱們使用新的腳原本啓動console producer:

$ bin/writer-kafka-console-producer.sh --broker-list localhost:9092 --topic test --producer.config producer.config
>hello, kafka
[2017-08-17 16:28:13,930] WARN Error while fetching metadata with correlation id 1 : {test=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient)
[2017-08-17 16:28:14,036] WARN Error while fetching metadata with correlation id 3 : {test=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient)
[2017-08-17 16:28:14,143] WARN Error while fetching metadata with correlation id 4 : {test=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient)
[2017-08-17 16:28:14,246] WARN Error while fetching metadata with correlation id 5 : {test=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient)

依然有錯誤,不過錯誤變成「沒法獲取元數據」了。這說明咱們運行的console producer經過了認證,可是沒有經過受權,所以咱們須要配置ACL來讓writer用戶有權限寫入topic:

$ bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:writer --operation Write --topic test

再試producer:

$ bin/writer-kafka-console-producer.sh --broker-list localhost:9092 --topic test --producer.config producer.config
>hello
>hello, kafka
>message abc

......

 producer生產消息成功了。等等!子曰:沒有消費以前永遠不要斷言生產成功了!下面咱們就來配置下consumer,即用戶reader。

和writer用戶相似,咱們首先建立reader用戶的JAAS文件,保存在/Users/huxi/SourceCode/newenv/kafka_0.11/reader_jaas.conf中:

KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="reader"
password="reader";
};

而後拷貝一份新的console consumer來指定上面的reader_jaas.conf:

$ cp bin/kafka-console-consumer.sh bin/reader-kafka-console-consumer.sh
$ vi bin/reader-kafka-console-consumer.sh

把該文件中的這行:
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"
修改成下面這行,而後保存退出
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/Users/huxi/SourceCode/newenv/reader_jaas.conf kafka.tools.ConsoleConsumer "$@"

而後,咱們建立一個consumer.config爲該console producer指定如下3個屬性:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
group.id=test-group

如今運行console consumer:

$ bin/reader-kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning --consumer.config consumer.config
[2017-08-17 16:37:54,124] WARN Error while fetching metadata with correlation id 2 : {test=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient)
[2017-08-17 16:37:54,127] ERROR Unknown error when running consumer: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: test-group

能夠看到此次出現了兩個錯誤:第一個問題依然是沒法獲取元數據——這代表reader用戶經過了認證但沒有經過受權;第二個問題代表reader用戶無權訪問consumer group——這一樣是受權的問題。咱們須要設置ACL來解決它們。首先,咱們爲reader用戶設置test topic的讀權限:

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:reader --operation Read --topic test

而後設置訪問group的權限:

$ bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:reader --operation Read --group test-group

作完這些以後,咱們從新運行console consumer程序:

$ bin/reader-kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning --consumer.config consumer.config
hello
hello, kafka
message abc
.......

能夠看到,此次reader用戶成功地讀取了writer用戶生產的消息。這證實了writer和reader用戶均可以正常地工做了。

 

最後總結一下,這種方案適用於用戶數不多但又必須啓用安全的Kafka集羣,不過此方案比較麻煩的地方在於須要爲每一個腳本都從新定製,加上-Djava.security.auth.login.config參數以識別JAAS文件,不如Kerberos來得簡單。另外用戶徹底能夠根據官網的教程配置SSL,而後很輕易地把本文中的例子改爲SASL_SSL。

相關文章
相關標籤/搜索