Xcafe:Netty實現兼容SpringMVC的Web容器

1.0 前言

  Netty是一個很是優秀的java nio框架,這已無需多言。國慶時逛StackOverFlow,發現有人問如何用netty來支持Spring MVC,逛了一圈github並無找到有價值的分享。
  正好,我一直都想本身實現一個web容器,因而本着重複造輪子的精神,寫了一個Xcafe。html

 

2.0 概覽

2.1 已實現功能:

  ⑴ 使用SpringMVC處理http請求
  ⑵ 靜態資源緩存和直接返回數據
  ⑶ 本機session生成和緩存
  ⑷ 支持直接返回對象
  ⑸ 支持文件上傳下載(MultipartFile)
  ⑹ 支持ServletOutPutStream寫返回數據java

 

2.2 待實現功能:

  ⑴ 使用Spring MVC處理WebSocket
  ⑵ 實現Xcafe MVC 框架(不使用java servlet api,而是徹底根據Netty http編解碼的實現)
  ⑶ 支持根據配置選擇使用Spring MVC 或 Xcafe MVC
  ⑷ 負載均衡
  ⑸ 分佈式緩存、分佈式Session
  ⑹ 異步消息發送git

 

3.0 架構

3.1 線程模型

  考慮到大部分Web請求的業務邏輯都須要請求數據庫、讀寫文件等操做,爲了儘量多地接受鏈接,儘量多地處理請求,當前的實現是將全部的業務邏輯處理交由其它線程池去處理。github

  這將會致使線程間通訊和頻繁的線程切換。當將Xcafe用做負載均衡、緩存服務器或者處理其它簡單的不耗時的請求時,這並非一個合理的選擇。所以,將來將在初始化容器時能夠根據配置選擇在workerGroup處理仍是使用額外的ThreadPool。web

  將來的異步消息處理和WebSocket協議實現後,若是通過測試有必要,將再在上面的線程模型基礎上增長消息發送隊列線程池,線程池中的每個線程負責維護一個消息發送隊列,初步考慮使用java的fork/join框架最大限度地處理消息發送請求。線程模型將會演變成:算法

  bossGroup負責鏈接
  workerGroup負責編解碼
  BusinessThreadPool負責業務邏輯
  MessageThreadPool負責消息發送數據庫

 

3.2 目錄介紹

1. core 容器核心,鏈接管理和協議處理
2. cache 靜態資源緩存
3. example 示例
輸入如下網址
http://localhost:18898/test/index.do
http://localhost:18898/3.html
可進行簡單的演示

4. mvc 將來須要實現的mvc框架
5. util 一些經常使用的工具類
6. web 容器須要使用到的根據Java Servlet api的具體實現
7. views 靜態資源和模板文件的目錄,能夠選擇將其包含到Jar文件中
8. 配置文件

 

4. 關鍵類

4.1 服務器啓動類

全類名:com.igeeksky.xcafe.core.HttpServer
  初始化服務(線程組、端口、緩衝區大小等……),將來須要完善CoreContext類,根據配置文件(或BootInit.class)初始化服務器上下文,並根據配置啓動不一樣類型的服務。api

public class HttpServer {


  private static final Logger logger = LoggerFactory.getLogger(HttpServer.class);
  private static final boolean SSL = System.getProperty("ssl") != null;
  private static int PORT = Integer.parseInt(System.getProperty("port", SSL? "8443" : "18898"));
   
  private final int BACKLOG = 1024;
  private final int TIMEOUT = 300;
  private static boolean running = false;


  public static void main(String[] args) {
    //……………………
  }


private void doStart() throws CertificateException, SSLException {
    final SslContext sslCtx;
        if (SSL) {
            SelfSignedCertificate ssc = new SelfSignedCertificate();
            sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
        } else {
            sslCtx = null;
        }
    
    EventLoopGroup bossGroup = new NioEventLoopGroup();
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    final ExecutorService executorService = Executors.newFixedThreadPool(10);
    
    try {
      ServerBootstrap b = new ServerBootstrap();
      b.group(bossGroup, workerGroup)
       .channel(NioServerSocketChannel.class)
       .option(ChannelOption.SO_BACKLOG, BACKLOG)    //設定最大鏈接隊列
       .option(ChannelOption.SO_RCVBUF, 1024 * 256)    //設定數據接收緩衝區大小
       .option(ChannelOption.SO_SNDBUF, 1024 * 256)    //設定數據發送緩衝區大小
       .childOption(ChannelOption.SO_KEEPALIVE, true)    //是否保持鏈接
      //傳入附帶異步線程池的channelHandler
       .childHandler(new HttpPipelineInitializer(executorService, sslCtx, TIMEOUT));


      Channel channel = b.bind(PORT).sync().channel();    //綁定端口直到綁定完成


      channel.closeFuture().sync();            //阻塞關閉操做
    } catch (InterruptedException e) {
      e.printStackTrace();
    } finally {
      workerGroup.shutdownGracefully();
      bossGroup.shutdownGracefully();
    }
  }

}

 

4.2 註冊編解碼處理器channelHandler

全類名:com.igeeksky.xcafe.core.HttpPipelineInitializer緩存

public class HttpPipelineInitializer extends ChannelInitializer<SocketChannel> {
  
  private final SslContext sslCtx;
  private final int timeOut;
  private final ExecutorService executorService;
  
  public HttpPipelineInitializer(ExecutorService executorService, SslContext sslCtx, int timeOut){
    this.executorService = executorService;
    this.sslCtx = sslCtx;
    this.timeOut = timeOut;
  }


  @Override
  protected void initChannel(SocketChannel ch) throws Exception {
    ChannelPipeline pipeline = ch.pipeline();
    if (sslCtx != null) {
      pipeline.addLast(sslCtx.newHandler(ch.alloc()));
        }
    pipeline.addLast("timeout", new ReadTimeoutHandler(timeOut));
    pipeline.addLast("codec", new HttpServerCodec());
    pipeline.addLast(new HttpContentCompressor(9));
    pipeline.addLast("aggegator", new HttpObjectAggregator(1024 * 1024 * 1024));        
    pipeline.addLast("ServerInbound", new HttpServerInboundHandler(executorService));
  }

}

 

4.3 預處理並轉發請求

全類名:com.igeeksky.xcafe.core.handler.HttpServerInboundHandler
待完善:根據服務器上下文環境,可選擇是否交由線程池異步處理業務邏輯服務器

public class HttpServerInboundHandler extends ChannelInboundHandlerAdapter {

  private static final Logger logger = LoggerFactory.getLogger(HttpServerInboundHandler.class);
  
  private ExecutorService executorService;
  
  public HttpServerInboundHandler(ExecutorService executorService){
    this.executorService = executorService;
  }

  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    FullHttpRequest req = (FullHttpRequest)msg;
    boolean isKeepAlive = HttpUtil.isKeepAlive(req);
    
    if(!prepare(ctx, req, isKeepAlive)){
      return;
    }
    
    executorService.execute(new Runnable(){
      @Override
      public void run() {
        HttpResponse response = HttpDispacher.INSTANCE.dispach(ctx, msg);
        doWriteAndFlush(ctx, isKeepAlive, (FullHttpResponse)response);
        req.content().release();
      }
    });
    
    
  }
  
  //爲避免篇幅過長,省略其它代碼,請登陸github或下載壓縮文件查看…………………
}

 

4.4 Http請求分發器

全類名:com.igeeksky.xcafe.core.dispacher.HttpDispacher

根據URL構建請求參數,根據方法類型將請求交給不一樣的方法處理

public enum HttpDispacher {
  
  INSTANCE;
  
  private static final HttpActionAdapter action = HttpActionAdapter4Spring.INSTANCE;
    
  public HttpResponse dispach(ChannelHandlerContext ctx, Object msg){
    //long start = System.currentTimeMillis();
    //非HTTP請求處理
    if (!(msg instanceof FullHttpRequest)) {
      return action.doNotHttpRequest(ctx, msg);
    }
    
    FullHttpRequest request = (FullHttpRequest) msg;
    HttpMethod method = request.method();
    
    //Http請求方法爲空處理
    if(null == method){
      return action.doNullHttpMethod(ctx, request);
    }
    
    //構建URI
    String uri = request.uri();
    String[] temp = uri.split("\\?");
    String shortUri = temp[0];

    //根據URL構建請求參數
    Map<String, String[]> parameters = getParameters(temp);
    
    if(method.equals(HttpMethod.GET)){
      return action.doGet(ctx, request, shortUri, parameters);
    }
    else if(method.equals(HttpMethod.POST)){
      return action.doPost(ctx, request, shortUri, parameters);
    }

    //爲避免篇幅過長,省略其它代碼,請登陸github或下載壓縮文件查看…………………
    else{
      return action.doUnContainMethod(ctx, request, shortUri, parameters);
    }
  }
  
  private Map<String, String[]> getParameters(String[] temp) {
    //爲避免篇幅過長,省略其它代碼,請登陸github或下載壓縮文件查看…………………
  }
}

 

4.5 Netty4SpringMVC適配器

全類名:com.igeeksky.xcafe.core.action.HttpActionAdapter4Spring
  初始化SpringMVC環境,封裝http請求/響應爲Java Servlet API的具體實現,將請求交給具體的MVC框架去處理。今後類開始,進入真正的業務邏輯處理,後面的處理邏輯不該再與Netty代碼發生關聯。
  攔截處理普通靜態資源請求並實現緩存機制,也能夠選擇將所有請求交由MVC框架處理。
可擴展:能夠根據不一樣的MVC框架實現不一樣的適配器。
待完善:靜態資源請求處理類

public enum HttpActionAdapter4Spring implements HttpActionAdapter {
  
  INSTANCE;
  
  private static final Logger logger = LoggerFactory.getLogger(HttpActionAdapter4Spring.class);
  
  //Spring 應用上下文環境
  private static final XmlWebApplicationContext wac = new XmlWebApplicationContext();
  
  //Spring MVC的請求處理分發器(經過此類實現Spring MVC支持)
  private static final DispatcherServlet dispatcherServlet;
  
  //容器核心上下文環境配置
  private static final XcafeCoreContext coreContext = new XcafeCoreContext();
  
  //兼容Java Servlet API的servletContext
  private static final XcafeServletContext servletContext = new XcafeServletContext();
  
  //靜態資源緩存類
  private static final ResourceCache CACHE = ResourceCacheDefault.INSTANCE; 
  
  //Netty提供的數據處理工廠類(主要用於Multipart File解碼的參數配置)
  private static final HttpDataFactory factory = new DefaultHttpDataFactory(DefaultHttpDataFactory.MAXSIZE);
  
  //將靜態資源請求交由容器處理 或 MVC框架處理
  private static boolean isStaticSupport = true;
  
  static{
  //初始化Spring上下文環境
  //爲避免篇幅過長,省略其它代碼,請登陸github或下載壓縮文件查看…………………
  }

  @Override
  public HttpResponse doGet(ChannelHandlerContext ctx, FullHttpRequest request, String requestURI, Map<String, String[]> parameters) {
    
    FullHttpResponse resp = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
    
    //請求靜態資源(可選擇是否進入SpringMVC處理)
    if(isStaticSupport && !requestURI.endsWith(".do") && !requestURI.endsWith(".mo") && !requestURI.endsWith(".ws")){
      return getStaticResource(request, resp, requestURI, parameters);
    }
    
    /* 請求動態資源 */
    //Request包裝類:將Netty的request包裝成兼容Java Servlet API的request
    FullHttpRequestWrapper requestWrapper = new FullHttpRequestWrapper(servletContext, request, requestURI, parameters);

    //Response包裝類:將Netty的response包裝成兼容Java Servlet API的Response
    FullHttpResponseWrapper responseWrapper = new FullHttpResponseWrapper(ctx, resp);
    requestWrapper.setResponse(responseWrapper);
    //interceptors.doFilter(requestWrapper, responseWrapper);
    
    try {
      dispatcherServlet.service(requestWrapper, responseWrapper);
    } catch (ServletException | IOException e) {
      logger.error("", e);
    }
    
    return responseWrapper.getResponse();
  }
  
  @Override
  public HttpResponse doPost(ChannelHandlerContext ctx, FullHttpRequest request, String requestURI, Map<String, String[]> parameters){
    FullHttpResponse resp = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);

    //MultiPartRequest包裝類:將Netty的request包裝成兼容Java Servlet API的MultiPartRequest
    FullHttpRequestWrapper requestWrapper = new MultipartFullHttpRequestWrapper(servletContext,factory, request, requestURI, parameters);
    FullHttpResponseWrapper responseWrapper = new FullHttpResponseWrapper(ctx, resp);
    
    try {
      dispatcherServlet.service(requestWrapper, responseWrapper);
    } catch (ServletException | IOException e) {
      logger.error("", e);
    }

    return responseWrapper.getResponse();
  }

  /**
   * <b>返回靜態資源</b></br></br>
   * 待修改:新建http協議管理類,根據http協議完善請求和響應</br>
   */
  private HttpResponse getStaticResource(FullHttpRequest request, FullHttpResponse resp, String requestURI, Map<String, String[]> parameters) {
    //省略代碼,請登陸github或下載壓縮文件查看……………………
  }
}

 

4.6 靜態資源本地緩存實現

全類名:com.igeeksky.xcafe.cache.ResourceCacheDefault
  採用單線程異步循環監聽執行的無鎖設計,實現了LRU,LFU,FIFO算法,與及綜合性的LRFU算法。
  待完善:根據配置文件設置參數
  具體實現超過600行代碼,爲避免篇幅過長,請登陸github或下載壓縮文件查看。

 

4.7 web 包下面的其它類說明

1. FullHttpRequestWrapper
封裝Netty 的Http請求,轉換成兼容Java Servlet API的Request。
FullHttpResponseWrapper如上。


2. MultipartFullHttpRequestWrapper
封裝Netty 的Http請求,轉換成兼容Java Servlet API的MultipartRequest


3. XcafeMultipartFile
封裝Netty的FileUpload,轉換成兼容Java Servlet API的MultipartFile


4. XcafeServletOutputStream
封裝Netty的Bytebuf,將其轉換成兼容Java Servlet API的ServletOutputStream實現流方式寫數據。
XcafeServletInputStream如上。


5. LocalSessionManager 工具類:實現Session的本地緩存和讀取。分佈式緩存待後續再實現


6. websocket支持待後續再處理

 

5. 相關信息

  Xcafe當前只是一個嘗試性的Web容器,若是要成長爲一個稍稍成熟的項目,仍須要完善不少細節並通過大量的測試。若是你有任何建議和問題,請聯繫我。

  Github:
  https://github.com/tonylau08/xcafe

相關文章
相關標籤/搜索