用戶認證功能,是一個成熟組件不可或缺的功能。在0.9版本之前kafka是沒有用戶認證模塊的(或者說只有SSL),好在kafka0.9版本之後逐漸發佈了多種用戶認證功能,彌補了這一缺陷(這裏僅介紹SASL)。java
本篇會先介紹當前kafka的四種認證方式,而後過一遍部署SASL/PLAIN認證功能的流程。最後再介紹如如何使用kafka2.x新推出的callback api,對SASL/PLAIN功能進行二次開發。python
須要先明確的一點是,用戶認證和權限控制是兩碼事。用戶認證是確認這個用戶可否訪問當前的系統,而權限控制是控制用戶對當前系統中各類資源的訪問權限。用戶認證就是今天要講的內容,而kafka的權限控制,則是對應bin/kafka-acls.sh
工具所提供的一系列功能,這裏不詳細展開。apache
標題特意說明kafka2.x是由於kafka2.0的時候推出一種新的用戶認證方式,SASL/OAUTHBEARER,在此前的版本是不存在這個東西的。那麼加上這個以後,kafka目前共有4種常見的認證方式。api
其實除了上述四種用戶認證功能以外,還有一個叫Delegation Token的東西。這個東西說一個輕量級的工具,是對現有SASL的一個補充,可以提升用戶認證的性能(主要針對Kerberos的認證方式)。算是比較高級的用法,通常也用不到,因此也不會多介紹,有興趣能夠看這裏Authentication using Delegation Tokens。安全
SASL/GSSAPI框架
若是已經有kerberos的環境,那麼會比較適合使用這種方式,只須要讓管理員分配好principal和對應的keytab,而後在配置中添加對應的選項就能夠了。須要注意的是,通常採用這種方案的話,zookeeper也須要配置kerberos認證。jvm
SASL/PLAINide
這種方式其實就是一個用戶名/密碼的認證方式,不過它有不少缺陷,好比用戶名密碼是存儲在文件中,不能動態添加,明文等等!這些特性決定了它比較雞肋,但好處是足夠簡單,這使得咱們能夠方便地對它進行二次開發。本篇文章後續會介紹SASL/PLAIN的部署方式和二次開發的例子(基於kafka2.x)。工具
SASL/SCRAM性能
針對PLAIN方式的不足而提供的另外一種認證方式。這種方式的用戶名/密碼是存儲中zookeeper的,所以可以支持動態添加用戶。該種認證方式還會使用sha256或sha512對密碼加密,安全性相對會高一些。
並且配置起來和SASL/PLAIN差很少一樣簡單,添加用戶/密碼的命令官網也有提供,我的比較推薦使用這種方式。不過有些客戶端是不支持這個方式認證登錄的,好比python的kafka客戶端,這點須要提早調研好。
具體的部署方法官網或網上有不少,這裏很少介紹,貼下官網的Authentication using SASL/SCRAM。
SASL/OAUTHBEARER
SASL/OAUTHBEARER是基於OAUTH2.0的一個新的認證框架,這裏先說下什麼是OAUTH吧,引用維基百科。
OAuth是一個開放標準,容許用戶讓第三方應用訪問該用戶在某一網站上存儲的私密的資源(如照片,視頻,聯繫人列表),而無需將用戶名和密碼提供給第三方應用。而 OAUTH2.0算是OAUTH的一個增強版。
說白了,SASL/OAUTHBEARER就是一套讓用戶使用第三方認證工具認證的標準,一般是須要本身實現一些token認證和建立的接口,因此會比較繁瑣。
詳情能夠經過這個kip瞭解KIP-255
說了這麼多,接下來就說實戰了,先介紹下如何配置SASL/PLAIN。
kafka_server_jaas.conf
這裏簡單介紹下SASL/PLAIN的部署方式,另外除了SASL/OAUTHBEARER,其餘幾種應該也是相似的部署方式,基本都是大同小異。
PS:本配置版本適用於kafka2.x,且無需配置zk認證
kafka的用戶認證,是基於java的jaas。因此咱們須要先添加jaas服務端的配置文件。在kafka_home/config/kafka_server_jaas.conf
中添加如下配置信息:
KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret" user_admin="admin-secret" user_alice="alice-secret"; };
注意最後一個屬性後面須要加封號!配置是不難理解的,第一行指定PlainLoginModule
,算是聲明這是一個SASL/PLAIN的認證類型,若是是其餘的,那麼就須要reqired其餘的類。username
和password
則是用於集羣內部broker的認證用的。
這裏會讓人疑惑的,應該是user_admin
和user_alice
這兩個屬性了。這個實際上是用來定義用戶名和密碼的,形式是這樣:user_userName=password。因此這裏實際上是定義了用戶admin和用戶alice到密碼。
這一點能夠在源碼的PlainServerCallbackHandler
類中找到對應的信息,kafka源碼中顯示,對用戶認證的時候,就會到jaas配置文件中,經過user_username屬性獲取對應username用戶的密碼,再進行校驗。固然這樣也致使了該配置文件只有重啓纔會生效,即沒法動態添加用戶。
說回來,寫完配置後,須要在kafka的配置中添加jaas文件的路徑。在kafka_home/bin/kafka-run-class.sh
中,找到下面的配置,修改KAFKA_OPTS到配置信息。
# Generic jvm settings you want to add if [ -z "$KAFKA_OPTS" ]; then KAFKA_OPTS="" fi
將上述到KAFKA_OPTS修改成
KAFKA_OPTS="-Djava.security.auth.login.config=kafka_home/config/kafka_server_jaas.conf"
server.properties
而後修改kafka_home/config/server.properties
:
listeners=SASL_PLAINTEXT://host.name:port security.inter.broker.protocol=SASL_PLAINTEXT sasl.mechanism.inter.broker.protocol=PLAIN sasl.enabled.mechanisms=PLAIN
其中SASL_PLAINTEXT的意思,是明文傳輸的意思,若是是SSL,那麼應該是SASL_SSL。
這樣就算是配置好kafka broker了,接下來啓動kafka,觀察輸出日誌,沒有錯誤通常就沒問題了。
以producer爲例,只須要在kafka_home/config/producer.properties
中添加jaas認證信息,以及用戶名密碼:
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="alice" \ password="alice-secret"; security.protocol=SASL_SSL sasl.mechanism=PLAIN
而後使用console producer驗證:
bin/kafka-console-producer.sh --broker-list kafka:9092 --topic test --producer.config config/producer.properties
通常可以發送數據就說明部署完成了~
前面小節介紹了kafka sasl_plain的部署方式,但這種方式的諸多弊病決定了它並不適合用於生產環境。這裏咱們先介紹kafka2的新認證接口,而後演示下如何使用新的api自定義。
這一api提出的背景,是由於最開始的api(即SaslServer
),不方便對用戶認證進行拓展。這個問題在開發SASL/SCRAM功能的時候尤爲突出。按官方的說法,要添加SASL/SCRAM功能,須要重寫SaslServer
類。
因此官方重寫了這方面的功能,使用回調的方式實現了這部分的功能模塊。使得開發者能夠方便得對用戶認證模塊進行拓展或修改。
而且新增長了四個自定義認證的配置,分別是:
這幾個配置默認都是null,須要填寫的內容是自定義的類的路徑+名稱。咱們此次只須要關注sasl服務端類的配置,即sasl.server.callback.handler.class
。
這部分的內容具體是在KIP-86。
先詳細介紹下sasl.server.callback.handler.class
配置。這個配置在使用的時候,須要以小寫方式指定SASL的類型。舉個例子,若是是SASL_PLAINTEXT,那麼就須要這樣:
listener.name.sasl_plaintext.plain.sasl.server.callback.handler.class=com.example.CustomPlainCallbackHandler
即以listener.name.sasl_plaintext.plain.sasl開頭。而後在kafka中,SASL_PLAINTEXT默認實現的callback handler是PlainServerCallbackHandler
,實現了AuthenticateCallbackHandler
接口。這個的邏輯其實還蠻簡單的,咱們能夠看看它重點的方法和代碼。
public class PlainServerCallbackHandler implements AuthenticateCallbackHandler { private static final String JAAS_USER_PREFIX = "user_"; //jaas配置信息,初始化一次,這就是爲何plain沒法添加用戶 private List<AppConfigurationEntry> jaasConfigEntries; @Override public void configure(Map<String, ?> configs, String mechanism, List<AppConfigurationEntry> jaasConfigEntries) { this.jaasConfigEntries = jaasConfigEntries; } //核心類,獲取用戶密碼後,調用authenticate方法 @Override public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException { String username = null; for (Callback callback: callbacks) { if (callback instanceof NameCallback) username = ((NameCallback) callback).getDefaultName(); else if (callback instanceof PlainAuthenticateCallback) { PlainAuthenticateCallback plainCallback = (PlainAuthenticateCallback) callback; boolean authenticated = authenticate(username, plainCallback.password()); plainCallback.authenticated(authenticated); } else throw new UnsupportedCallbackException(callback); } } //用戶密碼是經過獲取jaas文件的屬性,屬性名就是JAAS_USER_PREFIX變量當前綴+username protected boolean authenticate(String username, char[] password) throws IOException { if (username == null) return false; else { String expectedPassword = JaasContext.configEntryOption(jaasConfigEntries, JAAS_USER_PREFIX + username, PlainLoginModule.class.getName()); return expectedPassword != null && Arrays.equals(password, expectedPassword.toCharArray()); } } @Override public void close() throws KafkaException { } }
前面說plain方式不支持動態添加用戶,user_username驗證密碼,看代碼就一清二楚。既然知道這個後,那要自定義校驗邏輯就很簡單了。
只須要繼承PlainServerCallbackHandler
這個類,而後重寫authenticate
方法實現本身的邏輯就實現自定義了。
好比我想讓用戶名和密碼相同的就驗證經過,那麼能夠這樣:
public class MyPlainServerCallbackHandler extends PlainServerCallbackHandler{ @Override protected boolean authenticate(String username, char[] password) throws IOException { if (username == null) return false; else { return expectedPassword != null && Arrays.equals(password, username.toCharArray()); } } }
而後中server.properpose中添加server callback信息,就能夠了
listener.name.sasl_plaintext.plain.sasl.server.callback.handler.class=com.example.MyPlainServerCallbackHandler
對了,幾得從新編譯打包,替換掉kafka-client掉jar包,若是修改了一些全局信息(好比build.gradle引入新的依賴),那最好kafka全套jar包都換一下。
以上,若是以爲有用,不妨點個贊吧~