OpenResty + Lua + Kafka 實現日誌收集系統

一、Kafka 安裝html

官網下載 kafka_2.11-1.1.1.tgz,解壓後,修改kafka的配置文件:config/server.propertiesgit

broker.id=0  # 在集羣內必須惟一github

advertised.host.name=192.168.10.100  # 配置對外IP地址,不然連接不上kafkajson

log.dirs=/data/kafka-logs  # 配置kafka的存儲目錄,包含kafka的日誌和寫入kafka的文件api

zookeeper.connect=zk.test.com:2181  # 配置zookeeper的地址異步

二、啓動kafka服務async

nohup sh bin/kafka-server-start.sh config/server.properties > /data/kafka-logs/server.log 2>&1 &測試

啓動kafka服務,並將服務端日誌寫入 server.log 文件ui

三、建立topiclua

bin/kafka-topics.sh --zookeeper zk.test.com:2181 --create --topic test1 --partitions 1 --replication-factor 1

建立名稱爲 test1 的topic

四、kafka測試

bin/kafka-console-producer.sh --broker-list zk.test.com:9092 --topic test1

建立一個消息生產者,輸入消息則會發送到kafka

bin/kafka-console-consumer.sh --zookeeper zk.test.com:2181 --topic test1 --from-beginning

建立一個消費者,從topic頭部開始消費信息

五、配置 lua-resty-kafka 

lua-resty-kafka 下載地址:https://github.com/doujiang24/lua-resty-kafka

因爲需用到lua解析json,則還須要下載lua-cjson:https://github.com/openresty/lua-cjson/

將相應的配置導入到OpenResty的lua配置中

六、配置 OpenResty

server {
        listen    8088;
        location / {
            default_type text/html;
            content_by_lua '
                -- 引入lua全部api  
                local cjson = require "cjson"  
                local producer = require "resty.kafka.producer"  
                -- 定義kafka broker地址  
                local broker_list = {  
                    { host = "192.168.10.100", port = 9092 },  
                }  
                -- 定義json便於日誌數據整理收集  
                local log_json = {}  
                log_json["uri"]=ngx.var.uri  
                log_json["args"]=ngx.var.args  
                log_json["host"]=ngx.var.host  
                log_json["request_body"]=ngx.var.request_body  
                log_json["remote_addr"] = ngx.var.remote_addr  
                log_json["remote_user"] = ngx.var.remote_user  
                log_json["time_local"] = ngx.var.time_local  
                log_json["status"] = ngx.var.status  
                log_json["body_bytes_sent"] = ngx.var.body_bytes_sent  
                log_json["http_referer"] = ngx.var.http_referer  
                log_json["http_user_agent"] = ngx.var.http_user_agent  
                log_json["http_x_forwarded_for"] = ngx.var.http_x_forwarded_for  
                log_json["upstream_response_time"] = ngx.var.upstream_response_time  
                log_json["request_time"] = ngx.var.request_time  
                -- 轉換json爲字符串  
                local message = cjson.encode(log_json);  
                -- 定義kafka異步生產者  
                local bp = producer:new(broker_list, { producer_type = "async" })  
                -- 發送日誌消息,send第二個參數key,用於kafka路由控制:  
                -- key爲nill(空)時,一段時間向同一partition寫入數據  
                -- 指定key,按照key的hash寫入到對應的partition  
                local ok, err = bp:send("test1", nil, message)  
   
                ngx.say("<br>", message)
                ngx.say("<br>kafka result:", ok)
                ngx.say("<br>kafka error:", err)
            ';
        }
    }

七、測試

訪問 localhost:8088,則會將訪問的日誌寫入kafka

八、kafka的一些命令

基礎的命令都在kafka/bin 的目錄下

kafka-server-start.sh 服務啓動命令

kafka-topics.sh topic的命令,能夠查看topic的狀況,包括名稱、分區等信息

kafka-console-producer.sh 生產者命令

kafka-console-consumer.sh 消費者名稱

相關文章
相關標籤/搜索