kafka SASL認證介紹及自定義SASL PLAIN認證功能

用戶認證功能,是一個成熟組件不可或缺的功能。在0.9版本之前kafka是沒有用戶認證模塊的(或者說只有SSL),好在kafka0.9版本之後逐漸發佈了多種用戶認證功能,彌補了這一缺陷(這裏僅介紹SASL)。java

本篇會先介紹當前kafka的四種認證方式,而後過一遍部署SASL/PLAIN認證功能的流程。最後再介紹如如何使用kafka2.x新推出的callback api,對SASL/PLAIN功能進行二次開發。python

kafka 2.x用戶認證方式小結

須要先明確的一點是,用戶認證和權限控制是兩碼事。用戶認證是確認這個用戶可否訪問當前的系統,而權限控制是控制用戶對當前系統中各類資源的訪問權限。用戶認證就是今天要講的內容,而kafka的權限控制,則是對應bin/kafka-acls.sh工具所提供的一系列功能,這裏不詳細展開。apache

標題特意說明kafka2.x是由於kafka2.0的時候推出一種新的用戶認證方式,SASL/OAUTHBEARER,在此前的版本是不存在這個東西的。那麼加上這個以後,kafka目前共有4種常見的認證方式。api

  • SASL/GSSAPI(kerberos):kafka0.9版本推出,即藉助kerberos實現用戶認證,若是公司剛好有kerberos環境,那麼用這個是比較合適的。
  • SASL/PLAIN:kafka0.10推出,很是簡單,簡單得有些雞肋,不建議生產環境使用,除非對這個功能二次開發,這也是我後面要講的。
  • SASL/SCRAM:kafka0.10推出,全名Salted Challenge Response Authentication Mechanism,爲解決SASL/PLAIN的不足而生,缺點多是某些客戶端並不支持這種方式認證登錄(使用比較複雜)。
  • SASL/OAUTHBEARER:kafka2.0推出,實現較爲複雜,目前業內應該較少實踐。

其實除了上述四種用戶認證功能以外,還有一個叫Delegation Token的東西。這個東西說一個輕量級的工具,是對現有SASL的一個補充,可以提升用戶認證的性能(主要針對Kerberos的認證方式)。算是比較高級的用法,通常也用不到,因此也不會多介紹,有興趣能夠看這裏Authentication using Delegation Tokens安全

SASL/GSSAPI框架

若是已經有kerberos的環境,那麼會比較適合使用這種方式,只須要讓管理員分配好principal和對應的keytab,而後在配置中添加對應的選項就能夠了。須要注意的是,通常採用這種方案的話,zookeeper也須要配置kerberos認證。jvm

SASL/PLAINide

這種方式其實就是一個用戶名/密碼的認證方式,不過它有不少缺陷,好比用戶名密碼是存儲在文件中,不能動態添加,明文等等!這些特性決定了它比較雞肋,但好處是足夠簡單,這使得咱們能夠方便地對它進行二次開發。本篇文章後續會介紹SASL/PLAIN的部署方式和二次開發的例子(基於kafka2.x)。工具

SASL/SCRAM性能

針對PLAIN方式的不足而提供的另外一種認證方式。這種方式的用戶名/密碼是存儲中zookeeper的,所以可以支持動態添加用戶。該種認證方式還會使用sha256或sha512對密碼加密,安全性相對會高一些。

並且配置起來和SASL/PLAIN差很少一樣簡單,添加用戶/密碼的命令官網也有提供,我的比較推薦使用這種方式。不過有些客戶端是不支持這個方式認證登錄的,好比python的kafka客戶端,這點須要提早調研好。

具體的部署方法官網或網上有不少,這裏很少介紹,貼下官網的Authentication using SASL/SCRAM

SASL/OAUTHBEARER

SASL/OAUTHBEARER是基於OAUTH2.0的一個新的認證框架,這裏先說下什麼是OAUTH吧,引用維基百科。

OAuth是一個開放標準,容許用戶讓第三方應用訪問該用戶在某一網站上存儲的私密的資源(如照片,視頻,聯繫人列表),而無需將用戶名和密碼提供給第三方應用。而 OAUTH2.0算是OAUTH的一個增強版。

說白了,SASL/OAUTHBEARER就是一套讓用戶使用第三方認證工具認證的標準,一般是須要本身實現一些token認證和建立的接口,因此會比較繁瑣。

詳情能夠經過這個kip瞭解KIP-255

說了這麼多,接下來就說實戰了,先介紹下如何配置SASL/PLAIN。

SASL/PLAIN實例(配置及客戶端)

broker配置

kafka_server_jaas.conf

這裏簡單介紹下SASL/PLAIN的部署方式,另外除了SASL/OAUTHBEARER,其餘幾種應該也是相似的部署方式,基本都是大同小異。

PS:本配置版本適用於kafka2.x,且無需配置zk認證

kafka的用戶認證,是基於java的jaas。因此咱們須要先添加jaas服務端的配置文件。在kafka_home/config/kafka_server_jaas.conf中添加如下配置信息:

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret"
    user_admin="admin-secret"
    user_alice="alice-secret";  
};

注意最後一個屬性後面須要加封號!配置是不難理解的,第一行指定PlainLoginModule,算是聲明這是一個SASL/PLAIN的認證類型,若是是其餘的,那麼就須要reqired其餘的類。usernamepassword則是用於集羣內部broker的認證用的。

這裏會讓人疑惑的,應該是user_adminuser_alice這兩個屬性了。這個實際上是用來定義用戶名和密碼的,形式是這樣:user_userName=password。因此這裏實際上是定義了用戶admin和用戶alice到密碼。

這一點能夠在源碼的PlainServerCallbackHandler類中找到對應的信息,kafka源碼中顯示,對用戶認證的時候,就會到jaas配置文件中,經過user_username屬性獲取對應username用戶的密碼,再進行校驗。固然這樣也致使了該配置文件只有重啓纔會生效,即沒法動態添加用戶。

說回來,寫完配置後,須要在kafka的配置中添加jaas文件的路徑。在kafka_home/bin/kafka-run-class.sh中,找到下面的配置,修改KAFKA_OPTS到配置信息。

# Generic jvm settings you want to add
if [ -z "$KAFKA_OPTS" ]; then
  KAFKA_OPTS=""
fi

將上述到KAFKA_OPTS修改成

KAFKA_OPTS="-Djava.security.auth.login.config=kafka_home/config/kafka_server_jaas.conf"

server.properties

而後修改kafka_home/config/server.properties

listeners=SASL_PLAINTEXT://host.name:port
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN

其中SASL_PLAINTEXT的意思,是明文傳輸的意思,若是是SSL,那麼應該是SASL_SSL。

這樣就算是配置好kafka broker了,接下來啓動kafka,觀察輸出日誌,沒有錯誤通常就沒問題了。

客戶端配置

以producer爲例,只須要在kafka_home/config/producer.properties中添加jaas認證信息,以及用戶名密碼:

sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="alice" \
    password="alice-secret";

security.protocol=SASL_SSL
sasl.mechanism=PLAIN

而後使用console producer驗證:

bin/kafka-console-producer.sh --broker-list kafka:9092 --topic test --producer.config config/producer.properties

通常可以發送數據就說明部署完成了~

自定義SASL/PLAIN認證(二次開發)

前面小節介紹了kafka sasl_plain的部署方式,但這種方式的諸多弊病決定了它並不適合用於生產環境。這裏咱們先介紹kafka2的新認證接口,而後演示下如何使用新的api自定義。

kafka2新的callback接口介紹

這一api提出的背景,是由於最開始的api(即SaslServer),不方便對用戶認證進行拓展。這個問題在開發SASL/SCRAM功能的時候尤爲突出。按官方的說法,要添加SASL/SCRAM功能,須要重寫SaslServer類。

因此官方重寫了這方面的功能,使用回調的方式實現了這部分的功能模塊。使得開發者能夠方便得對用戶認證模塊進行拓展或修改。

而且新增長了四個自定義認證的配置,分別是:

  • sasl客戶端類:sasl.client.callback.handler.class
  • sasl服務端類:sasl.server.callback.handler.class
  • login類:sasl.login.class
  • login回調類:sasl.login.callback.handler.class

這幾個配置默認都是null,須要填寫的內容是自定義的類的路徑+名稱。咱們此次只須要關注sasl服務端類的配置,即sasl.server.callback.handler.class

這部分的內容具體是在KIP-86

自定義sasl/plain功能

先詳細介紹下sasl.server.callback.handler.class配置。這個配置在使用的時候,須要以小寫方式指定SASL的類型。舉個例子,若是是SASL_PLAINTEXT,那麼就須要這樣:

listener.name.sasl_plaintext.plain.sasl.server.callback.handler.class=com.example.CustomPlainCallbackHandler

即以listener.name.sasl_plaintext.plain.sasl開頭。而後在kafka中,SASL_PLAINTEXT默認實現的callback handler是PlainServerCallbackHandler,實現了AuthenticateCallbackHandler接口。這個的邏輯其實還蠻簡單的,咱們能夠看看它重點的方法和代碼。

public class PlainServerCallbackHandler implements AuthenticateCallbackHandler {

    private static final String JAAS_USER_PREFIX = "user_";
    //jaas配置信息,初始化一次,這就是爲何plain沒法添加用戶
    private List<AppConfigurationEntry> jaasConfigEntries;

    @Override
    public void configure(Map<String, ?> configs, String mechanism, List<AppConfigurationEntry> jaasConfigEntries) {
        this.jaasConfigEntries = jaasConfigEntries;
    }

    //核心類,獲取用戶密碼後,調用authenticate方法
    @Override
    public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
        String username = null;
        for (Callback callback: callbacks) {
            if (callback instanceof NameCallback)
                username = ((NameCallback) callback).getDefaultName();
            else if (callback instanceof PlainAuthenticateCallback) {
                PlainAuthenticateCallback plainCallback = (PlainAuthenticateCallback) callback;
                boolean authenticated = authenticate(username, plainCallback.password());
                plainCallback.authenticated(authenticated);
            } else
                throw new UnsupportedCallbackException(callback);
        }
    }

    //用戶密碼是經過獲取jaas文件的屬性,屬性名就是JAAS_USER_PREFIX變量當前綴+username
    protected boolean authenticate(String username, char[] password) throws IOException {
        if (username == null)
            return false;
        else {
            String expectedPassword = JaasContext.configEntryOption(jaasConfigEntries,
                    JAAS_USER_PREFIX + username,
                    PlainLoginModule.class.getName());
            return expectedPassword != null && Arrays.equals(password, expectedPassword.toCharArray());
        }
    }

    @Override
    public void close() throws KafkaException {
    }

}

前面說plain方式不支持動態添加用戶,user_username驗證密碼,看代碼就一清二楚。既然知道這個後,那要自定義校驗邏輯就很簡單了。

只須要繼承PlainServerCallbackHandler這個類,而後重寫authenticate方法實現本身的邏輯就實現自定義了。

好比我想讓用戶名和密碼相同的就驗證經過,那麼能夠這樣:

public class MyPlainServerCallbackHandler extends PlainServerCallbackHandler{
    @Override
    protected boolean authenticate(String username, char[] password) throws IOException {
        if (username == null)
            return false;
        else {
            return expectedPassword != null && Arrays.equals(password, username.toCharArray());
        }
    }
}

而後中server.properpose中添加server callback信息,就能夠了

listener.name.sasl_plaintext.plain.sasl.server.callback.handler.class=com.example.MyPlainServerCallbackHandler

對了,幾得從新編譯打包,替換掉kafka-client掉jar包,若是修改了一些全局信息(好比build.gradle引入新的依賴),那最好kafka全套jar包都換一下。

以上,若是以爲有用,不妨點個贊吧~

相關文章
相關標籤/搜索