在多個內網系統之上,增加一個網關服務,統一對第三方應用進行鑑權與認證,通過之後方可訪問內部資源服務。網關服務主要負責鑑權認證與請求轉發;其中請求轉發主要藉助 Servlet 3.0 的異步特性實現,並結合 Spring Boot 進行開發。
同步請求會將整個請求鏈路的發起、解析、響應放在同一個同步邏輯中進行。
採用異步化處理,可以將請求中的耗時操作交給線程池做異步處理;在高併發場景下,通過調用一個非 Web 服務線程處理耗時邏輯,提升系統併發性。
因為線程池是隔離的,可以對線程池做業務隔離分組,進行請求分級、監控等。
之前有幾篇文章介紹了認證和鑑權的實現思路,可參考《系統鑑權流程及簽名生成規則》、《公網API安全--OAuth認證》、《互聯網通用架構技術----公網API安全規範》。
轉發的思路主要是希望可以將客戶端請求直接轉發到業務系統:網關系統對於請求 API,通過識別入參的條件進行不同業務系統的路由,對請求 API 不做干擾,直接轉發。
通過業務線程池接收請求,將任務提交到線程池。
/**
 * Handles {@code /book}: wraps the book lookup in a task and submits it, together with the
 * current request, to {@code oneLevelAsyncContext}. This method itself returns nothing.
 *
 * @param request the incoming HTTP request, passed through to {@code submitFuture}
 * @param skuId   required request parameter {@code skuId}
 * @param cat1    required request parameter {@code cat1}
 * @param cat2    required request parameter {@code cat2}
 * @throws Exception declared by the original signature
 */
@RequestMapping("/book")
public void getBook(
        HttpServletRequest request,
        @RequestParam(value="skuId") final Long skuId,
        @RequestParam(value="cat1") final Integer cat1,
        @RequestParam(value="cat2") final Integer cat2) throws Exception {
    oneLevelAsyncContext.submitFuture(request, () -> {
        // The lookup is deferred: it only runs when the submitted task is invoked.
        return bookService.getBook(skuId, cat1, cat2);
    });
}
業務線程池封裝。
public void submitFuture( final HttpServletRequest req, final Callable<Object> task) { final String uri = req.getRequestURI(); final Map<String, String[]> params = req.getParameterMap(); final AsyncContext asyncContext = req.startAsync(); //開啓異步上下文 asyncContext.getRequest().setAttribute("uri", uri); asyncContext.getRequest().setAttribute("params", params); asyncContext.setTimeout(asyncTimeoutInSeconds * 1000); if(asyncListener != null) { asyncContext.addListener(asyncListener); } executor.submit(new CanceledCallable(asyncContext) { //提交任務給業務線程池 @Override public Object call() throws Exception { Object o = task.call(); //業務處理調用 if(o == null) { callBack(asyncContext, o, uri, params); //業務完成後,響應處理 } if(o instanceof CompletableFuture) { CompletableFuture<Object> future = (CompletableFuture<Object>)o; future.thenAccept(resultObject -> callBack(asyncContext, resultObject, uri, params)) .exceptionally(e -> { callBack(asyncContext, "", uri, params); return null; }); } else if(o instanceof String) { callBack(asyncContext, o, uri, params); } return null; } }); } private void callBack( AsyncContext asyncContext, Object result, String uri, Map<String, String[]> params) { HttpServletResponse resp = (HttpServletResponse) asyncContext.getResponse(); try { if(result instanceof String) { write(resp, (String)result); } else { write(resp, JSONUtils.toJSON(result)); } } catch (Throwable e) { resp.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); //程序內部錯誤 try { LOG.error("get info error, uri : {}, params : {}", uri, JSONUtils.toJSON(params), e); } catch (Exception ex) { } } finally { asyncContext.complete(); } }
線程池初始化。
@Override public void afterPropertiesSet() throws Exception { String[] poolSizes = poolSize.split("-"); //初始線程池大小 int corePoolSize = Integer.valueOf(poolSizes[0]); //最大線程池大小 int maximumPoolSize = Integer.valueOf(poolSizes[1]); queue = new LinkedBlockingDeque<Runnable>(queueCapacity); executor = new ThreadPoolExecutor( corePoolSize, maximumPoolSize, keepAliveTimeInSeconds, TimeUnit.SECONDS, queue); executor.allowCoreThreadTimeOut(true); executor.setRejectedExecutionHandler(new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { if(r instanceof CanceledCallable) { CanceledCallable cc = ((CanceledCallable) r); AsyncContext asyncContext = cc.asyncContext; if(asyncContext != null) { try { String uri = (String) asyncContext.getRequest().getAttribute("uri"); Map params = (Map) asyncContext.getRequest().getAttribute("params"); LOG.error("async request rejected, uri : {}, params : {}", uri, JSONUtils.toJSON(params)); } catch (Exception e) {} try { HttpServletResponse resp = (HttpServletResponse) asyncContext.getResponse(); resp.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); } finally { asyncContext.complete(); } } } } }); if(asyncListener == null) { asyncListener = new AsyncListener() { @Override public void onComplete(AsyncEvent event) throws IOException { } @Override public void onTimeout(AsyncEvent event) throws IOException { AsyncContext asyncContext = event.getAsyncContext(); try { String uri = (String) asyncContext.getRequest().getAttribute("uri"); Map params = (Map) asyncContext.getRequest().getAttribute("params"); LOG.error("async request timeout, uri : {}, params : {}", uri, JSONUtils.toJSON(params)); } catch (Exception e) {} try { HttpServletResponse resp = (HttpServletResponse) asyncContext.getResponse(); resp.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); } finally { asyncContext.complete(); } } @Override public void onError(AsyncEvent event) throws IOException { AsyncContext 
asyncContext = event.getAsyncContext(); try { String uri = (String) asyncContext.getRequest().getAttribute("uri"); Map params = (Map) asyncContext.getRequest().getAttribute("params"); LOG.error("async request error, uri : {}, params : {}", uri, JSONUtils.toJSON(params)); } catch (Exception e) {} try { HttpServletResponse resp = (HttpServletResponse) asyncContext.getResponse(); resp.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); } finally { asyncContext.complete(); } } @Override public void onStartAsync(AsyncEvent event) throws IOException { } }; } }
/**
 * Creates the shared request-forwarding thread pool when the web application starts and
 * shuts it down when the application stops. The pool is published as the servlet-context
 * attribute {@code "executor"}.
 *
 * @author Mr. Chun.
 */
@WebListener
public class AppContextListener implements ServletContextListener {

    /** Servlet-context attribute under which the pool is stored. */
    private static final String EXECUTOR_ATTRIBUTE = "executor";

    /** How long to wait for queued and running tasks before forcing the pool down. */
    private static final long SHUTDOWN_GRACE_SECONDS = 10L;

    /**
     * Initialises the thread pool: 100 core threads, 200 max, 50 s keep-alive and a bounded
     * queue of 100 tasks.
     *
     * @param servletContextEvent the event carrying the servlet context
     */
    @Override
    public void contextInitialized(ServletContextEvent servletContextEvent) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                100, 200, 50000L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(100));
        servletContextEvent.getServletContext().setAttribute(EXECUTOR_ATTRIBUTE, executor);
    }

    /**
     * Shuts the thread pool down, giving in-flight tasks a short grace period before
     * interrupting them. Does nothing if the pool was never created.
     *
     * @param servletContextEvent the event carrying the servlet context
     */
    @Override
    public void contextDestroyed(ServletContextEvent servletContextEvent) {
        Object attribute = servletContextEvent.getServletContext().getAttribute(EXECUTOR_ATTRIBUTE);
        if (!(attribute instanceof ThreadPoolExecutor)) {
            return; // initialisation failed or the attribute was replaced
        }
        ThreadPoolExecutor executor = (ThreadPoolExecutor) attribute;
        executor.shutdown();
        try {
            if (!executor.awaitTermination(SHUTDOWN_GRACE_SECONDS, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt for the container
        }
    }
}
/** * Description * ... * @author Mr. Chun. */ @WebServlet(urlPatterns = "/qbs/route", asyncSupported = true) public class AsyncLongRunningServlet extends HttpServlet { private static final long serialVersionUID = 1L; private static final Logger logger = LoggerFactory.getLogger(AsyncLongRunningServlet.class); @Autowired private RestTemplate restTemplate; @Override protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { doGet(req, resp); } @Override protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { logger.info("==== 進入Servlet的時間:" + new Date() + " ===="); long startTime = System.currentTimeMillis(); logger.info("AsyncLongRunningServlet Start::Name=" + Thread.currentThread().getName() + "::ID=" + Thread.currentThread().getId()); req.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true); //在子線程中執行業務調用,並由其負責輸出響應,主線程退出 AsyncContext ctx = req.startAsync(); ctx.addListener(new AsyncListener() { @Override public void onComplete(AsyncEvent asyncEvent) throws IOException { System.out.println("AppAsyncListener onComplete"); } @Override public void onTimeout(AsyncEvent asyncEvent) throws IOException { System.out.println("AppAsyncListener onTimeout"); ServletResponse response = asyncEvent.getAsyncContext().getResponse(); response.setCharacterEncoding("UTF-8"); response.setContentType(MediaType.APPLICATION_JSON.toString()); PrintWriter out = null; try { out = response.getWriter(); out.print(ResponseBuilder.buildJsonString("請求超時")); } catch (IOException e) { e.printStackTrace(); } finally { if (out != null) { out.close(); } } } @Override public void onError(AsyncEvent asyncEvent) throws IOException { System.out.println("AppAsyncListener onError"); ServletResponse response = asyncEvent.getAsyncContext().getResponse(); response.setCharacterEncoding("UTF-8"); response.setContentType(MediaType.APPLICATION_JSON.toString()); PrintWriter out = null; try { out = 
response.getWriter(); out.print(ResponseBuilder.buildJsonString("請求異常")); } catch (IOException e) { e.printStackTrace(); } finally { if (out != null) { out.close(); } } } @Override public void onStartAsync(AsyncEvent asyncEvent) throws IOException { System.out.println("AppAsyncListener onStartAsync"); } }); ctx.setTimeout(9000); ThreadPoolExecutor executor = (ThreadPoolExecutor) req.getServletContext().getAttribute("executor"); executor.execute(new AsyncRequestProcessor(restTemplate, ctx, req.getMethod(), req.getParameter("api"))); // 任務提交線程池 long endTime = System.currentTimeMillis(); logger.info("AsyncLongRunningServlet End::Name=" + Thread.currentThread().getName() + "::ID=" + Thread.currentThread().getId() + "::Time Taken=" + (endTime - startTime) + " ms."); logger.info("==== 結束Servlet的時間:" + new Date() + " ===="); } }
/** * Description * ... * * @author Mr. Chun. */ public class AsyncRequestProcessor implements Runnable { private static final Logger logger = LoggerFactory.getLogger(AsyncRequestProcessor.class); private String url = "http://localhost:8080/"; private RestTemplate restTemplate; private AsyncContext ctx = null; private String requestMethod = ""; public AsyncRequestProcessor(RestTemplate restTemplate, AsyncContext ctx, String requestMethod, String api) { this.restTemplate = restTemplate; this.ctx = ctx; this.requestMethod = requestMethod; url = url + api; } // 業務請求轉發在這裏處理 public void run() { try { long startTime = System.currentTimeMillis(); logger.info("AsyncLongRunningServlet Start::Name=" + Thread.currentThread().getName() + "::ID=" + Thread.currentThread().getId()); String url = this.api.replace("/qbs/api/", ""); String key = url; String param = ""; if (url.contains("/")) { key = url.substring(0, url.indexOf("/")); param = url.substring(url.indexOf("/"), url.length()); } this.api = routeService.getRoute(key) + param; if (!StringUtils.isEmpty(this.api)) { String json = ""; logger.info("======"); // 請求入參 MultiValueMap<String, String> paramMap = ResponseBuilder.getUsefulParam(ctx.getRequest().getParameterMap()); String requestMethod = request.getMethod(); String contentType = request.getContentType(); if ("GET".equals(requestMethod)) { // GET 請求 if (paramMap.size() > 0) { api = ResponseBuilder.buildGetUrl(api, paramMap); } logger.info("PARAM url: {} param: {}", api, paramMap); json = restTemplate.getForObject(api, String.class, paramMap); } else if ("POST".equals(requestMethod)) { // POST 請求 logger.info("PARAM url: {} param: {}", api, paramMap); HttpHeaders headers = new HttpHeaders(); headers.setContentType(contentType.equals("application/json") ? 
MediaType.APPLICATION_JSON : MediaType.APPLICATION_FORM_URLENCODED); if (contentType.equals("application/json")) { // json 提交 StringBuffer sb = new StringBuffer(""); String temp; BufferedReader br = new BufferedReader(new InputStreamReader(request.getInputStream(), "utf-8")); while ((temp = br.readLine()) != null) { sb.append(temp); } br.close(); String body = sb.toString(); HttpEntity<String> formEntity = new HttpEntity<>(body, headers); json = restTemplate.postForObject(api, formEntity, String.class); } else { // form 表單提交 HttpEntity<MultiValueMap<String, String>> request = new HttpEntity<>(paramMap, headers); ResponseEntity<String> response = restTemplate.postForEntity(api, request, String.class); json = response.getBody(); } } logger.info("======"); logger.info("RESULT json: {}", json); ResponseBuilder.responseWrite((HttpServletResponse) ctx.getResponse(), json); } else { logger.info("key: {}", key); ResponseBuilder.responseWrite((HttpServletResponse) ctx.getResponse(), ResponseBuilder.buildJsonString(400, "key無效,key: " + key)); } ctx.complete(); // 通知容器,異步處理完成 logger.info("======"); long endTime = System.currentTimeMillis(); logger.info("AsyncLongRunningServlet End::Name=" + Thread.currentThread().getName() + "::ID=" + Thread.currentThread().getId() + "::Time Taken=" + (endTime - startTime) + " ms."); } catch (Exception e) { logger.error("AsyncExecutor e: " + e.getMessage()); } } }
可以發現,在請求進入之後,業務被放到線程池中異步執行,請求線程隨即退出;業務處理完成之後再進行響應,轉發和響應都實現了異步化。異步化之後吞吐量提高了,但是響應時間變長了;也就是說,異步化並不會縮短響應時間,但是會增加吞吐量,並增加我們需要的靈活性。