Once Kafka is deployed we can use it "freely", but in a large company's production environment the Kafka cluster is often huge: each user should only deal with the topics they are responsible for, and have no permissions on anyone else's. This both isolates resources and guards against accidental operations.
Before we can control permissions, we must enable user authentication: without users, there are no permissions to speak of.
Create the kafka principal:
kadmin.local -q 'addprinc -randkey kafka/{hostname}@{REALM}'
Generate the keytab:
kadmin.local -q "ktadd -k /etc/security/keytabs/{keytabname}.keytab kafka/{hostname}@{REALM}"
Notes:
1. If ZooKeeper did not previously have Kerberos enabled, enable Kerberos for ZooKeeper here as well.
2. If Kerberos was enabled through CM (Cloudera Manager), you can fetch the keytab directly from CM, but make sure the keytab is up to date, otherwise authentication will fail. The keytab locations are:
/var/run/cloudera-scm-agent/process/****-kafka-KAFKA_BROKER/kafka.keytab
/var/run/cloudera-scm-agent/process/****-zookeeper-server/zookeeper.keytab
Modify server.properties:
# change this line
listeners=SASL_PLAINTEXT://host.name:port
# add the following
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=GSSAPI
sasl.enabled.mechanisms=GSSAPI
sasl.kerberos.service.name=kafka
super.users=User:kafka
KafkaServer {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    serviceName="kafka"
    keyTab="/etc/keytab/kafka_122.keytab"   // replace with the location of your own keytab
    principal="kafka/{hostname}@{REALM}";   // replace with the principal of your own keytab
};

// ZooKeeper client authentication, because Kafka talks to ZooKeeper while running
Client {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    serviceName="zookeeper"
    keyTab="/etc/keytab/kafka_122.keytab"   // replace with the location of your own keytab
    principal="kafka/{hostname}@{REALM}";   // replace with the principal of your own keytab
};
export KAFKA_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/root/kafka/config/kafka_server.jaas"   # location of the kafka_server.jaas created above
bin/kafka-server-stop.sh
bin/kafka-server-start.sh -daemon config/server.properties
After Kerberos is enabled on the brokers, if we later want to work from the command line, i.e. run consumers and producers, we need to configure Kerberos on those clients too.
KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/etc/keytab/kafka_122.keytab"
    serviceName="kafka"
    principal="kafka/{hostname}@{REALM}";
};

// ZooKeeper client authentication
Client {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    serviceName="zookeeper"
    keyTab="/etc/keytab/kafka_122.keytab"
    principal="kafka/{hostname}@{REALM}";
};
To take effect in the current session:
export KAFKA_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/opt/kafka/config/kafka_client.jaas"
Or make it permanent via the environment: edit /etc/profile (e.g. with vim) and add
export KAFKA_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/opt/kafka/config/kafka_client.jaas"
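Either way, it is worth a quick sanity check that the options are actually visible to the shell before launching any client tool. A minimal sketch, using the example paths from above:

```shell
# Set the Kerberos/JAAS options for the current shell (example paths from above).
export KAFKA_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/opt/kafka/config/kafka_client.jaas"

# Any Kafka CLI tool launched from this shell inherits KAFKA_OPTS;
# confirm the JAAS file is referenced before running producers/consumers.
echo "$KAFKA_OPTS" | grep -o 'kafka_client.jaas'   # -> kafka_client.jaas
```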
Client-side properties (e.g. in producer.properties / consumer.properties):
security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
The Kafka ACL management CLI (along with the other CLI scripts) can be found in the bin directory; the script is kafka-acls.sh. Before using it, add this line to server.properties:
allow.everyone.if.no.acl.found=false
Add the mutual-trust principals
Run under kadmin.local:
addprinc krbtgt/{REALMA}@{REALMB}
addprinc krbtgt/{REALMB}@{REALMA}
Note that each of these krbtgt principals must be created in both KDCs with identical keys for the trust to work.
Modify krb5.conf

# [realms]: list the information for both realms
[realms]
 HADOOP.SPADE.COM = {
  kdc = hb21-bd-cm-130-61:88
  admin_server = hb21-bd-cm-130-61:749
 }
 HADOOP.TEST.COM = {
  kdc = tk-dba-hadoop-152:88
  admin_server = tk-dba-hadoop-152:749
 }

# [domain_realm]: map hostnames to realms; every machine needs an entry
[domain_realm]
 tk-dba-hadoop-154 = HADOOP.TEST.COM
 hb21-dba-kfk-130-120 = HADOOP.SPADE.COM

# [capaths]: declare the trust path between the two realms
[capaths]
 HADOOP.SPADE.COM = {
  HADOOP.TEST.COM = .
 }
 HADOOP.TEST.COM = {
  HADOOP.SPADE.COM = .
 }
Add the sasl.kerberos.principal.to.local.rules property:
sasl.kerberos.principal.to.local.rules=RULE:[1:$1@$0](.*@HADOOP\.TEST\.COM$)s/@HADOOP\.TEST\.COM$//,RULE:[2:$1@$0](.*@HADOOP\.TEST\.COM$)s/@HADOOP\.TEST\.COM$//,RULE:[1:$1@$0](.*@HADOOP\.SPADE\.COM$)s/@HADOOP\.SPADE\.COM$//,RULE:[2:$1@$0](.*@HADOOP\.SPADE\.COM$)s/@HADOOP\.SPADE\.COM$//,DEFAULT
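To see what these rules accomplish, the rewrite they perform can be mimicked with sed on two hypothetical principals (illustration only; the broker does the real mapping). The realm suffix, and for host-qualified principals the host part, is stripped, so principals from either realm resolve to the same short local name:

```shell
# RULE:[1:$1@$0] handles user@REALM principals: drop the realm suffix.
echo "kafka@HADOOP.TEST.COM" | sed -E 's/@HADOOP\.TEST\.COM$//'                 # -> kafka

# RULE:[2:$1@$0] handles user/host@REALM principals: drop host and realm.
# ("hostA" is a made-up hostname for the example.)
echo "kafka/hostA@HADOOP.SPADE.COM" | sed -E 's#/[^@]+@HADOOP\.SPADE\.COM$##'   # -> kafka
```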
Copy a keytab from realm B onto a machine in realm A, then use that keytab on A: configure a JAAS file for it and export it into the environment. Use the keytab to operate on a topic in cluster A or cluster B; if data can be written normally, the cross-realm trust is working.
After Kerberos is enabled in production, to give the business-side consumers and producers a smooth buffer period for moving onto the authentication system, we can have Kafka listen on two ports for a while: one that requires Kerberos authentication and one that does not, both serving at the same time.
Modify server.properties:
listeners=SASL_PLAINTEXT://10.21.130.120:9092,PLAINTEXT://10.21.130.120:9093
allow.everyone.if.no.acl.found=false
Unauthenticated clients show up as User:ANONYMOUS, so grant them access to the topics they still need:
bin/kafka-acls.sh --add --authorizer-properties zookeeper.connect={host:port/childpath} --allow-principal User:ANONYMOUS --allow-host '*' --operation All --topic {topicname}
Remove the JAAS environment variable:
unset {variable name}
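For example, to clear the KAFKA_OPTS variable exported earlier and confirm it is gone:

```shell
# Simulate the earlier export, then remove it from the environment.
export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka/config/kafka_client.jaas"
unset KAFKA_OPTS

# An empty value means Kafka tools launched from this shell no longer pick up the JAAS config.
[ -z "$KAFKA_OPTS" ] && echo "KAFKA_OPTS cleared"   # -> KAFKA_OPTS cleared
```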
Producer test:
bin/kafka-console-producer.sh --broker-list {host}:9093 --topic {topicname}
The script zookeeper-security-migration.sh in Kafka's bin directory walks Kafka's znodes in ZooKeeper and sets the ACL on each one individually. This matters because a ZooKeeper ACL applies only to the node itself, not to its children, so granting permissions node by node is tedious; zookeeper-security-migration.sh solves that problem.
Modify server.properties, adding:
zookeeper.set.acl=true
Restart the Kafka cluster (full restart or rolling restart).
Run the zookeeper-security-migration.sh script; secure enables the ACL sync, unsecure reverts it:
bin/zookeeper-security-migration.sh --zookeeper.acl=secure --zookeeper.connect={host}:{port}/{path}