Pulsar學習筆記之 Authentication認證機制與插件開發

Pulsar自帶的Authentication認證方式有不少種:TLS/Basic/JWT Token/Athenz/Sasl,可是均存在安全性或複雜性的一些問題,且有時候咱們須要和已有的帳戶系統作集成,以保持一致的產品體驗,此時須要自行開發認證插件。這裏介紹一個使用簽名機制加強安全性的認證插件開發方案。數據庫

Pulsar認證的擴展機制

Pulsar認證提供了比較好的擴展機制,經過實現幾個預約義的接口類,就能夠方便的插入本身開發的認證明現。這些接口主要包括如下4個:apache

#客戶端包裝認證參數
org.apache.pulsar.client.api.Authentication
org.apache.pulsar.client.api.AuthenticationDataProvider

#服務端驗證認證參數
org.apache.pulsar.broker.authentication.AuthenticationProvider
org.apache.pulsar.broker.authentication.AuthenticationState

此外默認的多種認證方式的代碼能夠提供很是豐富的認證明現參考。api

待集成的認證服務

假設咱們已經有了一個帳號系統,裏面存儲有帳號名(accessKey)、加鹽hash過的密碼(accessSecret)等認證須要的信息;咱們須要再開發一個專用的接口,供咱們實現的Pulsar認證插件調用。爲避免認證接口被他人濫用,經過Header中的Auth參數作簡單比對校驗。安全

接口參數

  • 接收POST參數:accessKey,timeStr,paramStr,signature
  • 接收Header參數:Auth

認證大體過程

  • 對收到的accessKey作格式校驗
  • 請求帳號系統接口或查詢數據庫獲取accessSecret
  • 用跟客戶端相同的方式從新計算簽名
  • sginStr:accessKey+timeStr+paramStr
  • signature=base64(HmacSHA1.init(accessSecret).doFinal(signStr))
  • 比對簽名,失敗報錯
  • 構造userRole

認證經過後響應結果

{
    "uid":"123456",
    "userRole": "ur-123456", #命名規則能夠自行以爲定
}

Pulsar認證插件的實現要點

客戶端包裝認證參數

  • 傳入參數:accessKey,accessSecret
  • 生成參數ide

    • paramStr:method=akauth&client=$UUID&rand=Math.rand()
    • timeStr:String.valueOf(System.currentTimeMillis()/1000)
    • sginStr:accessKey+timeStr+paramStr
  • 簽名計算:signature=base64(HmacSHA1.init(accessSecret).doFinal(signStr))
  • 傳遞參數:accessKey,timeStr,paramStr,signature

服務端轉發認證參數

  • 接收參數:accessKey,timeStr,paramStr,signature
  • 請求 認證服務接口

服務端簽名存活期校驗

  • AuthenticationProviderTabaltAK.authenticate 判斷是否過時
// check auth params survival time
long currentTimeSeconds = System.currentTimeMillis() / 1000;
long authTimeSeconds = Long.parseLong(authParams.getTimeStr());
if ((authTimeSeconds + this.akSignatureSurvivalSeconds) < currentTimeSeconds) {
    throw new AuthenticationException("auth out of survival time");
}
  • TokenAuthenticationState.isExpired 判斷是否過時ui

    • 具體判斷邏輯與上面相同

Pulsar認證插件的使用配置

Pulsar服務端配置

服務端一般在Broker和Proxy的配置文件中配置this

# 配置Broker的認證插件和參數
authenticationEnabled=true
authenticationProviders=net.tabalt.pulsar.broker.authentication.AuthenticationProviderTabaltAK
tabaltAKServerUrl=http://127.0.0.1/ak  #AK服務地址
tabaltAKServerAuth=test-auth  #AK服務接口認證Token,經過header傳遞給認證服務
tabaltAKSignatureSurvivalSeconds=3600 #簽名存活秒數

# 配置Broker做爲客戶端請求其餘Broker的認證插件和參數,此處須要配置一個超級帳號
brokerClientAuthenticationPlugin=net.tabalt.pulsar.client.auth.AuthenticationTabaltAK
brokerClientAuthenticationParameters={"accessKey":"test_access_key","accessSecret":"test_access_secret"}

Pulsar客戶端配置

AuthenticationTabaltAK authAK = new AuthenticationTabaltAK("test_access_key", "test_access_secret");
PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://127.0.0.1:6650")
                .authentication(authAK).build();
Producer<byte[]> producer = client.newProducer().topic("my-topic").create();
...
相關文章
相關標籤/搜索