一、Kafka 安裝html
官網下載 kafka_2.11-1.1.1.tgz,解壓後,修改kafka的配置文件:config/server.propertiesgit
broker.id=0 # 在集羣內必須惟一github
advertised.host.name=192.168.10.100 # 配置對外IP地址,不然連接不上kafkajson
log.dirs=/data/kafka-logs # 配置kafka的存儲目錄,包含kafka的日誌和寫入kafka的文件api
zookeeper.connect=zk.test.com:2181 # 配置zookeeper的地址異步
二、啓動kafka服務async
nohup sh bin/kafka-server-start.sh config/server.properties > /data/kafka-logs/server.log 2>&1 &測試
啓動kafka服務,並將服務端日誌寫入 server.log 文件ui
三、建立topiclua
bin/kafka-topics.sh --zookeeper zk.test.com:2181 --create --topic test1 --partitions 1 --replication-factor 1
建立名稱爲 test1 的topic
四、kafka測試
bin/kafka-console-producer.sh --broker-list zk.test.com:9092 --topic test1
建立一個消息生產者,輸入消息則會發送到kafka
bin/kafka-console-consumer.sh --zookeeper zk.test.com:2181 --topic test1 --from-beginning
建立一個消費者,從topic頭部開始消費信息
五、配置 lua-resty-kafka
lua-resty-kafka 下載地址:https://github.com/doujiang24/lua-resty-kafka
因爲需用到lua解析json,則還須要下載lua-cjson:https://github.com/openresty/lua-cjson/
將相應的配置導入到OpenResty的lua配置中
六、配置 OpenResty
server {
listen 8088;
location / {
default_type text/html;
content_by_lua '
-- 引入lua全部api
local cjson = require "cjson"
local producer = require "resty.kafka.producer"
-- 定義kafka broker地址
local broker_list = {
{ host = "192.168.10.100", port = 9092 },
}
-- 定義json便於日誌數據整理收集
local log_json = {}
log_json["uri"]=ngx.var.uri
log_json["args"]=ngx.var.args
log_json["host"]=ngx.var.host
log_json["request_body"]=ngx.var.request_body
log_json["remote_addr"] = ngx.var.remote_addr
log_json["remote_user"] = ngx.var.remote_user
log_json["time_local"] = ngx.var.time_local
log_json["status"] = ngx.var.status
log_json["body_bytes_sent"] = ngx.var.body_bytes_sent
log_json["http_referer"] = ngx.var.http_referer
log_json["http_user_agent"] = ngx.var.http_user_agent
log_json["http_x_forwarded_for"] = ngx.var.http_x_forwarded_for
log_json["upstream_response_time"] = ngx.var.upstream_response_time
log_json["request_time"] = ngx.var.request_time
-- 轉換json爲字符串
local message = cjson.encode(log_json);
-- 定義kafka異步生產者
local bp = producer:new(broker_list, { producer_type = "async" })
-- 發送日誌消息,send第二個參數key,用於kafka路由控制:
-- key爲nill(空)時,一段時間向同一partition寫入數據
-- 指定key,按照key的hash寫入到對應的partition
local ok, err = bp:send("test1", nil, message)
ngx.say("<br>", message)
ngx.say("<br>kafka result:", ok)
ngx.say("<br>kafka error:", err)
';
}
}
七、測試
訪問 localhost:8088,則會將訪問的日誌寫入kafka
八、kafka的一些命令
基礎的命令都在kafka/bin 的目錄下
kafka-server-start.sh 服務啓動命令
kafka-topics.sh topic的命令,能夠查看topic的狀況,包括名稱、分區等信息
kafka-console-producer.sh 生產者命令
kafka-console-consumer.sh 消費者名稱