個人上一篇博客介紹了什麼是灰度發佈 灰度發佈html
本文將介紹分佈式框架如何作到灰度發佈。前端
在介紹具體實現步驟前,讓咱們先看下分佈式框架下實現灰度發佈的實現結構圖:java
express1 ; express2 => ip-express
左邊爲匹配規則,以分號區分多個匹配規則,須要所有知足,才能匹配成功git
method match "getFoo" ,"setFoo"
每個子表達式形式如上,能夠經過 逗號(,)匹配多個條件,這裏條件只要知足其一便可=> 是 Then表達式,劃分左邊和右邊,若是左邊匹配成功,將指向右邊的 ip 表達式github
ip"192.168.1.12"
表示若是匹配成功,請求將路由到192.168.1.12的ip的服務節點上ip支持掩碼的匹配形式。
如 ip"192.168.1.0/24" 能夠路由到 「192.168.1.0」的一個範圍。- method match "getSkuById" => ip"192.168.12.12"
做用:將方法爲getSkuById的請求路由到web
能夠經過正則的形式進行匹配,以下,能夠將以get開頭的請求路由到12的機器上,將set開頭的請求路由到13的機器上。spring
method match r"get.*" => ip"192.168.12.12" method match r"set.*" => ip"192.168.12.13"
- calleeIp match ip'192.168.1.101' => ip"192.168.2.105/30"
表示,請求ip爲'192.168.1.101'的請求 將會 路由到 192.168.2.105/30及其掩碼的ip的服務實例中express
- calleeIp match ip'192.168.1.101' => ip"0.0.0.0"
表示將請求爲101的ip路由到無效的ip上,實現黑名單的功能json
- userId match 10..1000 => ip"192.168.12.1"
表示將請求用戶id爲10 到 1000 的用戶 路由到 ip爲192.168.12.1的服務實例api
- userId match %「1024n+6」 => ip"192.168.12.1"
表示將請求用戶id與1024取模結果爲6時,路由到 ip爲192.168.12.1的服務實例 userId match %「1024n+3..5」 => ip"192.168.12.1" 表示將請求用戶id與1024取模結果爲3到5之間時,路由到 ip爲192.168.12.1的服務實例
method match r"set.*" => ~ip"192.168.12.14"
表示以set開頭的方法將不會路由到 ip 爲 192.168.12.14 的 服務實例
otherwise => ip"192.168.12.12"
表示左側全部都匹配,通常做爲路由規則的最後一條執行,表示前面全部路由規則都不知足時,最後執行的路由規則
method match r"set.*" ; version match "1.0.0" => ip'192.168.1.103'
同時知足上述兩個條件的請求,纔會路由到右側Ip的實例上
method match r"set.*",r"insert.*" => ip"192.123.12.11"
這種情形是,當請求的方法名爲 set開頭 或者 insert開頭時均可以匹配成功,路由到右側Ip
serviceName match "com.today.service.MemberService" => ip"192.168.12.1",ip"192.168.12.2"
上述情形表示符合左邊的條件,能夠路由到上述右側兩個ip上
method match "setFoo" => ip"192.168.10.12/24" method match "getFoo" => ip"192.168.12.14" otherwise => ip"192.168.12.18"
上述情形爲多個路由表達式寫法,每一個路由表達式 換行分隔
咱們會從最上面一條路由表達式開始進行匹配,當匹配到時即中止,不在繼續向下匹配。 若是沒有匹配到,將繼續向下進行解析。 如上,當前兩條都不符合時,便可路由到第三條,otherwise表示全部都符合的規則,這樣最終將會路由到"192.168.12.18"的ip上
如下咱們以登陸用戶(即userId)進行灰度發佈來說實現步驟。
定義灰度發佈規則
userId match 19767 , 16852 , 16695 => ip"10.100.226.227" otherwise => ip"10.100.45.116"
以上規則代表:當登陸用戶ID爲19767或16852是,訪問IP地址爲10.100.226.227的服務器服務,不然其餘用戶都訪問IP地址爲10.100.45.116的服務器服務
/** * 根據host鏈接zk * * @param host * @return * @throws Exception */ public static ZooKeeper createZkByHost(String host) throws Exception { CountDownLatch semaphore = new CountDownLatch(1); ZooKeeper zkClient = null; try { /* * ZooKeeper客戶端和服務器會話的創建是一個異步的過程 * 構造函數在處理完客戶端的初始化工做後當即返回,在大多數狀況下,並無真正地創建好會話 * 當會話真正建立完畢後,Zookeeper服務器會向客戶端發送一個事件通知 */ zkClient = new ZooKeeper(host, 500, (event) -> { LOGGER.info("waiting 鏈接 Zk ...."); if (event.getState() == Watcher.Event.KeeperState.SyncConnected) { semaphore.countDown(); } }); LOGGER.info("build zk connect state1[{}]...", zkClient.getState()); //semaphore.await(); semaphore.await(1000, TimeUnit.MILLISECONDS); LOGGER.info("build zk connect state2[{}]...", zkClient.getState()); LOGGER.info("build zk connect on [{}]...", host); } catch (Exception e) { LOGGER.info(e.getMessage(), e); } if (Objects.nonNull(zkClient) && zkClient.getState() == CONNECTED) { return zkClient; } else { if (zkClient != null) { zkClient.close(); } LOGGER.info("ZK build connect on [{}] failed ...", host); throw new Exception("ZK build connect on [" + host + "] failed ..."); } }
/** * 執行發佈(服務) * * @param cid * @throws Exception */ private void processPublish(String host, ConfigInfoDto cid) throws Exception { ZooKeeper zk = createZkByHost(host); String service = cid.getServiceName();
String routerConfig = "userId match 19767 , 16852 , 16695 => ip\"10.100.226.227\" \n" +
"otherwise => ip\"10.100.45.116\"";
// 路由
ZkUtil.createData(zk, "/soa/config/routes/" + service,routerConfig);
ZkUtil.closeZk(zk);
}
全部微服務接口提供統一網關
import com.github.dapeng.core.InvocationContext; import com.github.dapeng.core.InvocationContextImpl; import com.github.dapeng.core.SoaException; import com.github.dapeng.core.helper.DapengUtil; import com.github.dapeng.core.helper.SoaSystemEnvProperties; import com.github.dapeng.openapi.utils.PostUtil; import com.today.api.admin.enums.StaffManagerEnum; import com.today.domain.LoginUser; import com.today.domain.ResponseData; import com.today.enums.ResponseStatus; import com.today.soa.idgen.IDServiceClient; import com.today.soa.idgen.domain.GenIDRequest; import com.today.util.JSONUtil; import com.today.util.UserSessionHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; import org.springframework.web.bind.annotation.*; import javax.servlet.http.HttpServletRequest; import java.util.stream.Collectors; /** * @author ever * @date 2018-01-29 */ @RestController @RequestMapping("api") public class OpenApiController { private Logger logger = LoggerFactory.getLogger(OpenApiController.class); private IDServiceClient idServiceClient = new IDServiceClient(); private final static String BARCODE = "sku_barcode"; @PostMapping(value = "{service}/{version}/{method}") @ResponseBody public String rest(@PathVariable(value = "service") String service, @PathVariable(value = "version") String version, @PathVariable(value = "method") String method, @RequestParam(value = "parameter") String parameter, HttpServletRequest req) { try { InvocationContext invocationContext = InvocationContextImpl.Factory.currentInstance(); String sessionTid = invocationContext.sessionTid().map(DapengUtil::longToHexStr).orElse("0"); MDC.put(SoaSystemEnvProperties.KEY_LOGGER_SESSION_TID, sessionTid); //接口鑑權 LoginUser loginUser = UserSessionHelper.getCurrentLoginUser(); if(loginUser.getManager() == StaffManagerEnum.SUPPER_MANAGER){ //超級管理員 不鑑權 return PostUtil.post(service, version, method, parameter, req); }else{ String serviceCode = service.substring(service.lastIndexOf(".")+1); logger.info("檢測權限code"+serviceCode+"."+method + "===="+loginUser.getPermissionList().size()); boolean bool = loginUser.getPermissionList().stream(). filter(item -> (serviceCode+"."+method).equals(item.code)).collect(Collectors.toList()).isEmpty(); if(!bool){ return PostUtil.post(service, version, method, parameter, req); }else{ ResponseData responseData = new ResponseData(); responseData.setStatus(ResponseStatus.NO_PERMISSION); responseData.setResponseMsg("["+serviceCode+"."+method+"]權限不足"); return JSONUtil.toJson(responseData); } } } finally { MDC.remove(SoaSystemEnvProperties.KEY_LOGGER_SESSION_TID); } } }
其中PostUtil.post()會根據服務名,版本號,方法名及參數發起RPC請求對應微服務,這個方法會根據頁面傳入的參數獲取userId並設置到分佈式框架的上下文供路由匹配選擇
以下:
public static String post(String service, String version, String method, String parameter, HttpServletRequest req, boolean clearInvocationContext) { InvocationContextImpl invocationCtx = (InvocationContextImpl)createInvocationCtx(service, version, method, req); OptimizedService bizService = ServiceCache.getService(service, version); if (bizService == null) { LOGGER.error("bizService not found[service:" + service + ", version:" + version + "]"); return String.format("{\"responseCode\":\"%s\", \"responseMsg\":\"%s\", \"success\":\"%s\", \"status\":0}", SoaCode.NoMatchedService.getCode(), SoaCode.NoMatchedService.getMsg(), "{}"); } else { Set<String> parameters = req.getParameterMap().keySet(); if (parameters.contains("userId")) { invocationCtx.userId(Long.valueOf(req.getParameter("userId"))); } InvocationContextProxy invocationCtxProxy = Factory.getInvocationContextProxy(); invocationCtx.cookies(invocationCtxProxy.cookies()); JsonPost jsonPost = new JsonPost(service, version, method, true); String var10; try { String var9 = jsonPost.callServiceMethod(parameter, bizService); return var9; } catch (SoaException var15) { LOGGER.error(var15.getMsg(), var15); var10 = String.format("{\"responseCode\":\"%s\", \"responseMsg\":\"%s\", \"success\":\"%s\", \"status\":0}", var15.getCode(), var15.getMsg(), "{}"); } catch (Exception var16) { LOGGER.error(var16.getMessage(), var16); var10 = String.format("{\"responseCode\":\"%s\", \"responseMsg\":\"%s\", \"success\":\"%s\", \"status\":0}", "9999", "系統繁忙,請稍後再試[9999]!", "{}"); return var10; } finally { if (clearInvocationContext) { Factory.removeCurrentInstance(); } } return var10; } }
如今有個問題,爲了從HttpServletRequest中經過getParameter("userId")獲取當前登陸用戶ID,咱們若是前臺傳入,前端每一個調用接口都要傳入userId參數,這樣就會大大增長前端的工做量。因此決定在服務端在session中獲取當前用戶ID設置到HttpServletRequest。但問題是HttpServletRequest爲了防止頁面傳入參數被篡改,並無提供setParameter()方法,因此經過過濾器及繼承HttpServletRequestWrapper來實現。以下:
web.xml添加過濾器:
<filter> <filter-name>addUserIdFilter</filter-name> <filter-class>com.today.filter.AddUserIdFilter</filter-class> </filter> <filter-mapping> <filter-name>addUserIdFilter</filter-name> <url-pattern>/api/*</url-pattern> <dispatcher>REQUEST</dispatcher> <dispatcher>FORWARD</dispatcher> </filter-mapping>
實現AddUserIdFilter:
import com.today.domain.LoginUser; import com.today.util.UserSessionHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.servlet.*; import javax.servlet.http.HttpServletRequest; import java.io.IOException; /** * 類功能描述:添加用戶ID過濾鏈 * * @author WangXueXing create at 19-5-22 上午8:38 * @version 1.0.0 */ public class AddUserIdFilter implements Filter { private static final Logger logger = LoggerFactory.getLogger(AddUserIdFilter.class); /** * 經過過濾器添加當前登陸用戶ID, 爲了後續經過用戶ID進行灰度 * @param request * @param response * @param chain * @throws IOException * @throws ServletException */ public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException { ModifyRequestParameterWrapper requestWrapper = new ModifyRequestParameterWrapper((HttpServletRequest)request); try{ LoginUser loginUser = UserSessionHelper.getCurrentLoginUser(); if(loginUser != null){ requestWrapper.addParameter("userId", loginUser.getStaffId()); } } catch (Exception e) { logger.error("添加userId報錯", e); } finally { chain.doFilter(requestWrapper, response); } } @Override public void destroy() {} @Override public void init(FilterConfig fConfig) {} }
實現ModifyRequestParameterWrapper:
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequestWrapper; import java.util.HashMap; import java.util.Map; /** * 類功能描述:HttpServletRequest修改參數 * * @author WangXueXing create at 19-5-22 上午8:26 * @version 1.0.0 */ public class ModifyRequestParameterWrapper extends HttpServletRequestWrapper { private Map<String, String[]> params = new HashMap<String, String[]>(); public ModifyRequestParameterWrapper(HttpServletRequest request) { // 將request交給父類,以便於調用對應方法的時候,將其輸出,其實父親類的實現方式和第一種new的方式相似 super(request); //將參數表,賦予給當前的Map以便於持有request中的參數 this.params.putAll(request.getParameterMap()); } //重載一個構造方法 public ModifyRequestParameterWrapper(HttpServletRequest request, Map<String, Object> extendParams) { this(request); addAllParameters(extendParams);//這裏將擴展參數寫入參數表 } @Override public String getParameter(String name) {//重寫getParameter,表明參數從當前類中的map獲取 String[] values = params.get(name); if (values == null || values.length == 0) { return null; } return values[0]; } public String[] getParameterValues(String name) {//同上 return params.get(name); } public void addAllParameters(Map<String, Object> otherParams) {//增長多個參數 for (Map.Entry<String, Object> entry : otherParams.entrySet()) { addParameter(entry.getKey(), entry.getValue()); } } public void addParameter(String name, Object value) {//增長參數 if (value != null) { if (value instanceof String[]) { params.put(name, (String[]) value); } else if (value instanceof String) { params.put(name, new String[]{(String) value}); } else { params.put(name, new String[]{String.valueOf(value)}); } } } }
以上就會在接口調用時將當前userId信息註冊到分佈式框架上下文。
請參考大鵬開源實現代碼:灰度路由規則定義
灰度路由規則定義請參考編譯原理詞法分析實現
灰度路由規則定義請參考編譯原理詞法分析實現
經過分佈式框架上下文獲取userId信息,及zookeeper中獲取到的定義的路由規則。經過3.中的匹配規則便可實現動態的路由選擇調用。
具體請參考:路由匹配
以下:List<RuntimeInstance> routedInstances = router(serviceInfo, checkVersionInstances);
private SoaConnection findConnection(final ZkServiceInfo serviceInfo, final String version, final String method) throws SoaException { InvocationContextImpl context = (InvocationContextImpl) InvocationContextImpl.Factory.currentInstance(); //設置慢服務檢測時間閾值 /*Optional<Long> maxProcessTime = getZkProcessTime(method, zkInfo); context.maxProcessTime(maxProcessTime.orElse(null));*/ // TODO: 2018-10-12 慢服務時間 取自超時時間[TimeOut] context.maxProcessTime(getTimeout(serviceInfo, method)); //若是設置了calleeip 和 calleport 直接調用服務 不走路由 if (context.calleeIp().isPresent() && context.calleePort().isPresent()) { return SubPoolFactory.getSubPool(IPUtils.transferIp(context.calleeIp().get()), context.calleePort().get()).getConnection(); } //當zk上服務節點發生變化的時候, 可能會致使拿到不存在的服務運行時實例或者根本拿不到任何實例. List<RuntimeInstance> compatibles = serviceInfo.runtimeInstances(); if (compatibles == null || compatibles.isEmpty()) { return null; } // checkVersion List<RuntimeInstance> checkVersionInstances = new ArrayList<>(8); for (RuntimeInstance rt : compatibles) { if (checkVersion(version, rt.version)) { checkVersionInstances.add(rt); } } if (checkVersionInstances.isEmpty()) { logger.error(getClass().getSimpleName() + "::findConnection[service: " + serviceInfo.serviceName() + ":" + version + "], not found available version of instances"); throw new SoaException(NoMatchedService, "服務 [ " + serviceInfo.serviceName() + ":" + version + "] 無可用實例:沒有找到對應的服務版本"); } // router // 把路由須要用到的條件放到InvocationContext中 capsuleContext(context, serviceInfo.serviceName(), version, method); List<RuntimeInstance> routedInstances = router(serviceInfo, checkVersionInstances); if (routedInstances == null || routedInstances.isEmpty()) { logger.error(getClass().getSimpleName() + "::findConnection[service: " + serviceInfo.serviceName() + "], not found available instances by routing rules"); throw new SoaException(NoMatchedRouting, "服務 [ " + serviceInfo.serviceName() + " ] 無可用實例:路由規則沒有解析到可運行的實例"); } //loadBalance RuntimeInstance inst = loadBalance(method, serviceInfo, routedInstances); if (inst == null) { // should not reach here throw new SoaException(NotFoundServer, "服務 [ " + serviceInfo.serviceName() + " ] 無可用實例:負載均衡沒有找到合適的運行實例"); } inst.increaseActiveCount(); // TODO: 2018-08-04 服務端須要返回來正確的版本號 context.versionName(inst.version); return SubPoolFactory.getSubPool(inst.ip, inst.port). getConnection(); }
/** * 執行 路由規則 匹配, 返回 通過路由後的 實例列表 */ public static List<RuntimeInstance> executeRoutes(InvocationContextImpl ctx, List<Route> routes, List<RuntimeInstance> instances) { if (logger.isDebugEnabled()) { StringBuilder logAppend = new StringBuilder(); instances.forEach(ins -> logAppend.append(ins.toString()).append(" ")); logger.debug(RoutesExecutor.class.getSimpleName() + "::executeRoutes開始過濾:過濾前 size {},實例: {}", instances.size(), logAppend.toString()); } boolean isMatched; for (Route route : routes) { try { isMatched = matchCondition(ctx, route.getLeft()); // 匹配成功,執行右邊邏輯 if (isMatched) { instances = matchThenRouteIp(instances, route); if (logger.isDebugEnabled()) { StringBuilder append = new StringBuilder(); instances.forEach(ins -> append.append(ins.toString()).append(" ")); logger.debug(RoutesExecutor.class.getSimpleName() + "::route left " + route.getLeft().toString() + "::executeRoutes過濾結果 size: {}, 實例: {}", instances.size(), append.toString()); } break; } else { if (logger.isDebugEnabled()) { logger.debug(RoutesExecutor.class.getSimpleName() + "::route left " + route.getLeft().toString() + "::executeRoutes路由沒有過濾, size {}", instances.size()); } } } catch (Throwable ex) { logger.error(ex.getMessage(), ex); } } return instances; }
/** * 是否匹配左邊 * * @param ctx * @param left * @return */ protected static boolean matchCondition(InvocationContextImpl ctx, Condition left) { if (left instanceof Otherwise) { return true; } Matchers matcherCondition = (Matchers) left; List<Matcher> matchers = matcherCondition.matchers; /** * left = matcher(;matcher)* * matcher = id match patterns * patterns = pattern(,pattern)* * matcher之間是與的關係 * pattern之間是或的關係 */ for (Matcher matcher : matchers) { String actuallyConditionValue = getValueFromInvocationCtx(ctx, matcher); List<Pattern> patterns = matcher.getPatterns(); boolean isMatch = false; for (Pattern pattern : patterns) { boolean result = matcherPattern(pattern, actuallyConditionValue); if (result) { isMatch = true; break; } } if (!isMatch) { return false; } } return true; }
/** * 路由規則的值和 ctx值 是否匹配 * * @param pattern * @param value * @return */ private static boolean matcherPattern(Pattern pattern, String value) { if (value == null || value.trim().equals("")) { return false; } if (pattern instanceof StringPattern) { String content = ((StringPattern) pattern).content; return content.equals(value); } else if (pattern instanceof NotPattern) { Pattern pattern1 = ((NotPattern) pattern).pattern; return !matcherPattern(pattern1, value); } else if (pattern instanceof IpPattern) { IpPattern ipPattern = ((IpPattern) pattern); return matchIpWithMask(ipPattern.ip, Integer.parseInt(value), ipPattern.mask); } else if (pattern instanceof RegexPattern) { /** * 使用緩存好的 pattern 進行 正則 匹配 */ java.util.regex.Pattern regex = ((RegexPattern) pattern).pattern; return regex.matcher(value).matches(); } else if (pattern instanceof RangePattern) { RangePattern range = ((RangePattern) pattern); long from = range.from; long to = range.to; long valueAsLong = Long.parseLong(value); return valueAsLong <= to && valueAsLong >= from; } else if (pattern instanceof ModePattern) { ModePattern mode = ((ModePattern) pattern); try { long valueAsLong = Long.valueOf(value); long result = valueAsLong % mode.base; Optional<Long> from = mode.from; long to = mode.to; if (from.isPresent()) { return result >= from.get() && result <= to; } else { return result == to; } } catch (NumberFormatException e) { logger.error("[ModePattern]::輸入參數 value 應爲數字類型的id ,but get {}", value); } catch (Exception e) { logger.error("[ModePattern]::throw exception:" + e.getMessage(), e); } return false; } else if (pattern instanceof NumberPattern) { try { NumberPattern number = ((NumberPattern) pattern); long valueAsLong = Long.parseLong(value); long numberLong = number.number; return valueAsLong == numberLong; } catch (Exception e) { logger.error("[NumberPattern]::throw exception:" + e.getMessage(), e); } return false; } return false; }