Boomvc完成已經有一段時間了,但拖延到如今纔開始記錄。寫這篇文章主要是回憶和覆盤一下思路。如題所講,Boomvc是一個mvc框架,可是它自帶http server功能,也就是說不須要tomcat之類的server,能夠在一個jar包裏啓動而不須要其餘的依賴,這就須要本身去寫http server的實現,這一篇我就梳理一下實現。react
首先定義一個server接口git
public interface Server { void init(Boom boom); void start(); void stop(); }
這個接口能夠有多種實現,能夠從nio socket開始寫,也能夠用netty這樣的很是好用的network層的框架實現。在這裏我實現了一個簡易版的TinyServer。github
public class TinyServer implements Server { private static final Logger logger = LoggerFactory.getLogger(TinyServer.class); private Boom boom; private Ioc ioc; private MvcDispatcher dispatcher; private Environment environment; private EventExecutorGroup boss; private EventExecutorGroup workers; private Thread cleanSession; @Override public void init(Boom boom) { ... ... } @Override public void start() { this.boss.start(); this.workers.start(); this.cleanSession.start(); } @Override public void stop() { this.boss.stop(); this.workers.stop(); } }
在這裏要關注這個地方tomcat
private EventExecutorGroup boss; private EventExecutorGroup workers;
這是我抽象出來的表示線程組,一個EventExecuteGroup持有多個EventExecute,boss接受鏈接請求,workers執行業務邏輯。看一下EventExecuteGroup的實現。網絡
public class EventExecutorGroup implements Task { private int threadNum; private List<EventExecutor> executorList; private int index; private ThreadFactory threadName; private EventExecutorGroup childGroup; private MvcDispatcher dispatcher; public EventExecutorGroup(int threadNum, ThreadFactory threadName, EventExecutorGroup childGroup, MvcDispatcher dispatcher, SessionManager sessionManager) { this.threadNum = threadNum; this.threadName = threadName; this.childGroup = childGroup; this.dispatcher = dispatcher; this.executorList = new ArrayList<>(this.threadNum); IntStream.of(this.threadNum) .forEach(i-> { try { this.executorList.add(new EventExecutor(this.threadName, this.childGroup, this.dispatcher, sessionManager)); } catch (IOException e) { throw new RuntimeException(e); } }); this.index = 0; } public void register(SelectableChannel channel, int ops) throws ClosedChannelException { int index1 = 0; synchronized (this){ index1 = this.index%this.threadNum; this.index++; } this.executorList.get(index1).register(channel, ops); } public void register(SelectableChannel channel, int ops, Object att) throws ClosedChannelException { int index1 = 0; synchronized (this){ index1 = this.index%this.threadNum; this.index++; } this.executorList.get(index1).register(channel, ops, att); } @Override public void start() { this.executorList.forEach(e->e.start()); } @Override public void stop() { this.executorList.forEach(e->e.stop()); } }
EventExecutor就是一個io線程,它持有一個selector,selector是Java NIO核心組件中的一個,用於檢查一個或多個Channel(通道)的狀態是否處於可讀、可寫。如此能夠實現單線程管理多個channels,也就是能夠管理多個網絡連接。io線程就不斷輪詢這個selector,獲取多個selector key,根據這個key的狀態,好比accept,read,write執行不一樣的邏輯。在這裏EventExecutor是有多個的,也就是說selector有多個,boss EventExecutorGroup只有一個EventExecutor,它負責accept鏈接請求,並把接受的鏈接註冊到workers EventExecutorGroup裏,由worker線程處理read和write。session
public class EventExecutor { private static final Logger logger = LoggerFactory.getLogger(EventExecutor.class); private ThreadFactory threadName; private EventExecutorGroup childGroup; private Selector selector; private Thread ioThread; private MvcDispatcher dispatcher; private Runnable task; private Semaphore semaphore = new Semaphore(1); public EventExecutor(ThreadFactory threadName, EventExecutorGroup childGroup, MvcDispatcher dispatcher, SessionManager sessionManager) throws IOException { this.threadName = threadName; this.childGroup = childGroup; this.dispatcher = dispatcher; this.selector = Selector.open(); this.task = new EventLoop(selector, this.childGroup, this.dispatcher, sessionManager, semaphore); this.ioThread = threadName.newThread(this.task); } public void register(SelectableChannel channel, int ops) throws ClosedChannelException { channel.register(this.selector, ops); } public void register(SelectableChannel channel, int ops, Object att) throws ClosedChannelException { /* 將接收的鏈接註冊到selector上 // 發現沒法直接註冊,一直獲取不到鎖 // 這是因爲 io 線程正阻塞在 select() 方法上,直接註冊會形成死鎖 // 若是這時直接調用 wakeup,有可能尚未註冊成功又阻塞了,可使用信號量從 select 返回後先阻塞,等註冊完後在執行 */ try { this.semaphore.acquire(); this.selector.wakeup(); channel.register(this.selector, ops, att); }catch (InterruptedException e){ logger.error("", e); }finally { this.semaphore.release(); } } public void start(){ ((Task)this.task).start(); this.ioThread.start(); } public void stop(){ ((Task)this.task).stop(); } }
selector輪詢是在EventLoop這裏實現的。併發
public class EventLoop implements Runnable, Task { private static final Logger logger = LoggerFactory.getLogger(EventLoop.class); private Selector selector; private EventExecutorGroup childGroup; private MvcDispatcher dispatcher; private FilterMapping filterMapping; private volatile boolean isStart = false; private Semaphore semaphore; private SessionManager sessionManager; public EventLoop(Selector selector, EventExecutorGroup childGroup, MvcDispatcher dispatcher, SessionManager sessionManager, Semaphore semaphore) { ... } @Override public void run() { while(this.isStart){ try { int n = -1; try { n = selector.select(1000); semaphore.acquire(); } catch (InterruptedException e) { logger.error("", e); } finally { semaphore.release(); } if(n<=0) continue; } catch (IOException e) { logger.error("", e); continue; } Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while(iterator.hasNext()){ SelectionKey key = iterator.next(); iterator.remove(); if(!key.isValid()) continue; try { if (key.isAcceptable()) { accept(key); } if (key.isReadable()) { read(key); } if (key.isWritable()) { write(key); } }catch (Exception e){ if(key!=null&&key.isValid()){ try { key.channel().close(); } catch (IOException e1) { e1.printStackTrace(); } } logger.error("", e); } } } } private void accept(SelectionKey key) throws IOException { ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel(); SocketChannel socketChannel = serverSocketChannel.accept(); socketChannel.configureBlocking(false); this.childGroup.register(socketChannel, SelectionKey.OP_READ, new HttpProtocolParser(socketChannel)); } private void read(SelectionKey key) throws Exception{ ... } private void write(SelectionKey key) throws IOException { ... } @Override public void start() { this.isStart = true; } @Override public void stop() { this.filterMapping.distory(); this.isStart = false; } public Semaphore semaphore(){ return this.semaphore; } }
這就是一個經典的nio程序模式,要注意這裏mvc
this.childGroup.register(socketChannel, SelectionKey.OP_READ, new HttpProtocolParser(socketChannel));
這就把接受的鏈接註冊到其餘selector了。
這裏我用了一個nio程序的多reactor模式,主線程中EventLoop對象經過 select監控鏈接創建事件,收到事件後經過 Acceptor接收,將新的鏈接分配給某個子EventLoop。
子線程中的EventLoop完成 read -> 業務處理 -> send 的完整流程。這種模式主線程和子線程的職責很是明確,主線程只負責接收新鏈接,子線程負責完成後續的業務處理,而且使用多個selector,read,業務處理,write不會影響accept,這對於大量併發鏈接能夠提升accept的速度,不會因業務處理使大量鏈接堆積,這裏其實參考了netty的思想。以下圖app
在寫EventExecutor的register方法是,發現若是直接在selector上調用register的話,可能會形成死鎖。由於selector被多個線程訪問,當其中一個線程調用selector.select()方法時發生阻塞,這個線程會一直持有selector的鎖,這時另外一個線程的register方法會被阻塞。若是這時直接調用 wakeup,有可能尚未註冊成功又阻塞了,可使用信號量從 select 返回後先阻塞,等註冊完後在執行。具體實現以下框架
try { this.semaphore.acquire(); this.selector.wakeup(); channel.register(this.selector, ops, att); }catch (InterruptedException e){ logger.error("", e); }finally { this.semaphore.release(); }
try { n = selector.select(1000); semaphore.acquire(); } catch (InterruptedException e) { logger.error("", e); } finally { semaphore.release(); }
這裏semaphore就起到一個阻塞EventLoop在被喚醒時繼續執行的做用,當註冊完成時才繼續執行。好了,關於server的線程部分就寫到這,下一篇寫http協議解析部分。