前些日子要封裝一個kafka的客戶端驅動,配置了下kafka環境,發現配置複雜度完爆rabbitmq不少倍啊,並且發佈訂閱模式使用起來也很麻煩,可能就勝在分佈式了吧。java
kafka須要java環境,自行安裝java sdk 1.8+.node
http://kafka.apache.org/downloadsapache
官方加載安裝包,當前爲kafka_2.11-1.1.0.tgzbootstrap
(新版kafka包內集成了zookeeper,直接啓動便可)服務器
解壓縮後kafka目錄下文件:jvm
配置:分佈式
config路徑下配置文件:ide
consumer.properites 消費者配置,這個配置文件用於配置開啓的消費者,默認便可ui
producer.properties 生產者配置,這個配置文件用於配置開啓的生產者,默認便可spa
server.properties kafka服務器的配置,此配置文件用來配置kafka服務器,核心配置以下:
(1)broker.id 申明當前kafka服務器在集羣中的惟一ID,需配置爲integer,而且集羣中的每個kafka服務器的id都應是惟一的,默認便可
(2)listeners 申明此kafka服務器須要監聽的端口號,若是是在本機上跑虛擬機運行能夠不用配置本項,默認會使用localhost的地址,若是是在遠程服務器上運行則必須配置,
例如:listeners=PLAINTEXT:// 192.168.1.1:9092。並確保服務器的9092端口可以訪問
(3)zookeeper.connect 申明kafka所鏈接的zookeeper的地址 ,需配置爲zookeeper的地址,因爲本次使用的是kafka高版本中自帶zookeeper,默認便可,
例如:zookeeper.connect=localhost:2181
啓動zookeeper:
bin/zookeeper-server-start.sh config/zookeeper.properties & (&表明後臺運行)
啓動kafka:
bin/kafka-server-start.sh config/server.properties &
建立一個名爲test的topic:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
查看已經建立的topic:
bin/kafka-topics.sh --list --zookeeper localhost:2181
更改tioic分區:
bin/kafka-topics.sh --zookeeper localhost:2181 --topic test --alter --partitions 4
查看指定topic信息:
bin/kafka-topics.sh --zookeeper localhost:2181 --topic test --describe
建立一個消息消費者:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
(若是彈出WARN [Producer clientId=console-producer] Connection to node -1 could not be established. Broker may not be available,需將上文中localhost改成隊列配置的地址)
建立一個消息生產者:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
出現 > 符號後輸入消息發送,消費者鏈接能夠看到輸出。
(若是彈出WARN [Consumer clientId=consumer-1, groupId=console-consumer-59948] Connection to node -1 could not be established. Broker may not be available,
需將上文中localhost改成隊列配置的地址)
Kafka安裝完成。
Kafka開啓使用 SASL_PLAINTEXT認證:
輸入下面命令,關閉kafka:
bin/kafka-server-stop.sh
輸入下面命令,關閉zookeeper:
bin/zookeeper-server-stop.sh
進入config目錄,增長以下配置文件:
cd config
(1)touch kafka_server_jaas.conf
配置以下:
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin"
user_admin="admin"
user_alice="alice";
};
在KafkaServer部分,username和password是broker用於初始化鏈接到其餘的broker,在上面配置中,admin用戶爲broker間的通信,
user_userName定義了全部鏈接到 broker和 broker驗證的全部的客戶端鏈接包括其餘 broker的用戶密碼,user_userName必須配置admin用戶,不然報錯。
(2)touch kafka_cilent_jaas.conf
配置以下:
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin";
};
在KafkaClient部分,username和password是客戶端用來配置客戶端鏈接broker的用戶,在上面配置中,客戶端使用admin用戶鏈接到broker。
更改server.properties配置文件:
listeners=SASL_PLAINTEXT://ip:9092
# 使用的認證協議
security.inter.broker.protocol=SASL_PLAINTEXT
#SASL機制
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
# 完成身份驗證的類
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
# 若是沒有找到ACL(訪問控制列表)配置,則容許任何操做。
#allow.everyone.if.no.acl.found=true
super.users=User:admin
修改consuer和producer的配置文件consumer.properties和producer.properties,分別增長以下配置:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
切換到kafka目錄下bin路徑:
cd ..
cd bin
JAAS文件做爲每一個broker的jvm參數,在kafka-server-start.sh腳本中增長以下配置(可在最上面):
export KAFKA_OPTS=" -Djava.security.auth.login.config=/data/kafka/kafka_2.11-1.1.0/config/kafka_server_jaas.conf"
在kafka-console-consumer.sh和kafka-console-producer.sh中添加:
export KAFKA_OPTS=" -Djava.security.auth.login.config=/data/kafka/kafka_2.11-1.1.0/config/kafka_client_jaas.conf"
啓動zookeeper和kafka:
bin/zookeeper-server-start.sh config/zookeeper.properties & (&表明後臺運行)
bin/kafka-server-start.sh config/server.properties &
輸入命令,啓動生產者:
bin/kafka-console-producer.sh --broker-list 10.100.17.79:9092 --topic test --producer.config config/producer.properties
輸入命令,啓動消費者
bin/kafka-console-consumer.sh --bootstrap-server 10.100.17.79:9092 --topic test --from-beginning --consumer.config config/consumer.properties
若是消費者啓動不了或者沒法消費指定topic,嘗試設置所使用用戶的組權限(當前使用用戶admin爲超級用戶,不須要配置權限):
利用kafka-acls.sh爲topic設置ACL:
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=ip:2181 --add --allow-principal User:admin --group test-consumer-group --topic test
注意,若是admin要做爲消費端鏈接alice-topic的話,必須對其使用的group(test-consumer-group)也賦權(group 在consumer.properties中有默認配置group.id)
權限操做例子:
增長權限:
# 爲用戶 alice 在 test(topic)上添加讀寫的權限
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=ip:2181 --add --allow-principal User:alice --operation Read --operation Write --topic test
# 對於 topic 爲 test 的消息隊列,拒絕來自 ip 爲192.168.1.100帳戶爲 zhangsan 進行 read 操做,其餘用戶都容許
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=ip:2181 --add --allow-principal User:* --allow-host * --deny-principal User:zhangsan
--deny-host 192.168.1.100 --operation Read --topic test
# 爲 zhangsan 和 alice 添加all,以容許來自 ip 爲192.168.1.100或者192.168.1.101的讀寫請求
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=ip:2181 --add --allow-principal User:zhangsan --allow-principal User:alice --allow-host 192.168.1.100 --allow-host 192.168.1.101 --operation Read --operation Write --topic test
獲取權限列表:
# 列出 topic 爲 test 的全部權限帳戶
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=ip:2181 --list --topic test
移除權限:
# 移除 acl
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=ip:2181 --remove --allow-principal User:zhangsan --allow-principal User:Alice --allow-host 192.168.1.100 --allow-host 192.168.1.101 --operation Read --operation Write --topic test
以上完成Kafka的SASL_PLAINTEXT認證。
多節點zookeeper下認證:
增長配置文件:
touch kafka_zoo_jaas.conf
配置以下:
ZKServer{
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin"
user_admin="admin";
};
配置應用名稱爲ZKServer,zookeeper默認使用的JAAS應用名稱是Server(或者zookeeper)。
設置的環境變量其實只是傳入到JVM的參數。這裏設置的環境變量是KAFKA_OPTS。修改zookeeper的啓動腳本zookeeper-server-start.sh以下:
export KAFKA_OPTS=" -Djava.security.auth.login.config=/data/kafka/kafka_2.11-1.1.0/config/kafka_zoo_jaas.conf -Dzookeeper.sasl.serverconfig=ZKServer"
若是配置使用默認名稱,則只須要添加:
export KAFKA_OPTS=" -Djava.security.auth.login.config=/data/kafka/kafka_2.11-1.1.0/config/kafka_zoo_jaas.conf「」
修改zookeeper.properties配置文件,增長配置:
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
以上配置完成後可啓動嘗試多節點zookeeper身份認證