最近在github上看了soul網關的設計,忽然就來了興趣準備本身從零開始寫一個高性能的網關。通過兩週時間的開發,個人網關ship-gate核心功能基本都已完成,最大的缺陷就是前端功底太差沒有管理後臺😤。html
網關是全部請求的入口,因此要求有很高的吞吐量,爲了實現這點可使用請求異步化來解決。目前通常有如下兩種方案:前端
Servlet3已經支持異步,這種方案使用比較多,京東,有贊和Zuul,都用的是這種方案。java
Netty爲高併發而生,目前惟品會的網關使用這個策略,在惟品會的技術文章中在相同的狀況下Netty是每秒30w+的吞吐量,Tomcat是13w+,能夠看出是有必定的差距的,可是Netty須要本身處理HTTP協議,這一塊比較麻煩。react
後面發現Soul網關是基於Spring WebFlux(底層Netty)的,不用太關心HTTP協議的處理,因而決定也用Spring WebFlux。git
網關的第二個特色是具有可擴展性,好比Netflix Zuul有preFilters,postFilters等在不一樣的階段方便處理不一樣的業務,基於責任鏈模式將請求進行鏈式處理便可實現。github
在微服務架構下,服務都會進行多實例部署來保證高可用,請求到達網關時,網關須要根據URL找到全部可用的實例,這時就須要服務註冊和發現功能,即註冊中心。web
如今流行的註冊中心有Apache的Zookeeper和阿里的Nacos兩種(consul有點小衆),由於以前寫RPC框架時已經用過了Zookeeper,因此此次就選擇了Nacos。算法
首先要明確目標,即開發一個具有哪些特性的網關,總結下後以下:spring
自定義路由規則數據庫
可基於version的路由規則設置,路由對象包括DEFAUL,HEADER和QUERY三種,匹配方式包括=、regex、like三種。
跨語言
HTTP協議天生跨語言
高性能
Netty自己就是一款高性能的通訊框架,同時server將一些路由規則等數據緩存到JVM內存避免請求admin服務。
高可用
支持集羣模式防止單節點故障,無狀態。
灰度發佈
灰度發佈(又名金絲雀發佈)是指在黑與白之間,可以平滑過渡的一種發佈方式。在其上能夠進行A/B testing,即讓一部分用戶繼續用產品特性A,一部分用戶開始用產品特性B,若是用戶對B沒有什麼反對意見,那麼逐步擴大範圍,把全部用戶都遷移到B上面來。經過特性一能夠實現。
接口鑑權
基於責任鏈模式,用戶開發本身的鑑權插件便可。
負載均衡
支持多種負載均衡算法,如隨機,輪詢,加權輪詢等。利用SPI機制能夠根據配置進行動態加載。
在參考了一些優秀的網關Zuul,Spring Cloud Gateway,Soul後,將項目劃分爲如下幾個模塊。
名稱 | 描述 |
---|---|
ship-admin | 後臺管理界面,配置路由規則等 |
ship-server | 網關服務端,核心功能模塊 |
ship-client-spring-boot-starter | 網關客戶端,自動註冊服務信息到註冊中心 |
ship-common | 一些公共的代碼,如pojo,常量等。 |
它們之間的關係如圖:
注意:這張圖與實際實現有點出入,Nacos push到本地緩存的那個環節沒有實現,目前只有ship-sever定時輪詢pull的過程。ship-admin從Nacos獲取註冊服務信息的過程,也改爲了ServiceA啓動時主動發生HTTP請求通知ship-admin。
首先建立一個spring-boot-starter命名爲ship-client-spring-boot-starter,不知道如何自定義starter的能夠看我之前寫的《開發本身的starter》。
其核心類 AutoRegisterListener 就是在項目啓動時作了兩件事:
1.將服務信息註冊到Nacos註冊中心
2.通知ship-admin服務上線了並註冊下線hook。
代碼以下:
/** * Created by 2YSP on 2020/12/21 */ public class AutoRegisterListener implements ApplicationListener<ContextRefreshedEvent> { private final static Logger LOGGER = LoggerFactory.getLogger(AutoRegisterListener.class); private volatile AtomicBoolean registered = new AtomicBoolean(false); private final ClientConfigProperties properties; @NacosInjected private NamingService namingService; @Autowired private RequestMappingHandlerMapping handlerMapping; private final ExecutorService pool; /** * url list to ignore */ private static List<String> ignoreUrlList = new LinkedList<>(); static { ignoreUrlList.add("/error"); } public AutoRegisterListener(ClientConfigProperties properties) { if (!check(properties)) { LOGGER.error("client config port,contextPath,appName adminUrl and version can't be empty!"); throw new ShipException("client config port,contextPath,appName adminUrl and version can't be empty!"); } this.properties = properties; pool = new ThreadPoolExecutor(1, 4, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); } /** * check the ClientConfigProperties * * @param properties * @return */ private boolean check(ClientConfigProperties properties) { if (properties.getPort() == null || properties.getContextPath() == null || properties.getVersion() == null || properties.getAppName() == null || properties.getAdminUrl() == null) { return false; } return true; } @Override public void onApplicationEvent(ContextRefreshedEvent event) { if (!registered.compareAndSet(false, true)) { return; } doRegister(); registerShutDownHook(); } /** * send unregister request to admin when jvm shutdown */ private void registerShutDownHook() { final String url = "http://" + properties.getAdminUrl() + AdminConstants.UNREGISTER_PATH; final UnregisterAppDTO unregisterAppDTO = new UnregisterAppDTO(); unregisterAppDTO.setAppName(properties.getAppName()); unregisterAppDTO.setVersion(properties.getVersion()); unregisterAppDTO.setIp(IpUtil.getLocalIpAddress()); unregisterAppDTO.setPort(properties.getPort()); Runtime.getRuntime().addShutdownHook(new Thread(() -> { OkhttpTool.doPost(url, unregisterAppDTO); LOGGER.info("[{}:{}] unregister from ship-admin success!", unregisterAppDTO.getAppName(), unregisterAppDTO.getVersion()); })); } /** * register all interface info to register center */ private void doRegister() { Instance instance = new Instance(); instance.setIp(IpUtil.getLocalIpAddress()); instance.setPort(properties.getPort()); instance.setEphemeral(true); Map<String, String> metadataMap = new HashMap<>(); metadataMap.put("version", properties.getVersion()); metadataMap.put("appName", properties.getAppName()); instance.setMetadata(metadataMap); try { namingService.registerInstance(properties.getAppName(), NacosConstants.APP_GROUP_NAME, instance); } catch (NacosException e) { LOGGER.error("register to nacos fail", e); throw new ShipException(e.getErrCode(), e.getErrMsg()); } LOGGER.info("register interface info to nacos success!"); // send register request to ship-admin String url = "http://" + properties.getAdminUrl() + AdminConstants.REGISTER_PATH; RegisterAppDTO registerAppDTO = buildRegisterAppDTO(instance); OkhttpTool.doPost(url, registerAppDTO); LOGGER.info("register to ship-admin success!"); } private RegisterAppDTO buildRegisterAppDTO(Instance instance) { RegisterAppDTO registerAppDTO = new RegisterAppDTO(); registerAppDTO.setAppName(properties.getAppName()); registerAppDTO.setContextPath(properties.getContextPath()); registerAppDTO.setIp(instance.getIp()); registerAppDTO.setPort(instance.getPort()); registerAppDTO.setVersion(properties.getVersion()); return registerAppDTO; } }
ship-sever項目主要包括了兩個部份內容,
1.請求動態路由的主流程
2.本地緩存數據和ship-admin及nacos同步,這部分在後面3.3再講。
ship-server實現動態路由的原理是利用WebFilter攔截請求,而後將請求教給plugin chain去鏈式處理。
PluginFilter根據URL解析出appName,而後將啓用的plugin組裝成plugin chain。
public class PluginFilter implements WebFilter { private ServerConfigProperties properties; public PluginFilter(ServerConfigProperties properties) { this.properties = properties; } @Override public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) { String appName = parseAppName(exchange); if (CollectionUtils.isEmpty(ServiceCache.getAllInstances(appName))) { throw new ShipException(ShipExceptionEnum.SERVICE_NOT_FIND); } PluginChain pluginChain = new PluginChain(properties, appName); pluginChain.addPlugin(new DynamicRoutePlugin(properties)); pluginChain.addPlugin(new AuthPlugin(properties)); return pluginChain.execute(exchange, pluginChain); } private String parseAppName(ServerWebExchange exchange) { RequestPath path = exchange.getRequest().getPath(); String appName = path.value().split("/")[1]; return appName; } }
PluginChain繼承了AbstractShipPlugin並持有全部要執行的插件。
/** * @Author: Ship * @Description: * @Date: Created in 2020/12/25 */ public class PluginChain extends AbstractShipPlugin { /** * the pos point to current plugin */ private int pos; /** * the plugins of chain */ private List<ShipPlugin> plugins; private final String appName; public PluginChain(ServerConfigProperties properties, String appName) { super(properties); this.appName = appName; } /** * add enabled plugin to chain * * @param shipPlugin */ public void addPlugin(ShipPlugin shipPlugin) { if (plugins == null) { plugins = new ArrayList<>(); } if (!PluginCache.isEnabled(appName, shipPlugin.name())) { return; } plugins.add(shipPlugin); // order by the plugin's order plugins.sort(Comparator.comparing(ShipPlugin::order)); } @Override public Integer order() { return null; } @Override public String name() { return null; } @Override public Mono<Void> execute(ServerWebExchange exchange, PluginChain pluginChain) { if (pos == plugins.size()) { return exchange.getResponse().setComplete(); } return pluginChain.plugins.get(pos++).execute(exchange, pluginChain); } public String getAppName() { return appName; } }
AbstractShipPlugin實現了ShipPlugin接口,並持有ServerConfigProperties配置對象。
public abstract class AbstractShipPlugin implements ShipPlugin { protected ServerConfigProperties properties; public AbstractShipPlugin(ServerConfigProperties properties) { this.properties = properties; } }
ShipPlugin接口定義了全部插件必須實現的三個方法order(),name()和execute()。
public interface ShipPlugin { /** * lower values have higher priority * * @return */ Integer order(); /** * return current plugin name * * @return */ String name(); Mono<Void> execute(ServerWebExchange exchange,PluginChain pluginChain); }
DynamicRoutePlugin繼承了抽象類AbstractShipPlugin,包含了動態路由的主要業務邏輯。
/** * @Author: Ship * @Description: * @Date: Created in 2020/12/25 */ public class DynamicRoutePlugin extends AbstractShipPlugin { private final static Logger LOGGER = LoggerFactory.getLogger(DynamicRoutePlugin.class); private static WebClient webClient; private static final Gson gson = new GsonBuilder().create(); static { HttpClient httpClient = HttpClient.create() .tcpConfiguration(client -> client.doOnConnected(conn -> conn.addHandlerLast(new ReadTimeoutHandler(3)) .addHandlerLast(new WriteTimeoutHandler(3))) .option(ChannelOption.TCP_NODELAY, true) ); webClient = WebClient.builder().clientConnector(new ReactorClientHttpConnector(httpClient)) .build(); } public DynamicRoutePlugin(ServerConfigProperties properties) { super(properties); } @Override public Integer order() { return ShipPluginEnum.DYNAMIC_ROUTE.getOrder(); } @Override public String name() { return ShipPluginEnum.DYNAMIC_ROUTE.getName(); } @Override public Mono<Void> execute(ServerWebExchange exchange, PluginChain pluginChain) { String appName = pluginChain.getAppName(); ServiceInstance serviceInstance = chooseInstance(appName, exchange.getRequest()); // LOGGER.info("selected instance is [{}]", gson.toJson(serviceInstance)); // request service String url = buildUrl(exchange, serviceInstance); return forward(exchange, url); } /** * forward request to backend service * * @param exchange * @param url * @return */ private Mono<Void> forward(ServerWebExchange exchange, String url) { ServerHttpRequest request = exchange.getRequest(); ServerHttpResponse response = exchange.getResponse(); HttpMethod method = request.getMethod(); WebClient.RequestBodySpec requestBodySpec = webClient.method(method).uri(url).headers((headers) -> { headers.addAll(request.getHeaders()); }); WebClient.RequestHeadersSpec<?> reqHeadersSpec; if (requireHttpBody(method)) { reqHeadersSpec = requestBodySpec.body(BodyInserters.fromDataBuffers(request.getBody())); } else { reqHeadersSpec = requestBodySpec; } // nio->callback->nio return reqHeadersSpec.exchange().timeout(Duration.ofMillis(properties.getTimeOutMillis())) .onErrorResume(ex -> { return Mono.defer(() -> { String errorResultJson = ""; if (ex instanceof TimeoutException) { errorResultJson = "{\"code\":5001,\"message\":\"network timeout\"}"; } else { errorResultJson = "{\"code\":5000,\"message\":\"system error\"}"; } return ShipResponseUtil.doResponse(exchange, errorResultJson); }).then(Mono.empty()); }).flatMap(backendResponse -> { response.setStatusCode(backendResponse.statusCode()); response.getHeaders().putAll(backendResponse.headers().asHttpHeaders()); return response.writeWith(backendResponse.bodyToFlux(DataBuffer.class)); }); } /** * weather the http method need http body * * @param method * @return */ private boolean requireHttpBody(HttpMethod method) { if (method.equals(HttpMethod.POST) || method.equals(HttpMethod.PUT) || method.equals(HttpMethod.PATCH)) { return true; } return false; } private String buildUrl(ServerWebExchange exchange, ServiceInstance serviceInstance) { ServerHttpRequest request = exchange.getRequest(); String query = request.getURI().getQuery(); String path = request.getPath().value().replaceFirst("/" + serviceInstance.getAppName(), ""); String url = "http://" + serviceInstance.getIp() + ":" + serviceInstance.getPort() + path; if (!StringUtils.isEmpty(query)) { url = url + "?" + query; } return url; } /** * choose an ServiceInstance according to route rule config and load balancing algorithm * * @param appName * @param request * @return */ private ServiceInstance chooseInstance(String appName, ServerHttpRequest request) { List<ServiceInstance> serviceInstances = ServiceCache.getAllInstances(appName); if (CollectionUtils.isEmpty(serviceInstances)) { LOGGER.error("service instance of {} not find", appName); throw new ShipException(ShipExceptionEnum.SERVICE_NOT_FIND); } String version = matchAppVersion(appName, request); if (StringUtils.isEmpty(version)) { throw new ShipException("match app version error"); } // filter serviceInstances by version List<ServiceInstance> instances = serviceInstances.stream().filter(i -> i.getVersion().equals(version)).collect(Collectors.toList()); //Select an instance based on the load balancing algorithm LoadBalance loadBalance = LoadBalanceFactory.getInstance(properties.getLoadBalance(), appName, version); ServiceInstance serviceInstance = loadBalance.chooseOne(instances); return serviceInstance; } private String matchAppVersion(String appName, ServerHttpRequest request) { List<AppRuleDTO> rules = RouteRuleCache.getRules(appName); rules.sort(Comparator.comparing(AppRuleDTO::getPriority).reversed()); for (AppRuleDTO rule : rules) { if (match(rule, request)) { return rule.getVersion(); } } return null; } private boolean match(AppRuleDTO rule, ServerHttpRequest request) { String matchObject = rule.getMatchObject(); String matchKey = rule.getMatchKey(); String matchRule = rule.getMatchRule(); Byte matchMethod = rule.getMatchMethod(); if (MatchObjectEnum.DEFAULT.getCode().equals(matchObject)) { return true; } else if (MatchObjectEnum.QUERY.getCode().equals(matchObject)) { String param = request.getQueryParams().getFirst(matchKey); if (!StringUtils.isEmpty(param)) { return StringTools.match(param, matchMethod, matchRule); } } else if (MatchObjectEnum.HEADER.getCode().equals(matchObject)) { HttpHeaders headers = request.getHeaders(); String headerValue = headers.getFirst(matchKey); if (!StringUtils.isEmpty(headerValue)) { return StringTools.match(headerValue, matchMethod, matchRule); } } return false; } }
app數據同步
後臺服務(如訂單服務)啓動時,只將服務名,版本,ip地址和端口號註冊到了Nacos,並無實例的權重和啓用的插件信息怎麼辦?
通常在線的實例權重和插件列表都是在管理界面配置,而後動態生效的,因此須要ship-admin定時更新實例的權重和插件信息到註冊中心。
對應代碼ship-admin的NacosSyncListener
/** * @Author: Ship * @Description: * @Date: Created in 2020/12/30 */ @Configuration public class NacosSyncListener implements ApplicationListener<ContextRefreshedEvent> { private static final Logger LOGGER = LoggerFactory.getLogger(NacosSyncListener.class); private static ScheduledThreadPoolExecutor scheduledPool = new ScheduledThreadPoolExecutor(1, new ShipThreadFactory("nacos-sync", true).create()); @NacosInjected private NamingService namingService; @Value("${nacos.discovery.server-addr}") private String baseUrl; @Resource private AppService appService; @Override public void onApplicationEvent(ContextRefreshedEvent event) { if (event.getApplicationContext().getParent() != null) { return; } String url = "http://" + baseUrl + NacosConstants.INSTANCE_UPDATE_PATH; scheduledPool.scheduleWithFixedDelay(new NacosSyncTask(namingService, url, appService), 0, 30L, TimeUnit.SECONDS); } class NacosSyncTask implements Runnable { private NamingService namingService; private String url; private AppService appService; private Gson gson = new GsonBuilder().create(); public NacosSyncTask(NamingService namingService, String url, AppService appService) { this.namingService = namingService; this.url = url; this.appService = appService; } /** * Regular update weight,enabled plugins to nacos instance */ @Override public void run() { try { // get all app names ListView<String> services = namingService.getServicesOfServer(1, Integer.MAX_VALUE, NacosConstants.APP_GROUP_NAME); if (CollectionUtils.isEmpty(services.getData())) { return; } List<String> appNames = services.getData(); List<AppInfoDTO> appInfos = appService.getAppInfos(appNames); for (AppInfoDTO appInfo : appInfos) { if (CollectionUtils.isEmpty(appInfo.getInstances())) { continue; } for (ServiceInstance instance : appInfo.getInstances()) { Map<String, Object> queryMap = buildQueryMap(appInfo, instance); String resp = OkhttpTool.doPut(url, queryMap, ""); LOGGER.debug("response :{}", resp); } } } catch (Exception e) { LOGGER.error("nacos sync task error", e); } } private Map<String, Object> buildQueryMap(AppInfoDTO appInfo, ServiceInstance instance) { Map<String, Object> map = new HashMap<>(); map.put("serviceName", appInfo.getAppName()); map.put("groupName", NacosConstants.APP_GROUP_NAME); map.put("ip", instance.getIp()); map.put("port", instance.getPort()); map.put("weight", instance.getWeight().doubleValue()); NacosMetadata metadata = new NacosMetadata(); metadata.setAppName(appInfo.getAppName()); metadata.setVersion(instance.getVersion()); metadata.setPlugins(String.join(",", appInfo.getEnabledPlugins())); map.put("metadata", StringTools.urlEncode(gson.toJson(metadata))); map.put("ephemeral", true); return map; } } }
ship-server再定時從Nacos拉取app數據更新到本地Map緩存。
/** * @Author: Ship * @Description: sync data to local cache * @Date: Created in 2020/12/25 */ @Configuration public class DataSyncTaskListener implements ApplicationListener<ContextRefreshedEvent> { private static ScheduledThreadPoolExecutor scheduledPool = new ScheduledThreadPoolExecutor(1, new ShipThreadFactory("service-sync", true).create()); @NacosInjected private NamingService namingService; @Autowired private ServerConfigProperties properties; @Override public void onApplicationEvent(ContextRefreshedEvent event) { if (event.getApplicationContext().getParent() != null) { return; } scheduledPool.scheduleWithFixedDelay(new DataSyncTask(namingService) , 0L, properties.getCacheRefreshInterval(), TimeUnit.SECONDS); WebsocketSyncCacheServer websocketSyncCacheServer = new WebsocketSyncCacheServer(properties.getWebSocketPort()); websocketSyncCacheServer.start(); } class DataSyncTask implements Runnable { private NamingService namingService; public DataSyncTask(NamingService namingService) { this.namingService = namingService; } @Override public void run() { try { // get all app names ListView<String> services = namingService.getServicesOfServer(1, Integer.MAX_VALUE, NacosConstants.APP_GROUP_NAME); if (CollectionUtils.isEmpty(services.getData())) { return; } List<String> appNames = services.getData(); // get all instances for (String appName : appNames) { List<Instance> instanceList = namingService.getAllInstances(appName, NacosConstants.APP_GROUP_NAME); if (CollectionUtils.isEmpty(instanceList)) { continue; } ServiceCache.add(appName, buildServiceInstances(instanceList)); List<String> pluginNames = getEnabledPlugins(instanceList); PluginCache.add(appName, pluginNames); } ServiceCache.removeExpired(appNames); PluginCache.removeExpired(appNames); } catch (NacosException e) { e.printStackTrace(); } } private List<String> getEnabledPlugins(List<Instance> instanceList) { Instance instance = instanceList.get(0); Map<String, String> metadata = instance.getMetadata(); // plugins: DynamicRoute,Auth String plugins = metadata.getOrDefault("plugins", ShipPluginEnum.DYNAMIC_ROUTE.getName()); return Arrays.stream(plugins.split(",")).collect(Collectors.toList()); } private List<ServiceInstance> buildServiceInstances(List<Instance> instanceList) { List<ServiceInstance> list = new LinkedList<>(); instanceList.forEach(instance -> { Map<String, String> metadata = instance.getMetadata(); ServiceInstance serviceInstance = new ServiceInstance(); serviceInstance.setAppName(metadata.get("appName")); serviceInstance.setIp(instance.getIp()); serviceInstance.setPort(instance.getPort()); serviceInstance.setVersion(metadata.get("version")); serviceInstance.setWeight((int) instance.getWeight()); list.add(serviceInstance); }); return list; } } }
路由規則數據同步
同時,若是用戶在管理後臺更新了路由規則,ship-admin須要推送規則數據到ship-server,這裏參考了soul網關的作法利用websocket在第一次創建鏈接後進行全量同步,此後路由規則發生變動就只做增量同步。
服務端WebsocketSyncCacheServer:
/** * @Author: Ship * @Description: * @Date: Created in 2020/12/28 */ public class WebsocketSyncCacheServer extends WebSocketServer { private final static Logger LOGGER = LoggerFactory.getLogger(WebsocketSyncCacheServer.class); private Gson gson = new GsonBuilder().create(); private MessageHandler messageHandler; public WebsocketSyncCacheServer(Integer port) { super(new InetSocketAddress(port)); this.messageHandler = new MessageHandler(); } @Override public void onOpen(WebSocket webSocket, ClientHandshake clientHandshake) { LOGGER.info("server is open"); } @Override public void onClose(WebSocket webSocket, int i, String s, boolean b) { LOGGER.info("websocket server close..."); } @Override public void onMessage(WebSocket webSocket, String message) { LOGGER.info("websocket server receive message:\n[{}]", message); this.messageHandler.handler(message); } @Override public void onError(WebSocket webSocket, Exception e) { } @Override public void onStart() { LOGGER.info("websocket server start..."); } class MessageHandler { public void handler(String message) { RouteRuleOperationDTO operationDTO = gson.fromJson(message, RouteRuleOperationDTO.class); if (CollectionUtils.isEmpty(operationDTO.getRuleList())) { return; } Map<String, List<AppRuleDTO>> map = operationDTO.getRuleList() .stream().collect(Collectors.groupingBy(AppRuleDTO::getAppName)); if (OperationTypeEnum.INSERT.getCode().equals(operationDTO.getOperationType()) || OperationTypeEnum.UPDATE.getCode().equals(operationDTO.getOperationType())) { RouteRuleCache.add(map); } else if (OperationTypeEnum.DELETE.getCode().equals(operationDTO.getOperationType())) { RouteRuleCache.remove(map); } } } }
客戶端WebsocketSyncCacheClient:
/** * @Author: Ship * @Description: * @Date: Created in 2020/12/28 */ @Component public class WebsocketSyncCacheClient { private final static Logger LOGGER = LoggerFactory.getLogger(WebsocketSyncCacheClient.class); private WebSocketClient client; private RuleService ruleService; private Gson gson = new GsonBuilder().create(); public WebsocketSyncCacheClient(@Value("${ship.server-web-socket-url}") String serverWebSocketUrl, RuleService ruleService) { if (StringUtils.isEmpty(serverWebSocketUrl)) { throw new ShipException(ShipExceptionEnum.CONFIG_ERROR); } this.ruleService = ruleService; ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, new ShipThreadFactory("websocket-connect", true).create()); try { client = new WebSocketClient(new URI(serverWebSocketUrl)) { @Override public void onOpen(ServerHandshake serverHandshake) { LOGGER.info("client is open"); List<AppRuleDTO> list = ruleService.getEnabledRule(); String msg = gson.toJson(new RouteRuleOperationDTO(OperationTypeEnum.INSERT, list)); send(msg); } @Override public void onMessage(String s) { } @Override public void onClose(int i, String s, boolean b) { } @Override public void onError(Exception e) { LOGGER.error("websocket client error", e); } }; client.connectBlocking(); //使用調度線程池進行斷線重連,30秒進行一次 executor.scheduleAtFixedRate(() -> { if (client != null && client.isClosed()) { try { client.reconnectBlocking(); } catch (InterruptedException e) { LOGGER.error("reconnect server fail", e); } } }, 10, 30, TimeUnit.SECONDS); } catch (Exception e) { LOGGER.error("websocket sync cache exception", e); throw new ShipException(e.getMessage()); } } public <T> void send(T t) { while (!client.getReadyState().equals(ReadyState.OPEN)) { LOGGER.debug("connecting ...please wait"); } client.send(gson.toJson(t)); } }
本地啓動nacos ,sh startup.sh -m standalone
啓動ship-admin
本地啓動兩個ship-example實例。
實例1配置:
ship: http: app-name: order version: gray_1.0 context-path: /order port: 8081 admin-url: 127.0.0.1:9001 server: port: 8081 nacos: discovery: server-addr: 127.0.0.1:8848
實例2配置:
ship: http: app-name: order version: prod_1.0 context-path: /order port: 8082 admin-url: 127.0.0.1:9001 server: port: 8082 nacos: discovery: server-addr: 127.0.0.1:8848
在數據庫添加路由規則配置,該規則表示當http header 中的name=ship時請求路由到gray_1.0版本的節點。
啓動ship-server,看到如下日誌時則能夠進行測試了。
2021-01-02 19:57:09.159 INFO 30413 --- [SocketWorker-29] cn.sp.sync.WebsocketSyncCacheServer : websocket server receive message: [{"operationType":"INSERT","ruleList":[{"id":1,"appId":5,"appName":"order","version":"gray_1.0","matchObject":"HEADER","matchKey":"name","matchMethod":1,"matchRule":"ship","priority":50}]}]
用Postman請求http://localhost:9000/order/user/add,POST方式,header設置name=ship,能夠看到只有實例1有日誌顯示。
==========add user,version:gray_1.0
壓測環境:
MacBook Pro 13英寸
處理器 2.3 GHz 四核Intel Core i7
內存 16 GB 3733 MHz LPDDR4X
後端節點個數一個
壓測工具:wrk
壓測結果:20個線程,500個鏈接數,吞吐量大概每秒9400個請求。
千里之行始於足下,開始覺得寫一個網關會很難,但當你實際開始行動時就會發現其實沒那麼難,因此邁出第一步很重要。過程當中也遇到了不少問題,還在github上給soul和nacos這兩個開源項目提了兩個issue,後來發現是本身的問題,尷尬😅。本文代碼已所有上傳到github,點擊這裏便可,最後,但願此文對你有所幫助。
參考資料:
https://nacos.io/zh-cn/docs/quick-start.html