「持續更新中,歡迎關注...」數據庫
做爲一家專一於三維高精度地圖服務的公司,內部有海量(PB級)的原始數據、中間數據、成功數據,須要存儲、管理、並按期歸檔。json
)實現。主要實現功能:項目展現、上傳、下載。網絡
log_by_lua_file.lua:從Openresty獲取文件信息,併發往Kafka架構
local cjson = require "cjson" local producer = require "resty.kafka.producer" local broker_list = { { host = "172.16.0.20", port = 9092 }, } function send_job_to_kafka() local log_json = {} local req_headers_ = ngx.req.get_headers() for k, v in pairs(req_headers_) do if k == "content-length" then log_json["contentLength"] = tostring(v) end if k == "u-id" then log_json["uId"] = tostring(v) end if k == "p-id" then log_json["pId"] = tostring(v) end end local resp_headers_ = ngx.resp.get_headers() for k, v in pairs(resp_headers_) do if k == "etag" then log_json["etag"] = string.gsub(v, "\"", "") break end end log_json["uri"] = ngx.var.uri log_json["host"] = ngx.var.host log_json["remoteAddr"] = ngx.var.remote_addr log_json["status"] = ngx.var.status local message = cjson.encode(log_json); ngx.log(ngx.ERR, "message is[", message, "]") return message end --local is_args = ngx.var.is_args local request_method = ngx.var.request_method local status_code = ngx.var.status -- 過濾Put Object成功的請求,記錄相應的metadata及請求ID,並轉發到kafka if request_method == "PUT" and status_code == "200" then local bp = producer:new(broker_list, { producer_type = "async" }) local ok, err = bp:send("ceph_lua_test", nil, send_job_to_kafka()) if not ok then ngx.log(ngx.ERR, "kafka send err:", err) return end ngx.log(ngx.ERR, "kafka send success:", ok) end