基於Openresty+CEPH實現海量數據管理系統

「持續更新中,歡迎關注...」數據庫

1. 需求:

做爲一家專一於三維高精度地圖服務的公司,內部有海量(PB級)的原始數據、中間數據、成功數據,須要存儲、管理、並按期歸檔。json

  1. 按項目管理數據,數據分類航飛數據、控制點數據、中間數據、成果數據、其餘數據。數據來源包括無人機數據、載荷數據、地面站數據、人工打點數據等。不一樣渠道聚集而來的數據。
  2. 採用相似百度網盤的形式,上傳、下載,支持斷點續傳、進度跟蹤。
  3. 支持細化到文件級別的權限控制,以及更多的文件(夾)屬性。

2. 分析:

  1. 系統重點在於數據存儲的選型,支持海量數據的存儲,可以支持在複雜網絡下的數據上傳。選用CEPH做爲數據存儲,RGW對象存儲,S3協議上傳下載,完美支持分片和斷點續傳。
  2. 系統難點在於文件級別的業務權限控制,以及文件(夾)更多的屬性支持。CEPH RGW自己支持權限控制,可是沒法和業務權限作對接。對象存儲自己沒有文件夾的概念,沒法對文件夾作分類、數量展現、大小展現。因此實現自定義索引服務,CEPH主要負責存儲,自定義索引服務實現展現與查詢。
  3. 因爲上傳下載會通過Openresty,經過lua腳本,將流經的文件信息,經由kafka轉發到業務服務中進行業務處理應用。

3. 實現

3.1 架構

空間數據系統架構圖.png

  1. 上傳助手就是類百度網盤的桌面端軟件,採用Electron JS

)實現。主要實現功能:項目展現、上傳、下載。網絡

  1. 業務層包括網關服務、帳號服務、項目服務、文件索引服務等。採用Java + Spring Boot + Spring Cloud技術棧。其中重點服務是文件索引服務Index Server,負責海量文件的索引維護和查詢。
  2. 業務數據MySQL集羣+Redis集羣,海量文件存儲使用CEPH對象存儲,支持S3 API。

3.3 關鍵流程圖

上傳流程.png

  1. 上傳助手使用普通的Put Object請求上傳文件,加上自定義的metadata字段(項目ID、用戶ID等)便可完成數據的提交。
  2. Openresty使用proxy模式將文件請求轉發到 CEPH RGW,由RGW完成後臺數據存儲處理。
  3. Openresty在RGW完成數據存儲之後,調用log_by_lua_file將對應請求的用戶自定義metadata和文件屬性轉發到後臺Kafka。
  4. 文件索引服務(Index Server)從Kafka中消費任務,拿到每一個文件的信息。
  5. 文件索引服務(Index Server)對文件數據按業務要求進行處理後,存入MySQL數據庫。

3.4 示例代碼

log_by_lua_file.lua:從Openresty獲取文件信息,併發往Kafka架構

local cjson = require "cjson"
local producer = require "resty.kafka.producer"
local broker_list = {
    { host = "172.16.0.20", port = 9092 },
}
function send_job_to_kafka()
    local log_json = {}
    local req_headers_ = ngx.req.get_headers()

    for k, v in pairs(req_headers_) do
        if k == "content-length" then
            log_json["contentLength"] = tostring(v)
        end
        if k == "u-id" then
            log_json["uId"] = tostring(v)
        end
        if k == "p-id" then
            log_json["pId"] = tostring(v)
        end
    end

    local resp_headers_ = ngx.resp.get_headers()
    for k, v in pairs(resp_headers_) do
        if k == "etag" then
            log_json["etag"] = string.gsub(v, "\"", "")
            break
        end
    end

    log_json["uri"] = ngx.var.uri
    log_json["host"] = ngx.var.host
    log_json["remoteAddr"] = ngx.var.remote_addr
    log_json["status"] = ngx.var.status
    local message = cjson.encode(log_json);
    ngx.log(ngx.ERR, "message is[", message, "]")
    return message
end

--local is_args = ngx.var.is_args
local request_method = ngx.var.request_method
local status_code = ngx.var.status

-- 過濾Put Object成功的請求,記錄相應的metadata及請求ID,並轉發到kafka
if request_method == "PUT" and status_code == "200" then
    local bp = producer:new(broker_list, { producer_type = "async" })
    local ok, err = bp:send("ceph_lua_test", nil, send_job_to_kafka())
    if not ok then
        ngx.log(ngx.ERR, "kafka send err:", err)
        return
    end
    ngx.log(ngx.ERR, "kafka send success:", ok)
end

4. 總結

  1. 經過此架構方案,在海量文件歸檔過程當中,將文件基本信息異步導入到業務數據庫中,便於業務應用開發。
  2. 此架構通常也應用對象存儲的多媒體文件處理,好比圖片處理、視頻處理、加水印、鑑黃、事件通知等。
相關文章
相關標籤/搜索