AbtestingGateway 複製請求到其餘服務上

perface

小道一句:本文爲原創文章,某SDN博主抄襲個人文章不帶出處的。html

最近有需求,須要把服務a發給服務b的請求複製給服務c,服務a發給服務b的時候會通過nginx,這個nginx是有lua腳原本輔助工做的。說白了,這個nginx+lua就是abtestingGateway。
架構圖以下:
nginx

下面看看怎麼實現abtestingGateway來複制流量。redis

分流條件

customercode和user字段的數據都在請求裏的json數據裏面。json

  1. 匹配到customercode等於咱們指定的值後把流量分到服務B。
  2. 在條件1的基礎上,判斷是否有users這個字段,有的話服務b,服務C同時發,不然只發服務B。
  3. 若是沒有匹配到customercode,那麼就轉發到服務C上。

分流策略添加

這個請參考咱們的另外一篇博文 abtestingGateway 分流策略添加架構

編寫複製請求的代碼

我在abtestingGateway下,在customercode.lua(lib/abtesting/diversion目錄下)文件裏面添加代碼函數

邏輯是這樣的:
當abtestingGateway拿到customercode後,從redis裏面拿取對應的upstream,若是upstream是服務b的upstream,那麼就須要複製流量咯,反之。
因此咱們在getUpstream裏添加一段代碼post

_M.getUpstream = function(self, customercode)
    if not tostring(customercode) then
        return nil
    end
    
    local database, key = self.database, self.policyLib
    
    local backend, err = database:hget(key, customercode)
    if not backend then error{ERRORINFO.REDIS_ERROR, err} end
    
    if backend == ngx.null then backend = nil end
    
    -- 下面就是新添加的代碼 begin
    if backend == "newacm" then   -- newacm就是咱們配置的upstream名字
        copyRequest()
    end     
    -- end

    local new_acm_uri = ngx.var.new_acm_uri
    if new_acm_uri and backend then
        backend = backend..new_acm_uri
    end   
    return backend
end

匹配到指定的upstream後,那麼就走copyRequest的方法了,代碼以下:測試

_M.copyRequestToOldAcm = function(self)
    if  ngx.var.copy_switch == "on" then -- 分流開關,on爲打開
        copyRequest()
    end
end

function copyRequest() -- 匹配到咱們指定的customerCode之後,那麼就複製一份請求發送到老的ACM上,緣由是由於發送給老的acm就可新老一塊同步數據了
        users_exist = false  -- 有這個users那麼這個標誌位爲true,沒有的話就是false
        action = ngx.var.request_method

        local postData = ngx.req.get_post_args()  --若是post請求裏面沒有這個customercode參數,
        if postData then  -- post 請求
            local errinfo =  ERRORINFO.UNKNOWN_ERROR
            if postData then
                for k, v in pairs(postData) do  --這個k爲未json反序列化的數據,v爲布爾值
                    local after_json_k = cjson.decode(k)  --提交上來的數據爲json格式的,須要反序列化一下
                    if after_json_k then
                        for k1 ,v1 in pairs(after_json_k) do
                            if k1 == "customer" then
                               if v1.users then
                                   users_exist = true
                               end
                            end
                        end
                    end
                end
            end
        end

        if users_exist == true then   --沒有匹配到users在post請求數據裏,那麼就不須要複製一份請求到老的的acm上了,直接return就好了。不然須要複製
            log:info("the request has users arguments ,so  needn't copy the request to the old acm!")
            return false
        else
            log:info("the request doesn't have users arguments, so need to copy the request to  the old acm!")           
        end

        if action == "POST" then
                --ngx.req.read_body() -- 解析 body 參數以前必定要先讀取 body
                local arg = ngx.req.get_post_args() -- post須要參數傳遞
                arry = {method = ngx.HTTP_POST, body = ngx.var.request_body, args=arg}
        else
                arry = {method = ngx.HTTP_GET}
        end

        oldacm = ngx.location.capture_multi {
            { "/oldacm" .. ngx.var.request_uri, arry},
        }
        if oldacm.status == ngx.HTTP_OK then -- 只返回online server的結果
            -- ngx.say(online.body)
            log:info("copy request result is ok!!")
            return true    
        else
            -- ngx.status = ngx.HTTP_NOT_FOUND
            --for i,r in pairs(oldacm) do   end
            log:info("copy request result is failed!! the response body is -->"..oldacm.body)
            return false
        end     
                                                
end

而後咱們在diversion/diversion.lua裏面添加下這段代碼,添加的這段代碼起到這個做用的,abtestingGateway在運行的時候,只要匹配到了咱們的規則,好比customercode=123456,匹配到了customercode=123456,那麼abtestingGateway會從redis獲取這個規則對應的upstream,而後把這個upstream放在nginx的內存裏面,在接下來的60秒之內,只要匹配到規則就直接從內存裏面拿取,不走getUpstream這個函數了,因此也觸發不了複製請求的代碼塊了。這就是爲何咱們還要添加下面這些代碼:ui

95 local copyRequest = function()
 96     local cm =    require("abtesting.diversion.customercode")
 97     local cr   = cm:copyRequestToOldAcm()
 98     return cr
 99 end
264 local info = "get upstream ["..ups.."] according to ["..idx.."] userinfo ["..usertable[idx].."] in cache 1"
266 log:info(info)
267 if string.find(ups,"newacm") == 1 then
268     copyRequest()
269 end

左邊的數字是代碼所處的行數,能夠參考行數來添加代碼。lua

最後咱們須要在nginx的配置文件裏面添加oldacm這個location,否則會提示404咯

location /oldacm/ {
        proxy_pass http://stable/;  # 服務B ip
    }

而後重啓nginx便可咯。測試,沒啥問題。

相關文章
相關標籤/搜索