Pulsar自帶的Authentication認證方式有不少種:TLS/Basic/JWT Token/Athenz/Sasl,可是均存在安全性或複雜性的一些問題,且有時候咱們須要和已有的帳戶系統作集成,以保持一致的產品體驗,此時須要自行開發認證插件。這裏介紹一個使用簽名機制加強安全性的認證插件開發方案。數據庫
Pulsar認證提供了比較好的擴展機制,經過實現幾個預約義的接口類,就能夠方便的插入本身開發的認證明現。這些接口主要包括如下4個:apache
#客戶端包裝認證參數 org.apache.pulsar.client.api.Authentication org.apache.pulsar.client.api.AuthenticationDataProvider #服務端驗證認證參數 org.apache.pulsar.broker.authentication.AuthenticationProvider org.apache.pulsar.broker.authentication.AuthenticationState
此外默認的多種認證方式的代碼能夠提供很是豐富的認證明現參考。api
假設咱們已經有了一個帳號系統,裏面存儲有帳號名(accessKey)、加鹽hash過的密碼(accessSecret)等認證須要的信息;咱們須要再開發一個專用的接口,供咱們實現的Pulsar認證插件調用。爲避免認證接口被他人濫用,經過Header中的Auth參數作簡單比對校驗。安全
{ "uid":"123456", "userRole": "ur-123456", #命名規則能夠自行以爲定 }
生成參數ide
// check auth params survival time long currentTimeSeconds = System.currentTimeMillis() / 1000; long authTimeSeconds = Long.parseLong(authParams.getTimeStr()); if ((authTimeSeconds + this.akSignatureSurvivalSeconds) < currentTimeSeconds) { throw new AuthenticationException("auth out of survival time"); }
TokenAuthenticationState.isExpired 判斷是否過時ui
服務端一般在Broker和Proxy的配置文件中配置this
# 配置Broker的認證插件和參數 authenticationEnabled=true authenticationProviders=net.tabalt.pulsar.broker.authentication.AuthenticationProviderTabaltAK tabaltAKServerUrl=http://127.0.0.1/ak #AK服務地址 tabaltAKServerAuth=test-auth #AK服務接口認證Token,經過header傳遞給認證服務 tabaltAKSignatureSurvivalSeconds=3600 #簽名存活秒數 # 配置Broker做爲客戶端請求其餘Broker的認證插件和參數,此處須要配置一個超級帳號 brokerClientAuthenticationPlugin=net.tabalt.pulsar.client.auth.AuthenticationTabaltAK brokerClientAuthenticationParameters={"accessKey":"test_access_key","accessSecret":"test_access_secret"}
AuthenticationTabaltAK authAK = new AuthenticationTabaltAK("test_access_key", "test_access_secret"); PulsarClient client = PulsarClient.builder() .serviceUrl("pulsar://127.0.0.1:6650") .authentication(authAK).build(); Producer<byte[]> producer = client.newProducer().topic("my-topic").create(); ...