Kafka是由LinkedIn設計的一個高吞吐量、分佈式、基於發佈訂閱模式的消息系統,使用Scala編寫,它以可水平擴展、可靠性、異步通訊和高吞吐率等特性而被普遍使用。目前愈來愈多的開源分佈式處理系統都支持與Kafka集成,其中Spark Streaming做爲後端流引擎配合Kafka做爲前端消息系統正成爲當前流處理系統的主流架構之一。前端
然而,當下愈來愈多的安全漏洞、數據泄露等問題的爆發,安全正成爲系統選型不得不考慮的問題,Kafka因爲其安全機制的匱乏,也致使其在數據敏感行業的部署存在嚴重的安全隱患。本文將圍繞Kafka,先介紹其總體架構和關鍵概念,再深刻分析其架構之中存在的安全問題,最後分享下Transwarp在Kafka安全性上所作的工做及其使用方法。java
Kafka架構與安全後端
首先,咱們來了解下有關Kafka的幾個基本概念:api
Topic:Kafka把接收的消息按種類劃分,每一個種類都稱之爲Topic,由惟一的Topic Name標識。安全
Producer:向Topic發佈消息的進程稱爲Producer。服務器
Consumer:從Topic訂閱消息的進程稱爲Consumer。網絡
Broker:Kafka集羣包含一個或多個服務器,這種服務器被稱爲Broker。架構
Kafka的總體架構以下圖所示,典型的Kafka集羣包含一組發佈消息的Producer,一組管理Topic的Broker,和一組訂閱消息的Consumer。Topic能夠有多個分區,每一個分區只存儲於一個Broker。Producer能夠按照必定的策略將消息劃分給指定的分區,如簡單的輪詢各個分區或者按照特定字段的Hash值指定分區。Broker須要經過ZooKeeper記錄集羣的全部Broker、選舉分區的Leader,記錄Consumer的消費消息的偏移量,以及在Consumer Group發生變化時進行relalance. Broker接收和發送消息是被動的:由Producer主動發送消息,Consumer主動拉取消息。併發
然而,分析Kafka框架,咱們會發現如下嚴重的安全問題:框架
1.網絡中的任何一臺主機,均可以經過啓動Broker進程而加入Kafka集羣,可以接收Producer的消息,可以篡改消息併發送給Consumer。
2.網絡中的任何一臺主機,均可以啓動惡意的Producer/Consumer鏈接到Broker,發送非法消息或拉取隱私消息數據。
3.Broker不支持鏈接到啓用Kerberos認證的ZooKeeper集羣,沒有對存放在ZooKeeper上的數據設置權限。任意用戶都可以直接訪問ZooKeeper集羣,對這些數據進行修改或刪除。
4.Kafka中的Topic不支持設置訪問控制列表,任意鏈接到Kafka集羣的Consumer(或Producer)都能對任意Topic讀取(或發送)消息。
隨着Kafka應用場景愈來愈普遍,特別是一些數據隱私程度較高的領域(如道路交通的視頻監控),上述安全問題的存在猶如一顆定時炸彈,一旦內網被黑客入侵或者內部出現惡意用戶,全部的隱私數據(如車輛出行記錄)都可以輕易地被竊取,而無需攻破Broker所在的服務器。
Kafka安全設計
基於上述分析,Transwarp從如下兩個方面加強Kafka的安全性:
身份認證(Authentication):設計並實現了基於Kerberos和基於IP的兩種身份認證機制。前者爲強身份認證,相比於後者具備更好的安全性,後者適用於IP地址可信的網絡環境,相比於前者部署更爲簡便。
權限控制(Authorization):設計並實現了Topic級別的權限模型。Topic的權限分爲READ(從Topic拉取數據)、WRITE(向Topic中生產數據)、CREATE(建立Topic)和DELETE(刪除Topic)。
基於Kerberos的身份機制以下圖所示:
Broker啓動時,須要使用配置文件中的身份和密鑰文件向KDC(Kerberos服務器)認證,認證經過則加入Kafka集羣,不然報錯退出。
Producer(或Consumer)啓動後須要通過以下步驟與Broker創建安全的Socket鏈接:
1.Producer向KDC認證身份,經過則獲得TGT(票證請求票證),不然報錯退出
2.Producer使用TGT向KDC請求Kafka服務,KDC驗證TGT並向Producer返回SessionKey(會話密鑰)和ServiceTicket(服務票證)
3.Producer使用SessionKey和ServiceTicket與Broker創建鏈接,Broker使用自身的密鑰解密ServiceTicket,得到與Producer通訊的SessionKey,而後使用SessionKey驗證Producer的身份,經過則創建鏈接,不然拒絕鏈接。
ZooKeeper須要啓用Kerberos認證模式,保證Broker或Consumer與其的鏈接是安全的。
Topic的訪問控制列表(ACL)存儲於ZooKeeper中,存儲節點的路徑爲/acl/<topic>/<user>,節點數據爲R(ead)、W(rite)、C(reate)、D(elete)權限的集合,如/acl/transaction/jack節點的數據爲RW,則表示用戶jack可以對transaction這個topic進行讀和寫。
另外,kafka爲特權用戶,只有kafka用戶可以賦予/取消權限。所以,ACL相關的ZooKeeper節點權限爲kafka具備全部權限,其餘用戶不具備任何權限。
構建安全的Kafka服務
首先,咱們爲Broker啓用Kerberos認證模式,配置文件爲/etc/kafka/conf/server.properties,安全相關的參數以下所示:
其中,authentication參數表示認證模式,可選配置項爲simple, kerberos和ipaddress,默認爲simple。當認證模式爲kerberos時,須要額外配置帳戶屬性principal和對應的密鑰文件路徑keytab.
認證模式爲ipaddress時,Producer和Consumer建立時不須要作任何改變。而認證模式爲kerberos時,須要預先建立好相應的principal和keytab,並使用API進行登陸,樣例代碼以下所示:
public class SecureProducer extends Thread {
private final kafka.javaapi.producer.Producer<Integer, String> producer;
private final String topic;
private final Properties props = new Properties();public SecureProducer(String topic) {
AuthenticationManager.setAuthMethod(「kerberos」);
AuthenticationManager.login(「producer1」, 「/etc/producer1.keytab」);
props.put(「serializer.class」, 「kafka.serializer.StringEncoder」);
props.put(「metadata.broker.list」,「172.16.1.190:9092,172.16.1.192:9092,172.16.1.193:9092」);
// Use random partitioner. Don’t need the key type. Just set it to Integer.
// The message is of type String.
producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
this.topic = topic;
}. . .
Topic權限管理
Topic的權限管理主要是經過AuthorizationManager這個類來完成的,其類結構以下圖所示:
其中,resetPermission(user, Permissions, topic) 爲重置user對topic的權限。
grant(user, Permissions, topic) 爲賦予user對topic權限。
revoke(user, Permissions, topic) 爲取消user對topic權限。
isPermitted(user, Permissions, topic) 爲檢查user對topic是否具備指定權限。
調用grant或revoke進行權限設置完成後,須要commit命令提交修改到ZooKeeper
Kerberos模式下,AuthorizationManager須要先使用AuthenticationManager.login方法登陸,與ZooKeeper創建安全的鏈接,再進行權限設置。示例代碼以下所示:
public class AuthzTest {
public static void main(String[] args) {
Properties props = new Properties();
props.setProperty(「authentication」, 「kerberos」);
props.setProperty(「zookeeper.connect」, 「172.16.2.116:2181,172.16.2.117:2181,172.16.2.118:2181」);
props.setProperty(「principal」, 「kafka/host1@TDH」);
props.setProperty(「keytab」, 「/usr/lib/kafka/config/kafka.keytab」);
ZKConfig config = new ZKConfig(props);
AuthenticationManager.setAuthMethod(config.authentication());
AuthenticationManager.login(config.principal(), config.keytab());
AuthorizationManager authzManager = new AuthorizationManager(config);
// reset permission READ and WRITE to ip 172.16.1.87 on topic test
authzManager.resetPermission(「172.16.1.87」,
new Permissions(Permissions.READ, Permissions.WRITE), 「test」);
// grant permission WRITE to ip 172.16.1.87 on topic test
authzManager.grant(「172.16.1.87」, new Permissions(Permissions.CREATE), 「test」);
// revoke permission READ from ip 172.16.1.87 on topic test
authzManager.revoke(「172.16.1.87」, new Permissions(Permissions.READ), 「test」);
// commit the permission settings
authzManager.commit();
authzManager.close();
}
}
ipaddress認證模式下,取消和賦予權限的操做以下所示:
public class AuthzTest {
public static void main(String[] args) {
Properties props = new Properties();
props.setProperty(「authentication」, 「ipaddress」);
props.setProperty(「zookeeper.connect」,
「172.16.1.87:2181,172.16.1.88:2181,172.16.1.89:2181」);
ZKConfig config = new ZKConfig(props);
// new authorization manager
AuthorizationManager authzManager = new AuthorizationManager(config);
// reset permission READ and WRITE to ip 172.16.1.87 on topic test
authzManager.resetPermission(「172.16.1.87」,
new Permissions(Permissions.READ, Permissions.WRITE), 「test」);
// grant permission WRITE to ip 172.16.1.87 on topic test
authzManager.grant(「172.16.1.87」, new Permissions(Permissions.CREATE), 「test」);
// revoke permission READ from ip 172.16.1.87 on topic test
authzManager.revoke(「172.16.1.87」, new Permissions(Permissions.READ), 「test」);
// commit the permission settings
authzManager.commit();
authzManager.close();
}
}
總結與展望
本文經過介紹Kafka現有架構,深刻挖掘其中存在的安全問題,並給出Transwarp在Kafka安全上所作的工做及其使用方式。然而,縱觀Hadoop & Spark生態系統,安全功能還存在不少問題,各組件的權限系統獨立混亂,缺乏集中易用的帳戶管理系統。某些組件的權限管理還很不成熟,如Spark的調度器缺乏用戶的概念,不能限制具體用戶使用資源的多少。Transwarp基於開源版本,在安全方面已有至關多的積累,並持續改進開發,致力於爲企業用戶提供一個易用、高效、安全和穩定的基礎數據平臺。