Netty是一個很是優秀的java nio框架,這已無需多言。國慶時逛StackOverFlow,發現有人問如何用netty來支持Spring MVC,逛了一圈github並無找到有價值的分享。
正好,我一直都想本身實現一個web容器,因而本着重複造輪子的精神,寫了一個Xcafe。html
⑴ 使用SpringMVC處理http請求
⑵ 靜態資源緩存和直接返回數據
⑶ 本機session生成和緩存
⑷ 支持直接返回對象
⑸ 支持文件上傳下載(MultipartFile)
⑹ 支持ServletOutPutStream寫返回數據java
⑴ 使用Spring MVC處理WebSocket
⑵ 實現Xcafe MVC 框架(不使用java servlet api,而是徹底根據Netty http編解碼的實現)
⑶ 支持根據配置選擇使用Spring MVC 或 Xcafe MVC
⑷ 負載均衡
⑸ 分佈式緩存、分佈式Session
⑹ 異步消息發送git
考慮到大部分Web請求的業務邏輯都須要請求數據庫、讀寫文件等操做,爲了儘量多地接受鏈接,儘量多地處理請求,當前的實現是將全部的業務邏輯處理交由其它線程池去處理。github
這將會致使線程間通訊和頻繁的線程切換。當將Xcafe用做負載均衡、緩存服務器或者處理其它簡單的不耗時的請求時,這並非一個合理的選擇。所以,將來將在初始化容器時能夠根據配置選擇在workerGroup處理仍是使用額外的ThreadPool。web
將來的異步消息處理和WebSocket協議實現後,若是通過測試有必要,將再在上面的線程模型基礎上增長消息發送隊列線程池,線程池中的每個線程負責維護一個消息發送隊列,初步考慮使用java的fork/join框架最大限度地處理消息發送請求。線程模型將會演變成:算法
bossGroup負責鏈接
workerGroup負責編解碼
BusinessThreadPool負責業務邏輯
MessageThreadPool負責消息發送數據庫
![]() |
1. core 容器核心,鏈接管理和協議處理 2. cache 靜態資源緩存 3. example 示例 輸入如下網址 http://localhost:18898/test/index.do http://localhost:18898/3.html 可進行簡單的演示 4. mvc 將來須要實現的mvc框架 5. util 一些經常使用的工具類 6. web 容器須要使用到的根據Java Servlet api的具體實現 7. views 靜態資源和模板文件的目錄,能夠選擇將其包含到Jar文件中 8. 配置文件 |
全類名:com.igeeksky.xcafe.core.HttpServer
初始化服務(線程組、端口、緩衝區大小等……),將來須要完善CoreContext類,根據配置文件(或BootInit.class)初始化服務器上下文,並根據配置啓動不一樣類型的服務。api
public class HttpServer { private static final Logger logger = LoggerFactory.getLogger(HttpServer.class); private static final boolean SSL = System.getProperty("ssl") != null; private static int PORT = Integer.parseInt(System.getProperty("port", SSL? "8443" : "18898")); private final int BACKLOG = 1024; private final int TIMEOUT = 300; private static boolean running = false; public static void main(String[] args) { //…………………… } private void doStart() throws CertificateException, SSLException { final SslContext sslCtx; if (SSL) { SelfSignedCertificate ssc = new SelfSignedCertificate(); sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build(); } else { sslCtx = null; } EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); final ExecutorService executorService = Executors.newFixedThreadPool(10); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, BACKLOG) //設定最大鏈接隊列 .option(ChannelOption.SO_RCVBUF, 1024 * 256) //設定數據接收緩衝區大小 .option(ChannelOption.SO_SNDBUF, 1024 * 256) //設定數據發送緩衝區大小 .childOption(ChannelOption.SO_KEEPALIVE, true) //是否保持鏈接 //傳入附帶異步線程池的channelHandler .childHandler(new HttpPipelineInitializer(executorService, sslCtx, TIMEOUT)); Channel channel = b.bind(PORT).sync().channel(); //綁定端口直到綁定完成 channel.closeFuture().sync(); //阻塞關閉操做 } catch (InterruptedException e) { e.printStackTrace(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }
全類名:com.igeeksky.xcafe.core.HttpPipelineInitializer緩存
public class HttpPipelineInitializer extends ChannelInitializer<SocketChannel> { private final SslContext sslCtx; private final int timeOut; private final ExecutorService executorService; public HttpPipelineInitializer(ExecutorService executorService, SslContext sslCtx, int timeOut){ this.executorService = executorService; this.sslCtx = sslCtx; this.timeOut = timeOut; } @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); if (sslCtx != null) { pipeline.addLast(sslCtx.newHandler(ch.alloc())); } pipeline.addLast("timeout", new ReadTimeoutHandler(timeOut)); pipeline.addLast("codec", new HttpServerCodec()); pipeline.addLast(new HttpContentCompressor(9)); pipeline.addLast("aggegator", new HttpObjectAggregator(1024 * 1024 * 1024)); pipeline.addLast("ServerInbound", new HttpServerInboundHandler(executorService)); } }
全類名:com.igeeksky.xcafe.core.handler.HttpServerInboundHandler
待完善:根據服務器上下文環境,可選擇是否交由線程池異步處理業務邏輯服務器
public class HttpServerInboundHandler extends ChannelInboundHandlerAdapter { private static final Logger logger = LoggerFactory.getLogger(HttpServerInboundHandler.class); private ExecutorService executorService; public HttpServerInboundHandler(ExecutorService executorService){ this.executorService = executorService; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { FullHttpRequest req = (FullHttpRequest)msg; boolean isKeepAlive = HttpUtil.isKeepAlive(req); if(!prepare(ctx, req, isKeepAlive)){ return; } executorService.execute(new Runnable(){ @Override public void run() { HttpResponse response = HttpDispacher.INSTANCE.dispach(ctx, msg); doWriteAndFlush(ctx, isKeepAlive, (FullHttpResponse)response); req.content().release(); } }); } //爲避免篇幅過長,省略其它代碼,請登陸github或下載壓縮文件查看………………… }
全類名:com.igeeksky.xcafe.core.dispacher.HttpDispacher
根據URL構建請求參數,根據方法類型將請求交給不一樣的方法處理
public enum HttpDispacher { INSTANCE; private static final HttpActionAdapter action = HttpActionAdapter4Spring.INSTANCE; public HttpResponse dispach(ChannelHandlerContext ctx, Object msg){ //long start = System.currentTimeMillis(); //非HTTP請求處理 if (!(msg instanceof FullHttpRequest)) { return action.doNotHttpRequest(ctx, msg); } FullHttpRequest request = (FullHttpRequest) msg; HttpMethod method = request.method(); //Http請求方法爲空處理 if(null == method){ return action.doNullHttpMethod(ctx, request); } //構建URI String uri = request.uri(); String[] temp = uri.split("\\?"); String shortUri = temp[0]; //根據URL構建請求參數 Map<String, String[]> parameters = getParameters(temp); if(method.equals(HttpMethod.GET)){ return action.doGet(ctx, request, shortUri, parameters); } else if(method.equals(HttpMethod.POST)){ return action.doPost(ctx, request, shortUri, parameters); } //爲避免篇幅過長,省略其它代碼,請登陸github或下載壓縮文件查看………………… else{ return action.doUnContainMethod(ctx, request, shortUri, parameters); } } private Map<String, String[]> getParameters(String[] temp) { //爲避免篇幅過長,省略其它代碼,請登陸github或下載壓縮文件查看………………… } }
全類名:com.igeeksky.xcafe.core.action.HttpActionAdapter4Spring
初始化SpringMVC環境,封裝http請求/響應爲Java Servlet API的具體實現,將請求交給具體的MVC框架去處理。今後類開始,進入真正的業務邏輯處理,後面的處理邏輯不該再與Netty代碼發生關聯。
攔截處理普通靜態資源請求並實現緩存機制,也能夠選擇將所有請求交由MVC框架處理。
可擴展:能夠根據不一樣的MVC框架實現不一樣的適配器。
待完善:靜態資源請求處理類
public enum HttpActionAdapter4Spring implements HttpActionAdapter { INSTANCE; private static final Logger logger = LoggerFactory.getLogger(HttpActionAdapter4Spring.class); //Spring 應用上下文環境 private static final XmlWebApplicationContext wac = new XmlWebApplicationContext(); //Spring MVC的請求處理分發器(經過此類實現Spring MVC支持) private static final DispatcherServlet dispatcherServlet; //容器核心上下文環境配置 private static final XcafeCoreContext coreContext = new XcafeCoreContext(); //兼容Java Servlet API的servletContext private static final XcafeServletContext servletContext = new XcafeServletContext(); //靜態資源緩存類 private static final ResourceCache CACHE = ResourceCacheDefault.INSTANCE; //Netty提供的數據處理工廠類(主要用於Multipart File解碼的參數配置) private static final HttpDataFactory factory = new DefaultHttpDataFactory(DefaultHttpDataFactory.MAXSIZE); //將靜態資源請求交由容器處理 或 MVC框架處理 private static boolean isStaticSupport = true; static{ //初始化Spring上下文環境 //爲避免篇幅過長,省略其它代碼,請登陸github或下載壓縮文件查看………………… } @Override public HttpResponse doGet(ChannelHandlerContext ctx, FullHttpRequest request, String requestURI, Map<String, String[]> parameters) { FullHttpResponse resp = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); //請求靜態資源(可選擇是否進入SpringMVC處理) if(isStaticSupport && !requestURI.endsWith(".do") && !requestURI.endsWith(".mo") && !requestURI.endsWith(".ws")){ return getStaticResource(request, resp, requestURI, parameters); } /* 請求動態資源 */ //Request包裝類:將Netty的request包裝成兼容Java Servlet API的request FullHttpRequestWrapper requestWrapper = new FullHttpRequestWrapper(servletContext, request, requestURI, parameters); //Response包裝類:將Netty的response包裝成兼容Java Servlet API的Response FullHttpResponseWrapper responseWrapper = new FullHttpResponseWrapper(ctx, resp); requestWrapper.setResponse(responseWrapper); //interceptors.doFilter(requestWrapper, responseWrapper); try { dispatcherServlet.service(requestWrapper, responseWrapper); } catch (ServletException | IOException e) { logger.error("", e); } return responseWrapper.getResponse(); } @Override public HttpResponse doPost(ChannelHandlerContext ctx, FullHttpRequest request, String requestURI, Map<String, String[]> parameters){ FullHttpResponse resp = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); //MultiPartRequest包裝類:將Netty的request包裝成兼容Java Servlet API的MultiPartRequest FullHttpRequestWrapper requestWrapper = new MultipartFullHttpRequestWrapper(servletContext,factory, request, requestURI, parameters); FullHttpResponseWrapper responseWrapper = new FullHttpResponseWrapper(ctx, resp); try { dispatcherServlet.service(requestWrapper, responseWrapper); } catch (ServletException | IOException e) { logger.error("", e); } return responseWrapper.getResponse(); } /** * <b>返回靜態資源</b></br></br> * 待修改:新建http協議管理類,根據http協議完善請求和響應</br> */ private HttpResponse getStaticResource(FullHttpRequest request, FullHttpResponse resp, String requestURI, Map<String, String[]> parameters) { //省略代碼,請登陸github或下載壓縮文件查看…………………… } }
全類名:com.igeeksky.xcafe.cache.ResourceCacheDefault
採用單線程異步循環監聽執行的無鎖設計,實現了LRU,LFU,FIFO算法,與及綜合性的LRFU算法。
待完善:根據配置文件設置參數
具體實現超過600行代碼,爲避免篇幅過長,請登陸github或下載壓縮文件查看。
![]() |
1. FullHttpRequestWrapper 封裝Netty 的Http請求,轉換成兼容Java Servlet API的Request。 FullHttpResponseWrapper如上。 2. MultipartFullHttpRequestWrapper 封裝Netty 的Http請求,轉換成兼容Java Servlet API的MultipartRequest 3. XcafeMultipartFile 封裝Netty的FileUpload,轉換成兼容Java Servlet API的MultipartFile 4. XcafeServletOutputStream 封裝Netty的Bytebuf,將其轉換成兼容Java Servlet API的ServletOutputStream實現流方式寫數據。 XcafeServletInputStream如上。 5. LocalSessionManager 工具類:實現Session的本地緩存和讀取。分佈式緩存待後續再實現 6. websocket支持待後續再處理 |
Xcafe當前只是一個嘗試性的Web容器,若是要成長爲一個稍稍成熟的項目,仍須要完善不少細節並通過大量的測試。若是你有任何建議和問題,請聯繫我。
Github:
https://github.com/tonylau08/xcafe