Request Authentication and Forwarding Based on the Asynchronous Features of Servlet 3.0

Project Background

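A gateway service is added in front of several intranet systems. It authenticates and authorizes third-party applications in one place before they are allowed to access internal resource services. The gateway has two main jobs: authentication/authorization and request forwarding. Forwarding relies on the asynchronous features of Servlet 3.0, and the service is built with Spring Boot.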

Benefits of Making Requests Asynchronous

A synchronous request runs the whole request chain (initiation, parsing, response) inside a single synchronous flow.

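With asynchronous processing, the time-consuming operations in a request can be handed to a thread pool. Under high concurrency, running the slow logic on a thread that does not belong to the web container improves the system's concurrency.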

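Because thread pools are isolated from one another, they can be grouped by business line, which makes it possible to tier requests by priority, monitor them, and so on.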
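As a rough illustration of pool isolation, the sketch below gives each request tier its own bounded pool and exposes a simple queue-depth figure for monitoring. The class name, the tier names, and all pool sizes are assumptions made for this example and do not come from the project.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class TieredPools {
    /** Hypothetical request tiers; the names are illustrative only. */
    enum Tier { CORE, NORMAL }

    private final Map<Tier, ThreadPoolExecutor> pools = new EnumMap<>(Tier.class);

    TieredPools() {
        // Each tier gets its own bounded pool, so a slow tier cannot starve the other.
        pools.put(Tier.CORE, newPool(8, 16, 200));
        pools.put(Tier.NORMAL, newPool(2, 4, 50));
    }

    private static ThreadPoolExecutor newPool(int core, int max, int queueCapacity) {
        return new ThreadPoolExecutor(core, max, 60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(queueCapacity));
    }

    ThreadPoolExecutor poolFor(Tier tier) {
        return pools.get(tier);
    }

    /** Simple monitoring hook: number of tasks waiting in one tier's queue. */
    int queuedTasks(Tier tier) {
        return pools.get(tier).getQueue().size();
    }

    void shutdown() {
        for (ThreadPoolExecutor pool : pools.values()) {
            pool.shutdown();
        }
    }
}
```

A controller would then pick the pool by tier, for example `pools.poolFor(TieredPools.Tier.CORE).execute(task)`, so that a backlog in one tier does not affect the other.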

Approach

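Several earlier articles describe how authentication and authorization are implemented; see "System Authorization Flow and Signature Generation Rules", "Public API Security: OAuth Authentication", and "General Internet Architecture Techniques: Public API Security Specification".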

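The idea behind forwarding is to pass client requests straight through to the business systems. For each API request, the gateway inspects the input parameters and routes to the matching business system; the request itself is forwarded as is, without modification.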
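The routing idea can be sketched as a lookup table from a route key to a backend base URL. This is only a minimal sketch: the class name, the path prefix argument, and the backend address used in the usage note are assumptions for illustration, not the project's actual routing code.

```java
import java.util.HashMap;
import java.util.Map;

class RouteTable {
    private final Map<String, String> routes = new HashMap<>();

    /** Registers the backend base URL that serves the given route key. */
    void register(String key, String baseUrl) {
        routes.put(key, baseUrl);
    }

    /**
     * Resolves a gateway path such as "/qbs/api/order/list/1" to a backend URL.
     * The first segment after the prefix is the route key; the rest is appended
     * unchanged. Returns null when no backend is registered for the key.
     */
    String resolve(String prefix, String path) {
        String rest = path.startsWith(prefix) ? path.substring(prefix.length()) : path;
        int slash = rest.indexOf('/');
        String key = slash < 0 ? rest : rest.substring(0, slash);
        String remainder = slash < 0 ? "" : rest.substring(slash);
        String base = routes.get(key);
        return base == null ? null : base + remainder;
    }
}
```

With `register("order", "http://10.0.0.1:8080/order-service")`, the call `resolve("/qbs/api/", "/qbs/api/order/list/1")` yields `http://10.0.0.1:8080/order-service/list/1`, while an unregistered key yields `null`, which the gateway can turn into an error response.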

Example

The controller receives the request and submits the task to the business thread pool.

@RequestMapping("/book")  
public void getBook(
HttpServletRequest request, 
@RequestParam(value="skuId") final Long skuId,
@RequestParam(value="cat1") final Integer cat1, 
@RequestParam(value="cat2") final Integer cat2) throws Exception {  
  
    oneLevelAsyncContext.submitFuture(request, () -> bookService.getBook(skuId, cat1, cat2));  
}

The business thread pool wrapper.

public void submitFuture(
final HttpServletRequest req, 
final Callable<Object> task) {  
    final String uri = req.getRequestURI();  
    final Map<String, String[]> params = req.getParameterMap();  

    final AsyncContext asyncContext = req.startAsync();  // start the asynchronous context
    asyncContext.getRequest().setAttribute("uri", uri);  
    asyncContext.getRequest().setAttribute("params", params);  
    asyncContext.setTimeout(asyncTimeoutInSeconds * 1000);  

    if(asyncListener != null) {  
        asyncContext.addListener(asyncListener);  
    }  

    executor.submit(new CanceledCallable(asyncContext) { // submit the task to the business thread pool
        @Override
        public Object call() throws Exception {
            Object o = task.call();  // invoke the business logic
            if(o instanceof CompletableFuture) {
                // the business logic is itself asynchronous: respond once the future completes
                CompletableFuture<Object> future = (CompletableFuture<Object>)o;
                future.thenAccept(resultObject -> callBack(asyncContext, resultObject, uri, params))
                .exceptionally(e -> {
                    callBack(asyncContext, "", uri, params);
                    return null;
                });
            } else {
                // null, String, or any other result: respond as soon as the business logic returns,
                // so that no result type is left waiting for the async timeout
                callBack(asyncContext, o, uri, params);
            }
            return null;  
        }  
    });  
}  

private void callBack(
AsyncContext asyncContext, 
Object result, String uri, 
Map<String, String[]> params) {  
    HttpServletResponse resp = (HttpServletResponse) asyncContext.getResponse();  
    try {  
        if(result instanceof String) {  
            write(resp, (String)result);  
        } else {  
            write(resp, JSONUtils.toJSON(result));  
        }  
    } catch (Throwable e) {  
        resp.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); // internal server error
        try {  
            LOG.error("get info error, uri : {},  params : {}", uri, JSONUtils.toJSON(params), e);  
        } catch (Exception ex) {  
        }  
    } finally {  
        asyncContext.complete();  
    }  
}
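The result handling in submitFuture can be looked at in isolation: a plain value is answered immediately, while a CompletableFuture is answered when it completes. The sketch below shows that branching with only the JDK; the class name and the `Consumer` standing in for the response writer are assumptions for illustration, and mapping a failure to an empty string simply mirrors what the listing above does.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

class ResultDispatcher {
    /**
     * Delivers a task result to the responder exactly once.
     * A CompletableFuture is unwrapped when it completes; a failed future is
     * mapped to an empty string. Any other value (including null) is passed on as is.
     */
    static void dispatch(Object result, Consumer<Object> responder) {
        if (result instanceof CompletableFuture) {
            ((CompletableFuture<?>) result).whenComplete((value, error) ->
                    responder.accept(error == null ? value : ""));
        } else {
            responder.accept(result);
        }
    }
}
```

Using `whenComplete` instead of chaining `thenAccept` and `exceptionally` keeps the success and failure paths in one callback, which makes it easier to see that the responder runs once per request.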

Thread pool initialization.

@Override  
public void afterPropertiesSet() throws Exception {  
    String[] poolSizes = poolSize.split("-");  
    // core pool size
    int corePoolSize = Integer.parseInt(poolSizes[0]);
    // maximum pool size
    int maximumPoolSize = Integer.parseInt(poolSizes[1]);
    queue = new LinkedBlockingDeque<Runnable>(queueCapacity);  
    executor = new ThreadPoolExecutor(  
            corePoolSize, maximumPoolSize,  
            keepAliveTimeInSeconds, TimeUnit.SECONDS,  
            queue);  
  
    executor.allowCoreThreadTimeOut(true);  
    executor.setRejectedExecutionHandler(new RejectedExecutionHandler() {  
        @Override  
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {  
            if(r instanceof CanceledCallable) {  
                CanceledCallable cc = ((CanceledCallable) r);  
                AsyncContext asyncContext = cc.asyncContext;  
                if(asyncContext != null) {  
                    try {  
                        String uri = (String) asyncContext.getRequest().getAttribute("uri");  
                        Map params = (Map) asyncContext.getRequest().getAttribute("params");  
                        LOG.error("async request rejected, uri : {}, params : {}", uri, JSONUtils.toJSON(params));  
                    } catch (Exception e) {}  
                    try {  
                        HttpServletResponse resp = (HttpServletResponse) asyncContext.getResponse();  
                        resp.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);  
                    } finally {  
                        asyncContext.complete();  
                    }  
                }  
            }  
        }  
    });  
  
    if(asyncListener == null) {  
        asyncListener = new AsyncListener() {  
            @Override  
            public void onComplete(AsyncEvent event) throws IOException {  
            }  
  
            @Override  
            public void onTimeout(AsyncEvent event) throws IOException {  
                AsyncContext asyncContext = event.getAsyncContext();  
                try {  
                    String uri = (String) asyncContext.getRequest().getAttribute("uri");  
                    Map params = (Map) asyncContext.getRequest().getAttribute("params");  
                    LOG.error("async request timeout, uri : {}, params : {}", uri, JSONUtils.toJSON(params));  
                } catch (Exception e) {}  
                try {  
                    HttpServletResponse resp = (HttpServletResponse) asyncContext.getResponse();  
                    resp.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);  
                } finally {  
                    asyncContext.complete();  
                }  
            }  
  
            @Override  
            public void onError(AsyncEvent event) throws IOException {  
                AsyncContext asyncContext = event.getAsyncContext();  
                try {  
                    String uri = (String) asyncContext.getRequest().getAttribute("uri");  
                    Map params = (Map) asyncContext.getRequest().getAttribute("params");  
                    LOG.error("async request error, uri : {}, params : {}", uri, JSONUtils.toJSON(params));  
                } catch (Exception e) {}  
                try {  
                    HttpServletResponse resp = (HttpServletResponse) asyncContext.getResponse();  
                    resp.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);  
                } finally {  
                    asyncContext.complete();  
                }  
            }  
  
            @Override  
            public void onStartAsync(AsyncEvent event) throws IOException {  
  
            }  
        };  
    }  
  
}
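To see when the rejection handler actually fires, the following JDK-only sketch builds a pool from a "core-max" string, as afterPropertiesSet does, and overloads it on purpose. The class name, the "1-1" sizing, and the counter are assumptions for this example; the real handler would write an error response instead of counting.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class BoundedPoolDemo {
    static final AtomicInteger rejected = new AtomicInteger();

    /** Builds a pool from a "core-max" string such as "1-1" with a bounded queue. */
    static ThreadPoolExecutor build(String poolSize, int queueCapacity) {
        String[] parts = poolSize.split("-");
        int core = Integer.parseInt(parts[0].trim());
        int max = Integer.parseInt(parts[1].trim());
        ThreadPoolExecutor executor = new ThreadPoolExecutor(core, max, 30L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(queueCapacity));
        // Runs on the submitting thread when the queue is full and no new worker can be started.
        executor.setRejectedExecutionHandler((task, pool) -> rejected.incrementAndGet());
        return executor;
    }

    public static void main(String[] args) {
        ThreadPoolExecutor executor = build("1-1", 1);
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        executor.execute(blocker); // occupies the single worker thread
        executor.execute(blocker); // waits in the queue
        executor.execute(blocker); // rejected: worker busy and queue full
        System.out.println("rejected = " + rejected.get());
        release.countDown();       // let the blocked tasks finish
        executor.shutdown();
    }
}
```

Running `main` prints `rejected = 1`: the first task takes the only worker, the second fills the queue, and the third has nowhere to go. In the gateway, that is the point at which the caller should get an immediate error rather than wait for a timeout.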

Implementation with Spring Boot

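  1. Add the @EnableAsync annotation on top of @SpringBootApplication.
  2. If the project has a custom Filter, it must declare asyncSupported=true: @WebFilter(asyncSupported = true).
  3. Listen to the context with a ContextListener: create the thread pool when the context is initialized and destroy it when the context is destroyed.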
/**
 * Description
 *
 * @author Mr. Chun.
 */
@WebListener
public class AppContextListener implements ServletContextListener {

    /**
     * Initializes the thread pool through the ContextListener
     *
     * @param servletContextEvent
     */
    @Override
    public void contextInitialized(ServletContextEvent servletContextEvent) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                100,
                200,
                50000L,
                TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(100));

        servletContextEvent.getServletContext().setAttribute("executor", executor);
    }

    /**
     * Destroys the thread pool through the ContextListener
     * @param servletContextEvent
     */
    @Override
    public void contextDestroyed(ServletContextEvent servletContextEvent) {
        ThreadPoolExecutor executor = (ThreadPoolExecutor) servletContextEvent.getServletContext().getAttribute("executor");
        executor.shutdown();
    }
}
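The listener above calls shutdown() and returns right away, which lets queued tasks finish but does not wait for them. If in-flight forwards should be given a bounded grace period when the context is destroyed, a helper along the following lines could be used. This is a sketch under that assumption; the class and method names are made up for the example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

class GracefulShutdown {
    /**
     * Stops accepting new tasks, waits up to the given time for running ones,
     * then interrupts whatever is left. Returns true if the pool terminated.
     */
    static boolean stop(ExecutorService executor, long timeout, TimeUnit unit) {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(timeout, unit)) {
                executor.shutdownNow(); // interrupt tasks that are still running
                return executor.awaitTermination(timeout, unit);
            }
            return true;
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }
}
```

In contextDestroyed this would be called as `GracefulShutdown.stop(executor, 10, TimeUnit.SECONDS)`, where the ten-second limit is an arbitrary value to be tuned to the longest expected backend call.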
  4. Create a custom Servlet with asyncSupported=true: @WebServlet(urlPatterns = "/qbs/route", asyncSupported = true)
/**
 * Description
 * ...
 * @author Mr. Chun.
 */
@WebServlet(urlPatterns = "/qbs/route", asyncSupported = true)
public class AsyncLongRunningServlet extends HttpServlet {

    private static final long serialVersionUID = 1L;

    private static final Logger logger = LoggerFactory.getLogger(AsyncLongRunningServlet.class);

    @Autowired
    private RestTemplate restTemplate;

    @Override
    protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        doGet(req, resp);
    }

    @Override
    protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        logger.info("==== Entered servlet at: " + new Date() + " ====");

        long startTime = System.currentTimeMillis();
        logger.info("AsyncLongRunningServlet Start::Name=" + Thread.currentThread().getName() + "::ID=" + Thread.currentThread().getId());
        req.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);

        // Run the business call on a worker thread, which also writes the response; the container thread returns
        AsyncContext ctx = req.startAsync();
        ctx.addListener(new AsyncListener() {
            @Override
            public void onComplete(AsyncEvent asyncEvent) throws IOException {
                System.out.println("AppAsyncListener onComplete");
            }

            @Override
            public void onTimeout(AsyncEvent asyncEvent) throws IOException {
                System.out.println("AppAsyncListener onTimeout");
                ServletResponse response = asyncEvent.getAsyncContext().getResponse();
                response.setCharacterEncoding("UTF-8");
                response.setContentType(MediaType.APPLICATION_JSON.toString());

                PrintWriter out = null;
                try {
                    out = response.getWriter();
                    out.print(ResponseBuilder.buildJsonString("Request timed out"));
                } catch (IOException e) {
                    e.printStackTrace();
                } finally {
                    if (out != null) {
                        out.close();
                    }
                }
            }

            @Override
            public void onError(AsyncEvent asyncEvent) throws IOException {
                System.out.println("AppAsyncListener onError");
                ServletResponse response = asyncEvent.getAsyncContext().getResponse();
                response.setCharacterEncoding("UTF-8");
                response.setContentType(MediaType.APPLICATION_JSON.toString());

                PrintWriter out = null;
                try {
                    out = response.getWriter();
                    out.print(ResponseBuilder.buildJsonString("Request failed"));
                } catch (IOException e) {
                    e.printStackTrace();
                } finally {
                    if (out != null) {
                        out.close();
                    }
                }
            }

            @Override
            public void onStartAsync(AsyncEvent asyncEvent) throws IOException {
                System.out.println("AppAsyncListener onStartAsync");
            }
        });
        ctx.setTimeout(9000);

        ThreadPoolExecutor executor = (ThreadPoolExecutor) req.getServletContext().getAttribute("executor");
        executor.execute(new AsyncRequestProcessor(restTemplate, ctx, req.getMethod(), req.getParameter("api"))); // submit the task to the thread pool

        long endTime = System.currentTimeMillis();
        logger.info("AsyncLongRunningServlet End::Name=" + Thread.currentThread().getName() + "::ID=" + Thread.currentThread().getId() + "::Time Taken=" + (endTime - startTime) + " ms.");

        logger.info("==== Left servlet at: " + new Date() + " ====");
    }
}
  5. Hand the time-consuming task to a separate thread by implementing the run() method of Runnable.
/**
 * Description
 * ...
 *
 * @author Mr. Chun.
 */
public class AsyncRequestProcessor implements Runnable {

    private static final Logger logger = LoggerFactory.getLogger(AsyncRequestProcessor.class);

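    // NOTE: run() below uses `api`, `request` and `routeService`, which the listing
    // as published never declares. The declarations here are an assumed reconstruction.
    private final RestTemplate restTemplate;
    private final AsyncContext ctx;
    private final HttpServletRequest request;
    private String api; // requested path, e.g. /qbs/api/{key}/...

    // Assumption: RouteService is the author's key-to-base-URL lookup bean (not shown in this article).
    // It is looked up from the Spring context because this class is created with `new`.
    private final RouteService routeService;

    public AsyncRequestProcessor(RestTemplate restTemplate, AsyncContext ctx, String requestMethod, String api) {
        this.restTemplate = restTemplate;
        this.ctx = ctx;
        this.request = (HttpServletRequest) ctx.getRequest();
        this.api = api;
        this.routeService = WebApplicationContextUtils
                .getRequiredWebApplicationContext(request.getServletContext())
                .getBean(RouteService.class);
    }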

    // The business request is forwarded here
    public void run() {
        try {
            long startTime = System.currentTimeMillis();
            logger.info("AsyncLongRunningServlet Start::Name=" + Thread.currentThread().getName() + "::ID=" + Thread.currentThread().getId());

            String url = this.api.replace("/qbs/api/", "");
            String key = url;
            String param = "";
            if (url.contains("/")) {
                key = url.substring(0, url.indexOf("/"));
                param = url.substring(url.indexOf("/"), url.length());
            }
            this.api = routeService.getRoute(key) + param;

            if (!StringUtils.isEmpty(this.api)) {
                String json = "";

                logger.info("======");
                // request parameters
                MultiValueMap<String, String> paramMap = ResponseBuilder.getUsefulParam(ctx.getRequest().getParameterMap());
                String requestMethod = request.getMethod();
                String contentType = request.getContentType();

                if ("GET".equals(requestMethod)) { // GET request
                    if (paramMap.size() > 0) {
                        api = ResponseBuilder.buildGetUrl(api, paramMap);
                    }
                    logger.info("PARAM url: {} param: {}", api, paramMap);
                    json = restTemplate.getForObject(api, String.class, paramMap);
                }

                else if ("POST".equals(requestMethod)) { // POST request
                    logger.info("PARAM url: {} param: {}", api, paramMap);
                    // contentType may be null or carry a charset suffix such as "application/json;charset=UTF-8"
                    boolean isJson = contentType != null && contentType.toLowerCase().startsWith("application/json");
                    HttpHeaders headers = new HttpHeaders();
                    headers.setContentType(isJson ? MediaType.APPLICATION_JSON : MediaType.APPLICATION_FORM_URLENCODED);

                    if (isJson) { // JSON body
                        StringBuilder sb = new StringBuilder();
                        String temp;

                        BufferedReader br = new BufferedReader(new InputStreamReader(request.getInputStream(), "utf-8"));
                        while ((temp = br.readLine()) != null) {
                            sb.append(temp);
                        }
                        br.close();

                        String body = sb.toString();
                        HttpEntity<String> formEntity = new HttpEntity<>(body, headers);
                        json = restTemplate.postForObject(api, formEntity, String.class);
                    }

                    else { // form submission
                        HttpEntity<MultiValueMap<String, String>> formEntity = new HttpEntity<>(paramMap, headers);
                        ResponseEntity<String> response = restTemplate.postForEntity(api, formEntity, String.class);
                        json = response.getBody();
                    }
                }

                logger.info("======");
                logger.info("RESULT json: {}", json);
                ResponseBuilder.responseWrite((HttpServletResponse) ctx.getResponse(), json);
            } else {
                logger.info("key: {}", key);
                ResponseBuilder.responseWrite((HttpServletResponse) ctx.getResponse(), ResponseBuilder.buildJsonString(400, "invalid key, key: " + key));
            }
            ctx.complete(); // tell the container that asynchronous processing is complete
            logger.info("======");
            long endTime = System.currentTimeMillis();
            logger.info("AsyncLongRunningServlet End::Name=" + Thread.currentThread().getName() + "::ID=" + Thread.currentThread().getId() + "::Time Taken=" + (endTime - startTime) + " ms.");
        } catch (Exception e) {
            logger.error("AsyncExecutor e: " + e.getMessage());
        }
    }
}
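The GET branch relies on ResponseBuilder.buildGetUrl, whose source is not shown in the article. One plausible way to append the request parameters to the target URL is sketched below with only the JDK; the class and method names are hypothetical and the real helper may behave differently.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.Map;

class QueryStrings {
    /** Appends every parameter value to the URL as an encoded key=value pair. */
    static String append(String url, Map<String, String[]> params) {
        StringBuilder sb = new StringBuilder(url);
        // Continue an existing query string with '&', otherwise start one with '?'.
        char separator = url.contains("?") ? '&' : '?';
        for (Map.Entry<String, String[]> entry : params.entrySet()) {
            for (String value : entry.getValue()) {
                sb.append(separator)
                  .append(encode(entry.getKey()))
                  .append('=')
                  .append(encode(value));
                separator = '&';
            }
        }
        return sb.toString();
    }

    private static String encode(String s) {
        try {
            return URLEncoder.encode(s, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always supported", e);
        }
    }
}
```

The method takes the same `Map<String, String[]>` shape that `ServletRequest.getParameterMap()` returns, so multi-valued parameters are repeated rather than dropped.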

Test Results

[Image: screenshot of the test output, not available]

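As can be seen, once a request comes in, the business logic is handed to the thread pool and executed asynchronously while the request thread returns. The response is written after the business processing finishes, so both forwarding and responding are asynchronous. After the change, throughput went up but response time got longer. In other words, going asynchronous does not improve response time, but it does increase throughput and gives us the flexibility we need.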
