本文基於 Jetty 8.1.x 版本簡單介紹 Jetty Embedded Server 核心概念,線程模型,啓動流程。如下代碼片斷摘自 Jetty 源代碼 中的 example-jetty-embedded 模塊的 OneServletContext.javajava
public class OneServletContext { public static void main(String[] args) throws Exception { Server server = new Server(8080); ServletContextHandler context = new ServletContextHandler( ServletContextHandler.SESSIONS); context.setContextPath("/"); server.setHandler(context); ... // Serve some hello world servlets context.addServlet(new ServletHolder(new HelloServlet()),"/*"); context.addServlet(new ServletHolder( new HelloServlet("Buongiorno Mondo")),"/it/*"); context.addServlet(new ServletHolder( new HelloServlet("Bonjour le Monde")),"/fr/*"); server.start(); server.join(); } }
Embedded Jetty 使用起來很方便,少數幾行代碼就能夠實現一個 http (Servlet)接口安全
The lifecycle(生命週期) interface for generic components,聲明 start(啓動),stop(中止),isRunning,isStarted(查詢狀態)等方法app
實現 LifeCycle 接口的抽象類,提供生命週期管理默認實現:例如經過 _state 屬性保存組件狀態,提供 start,stop 默認實現ide
public final void start() throws Exception { // 線程安全 synchronized (_lock) { try { // 狀態保護 if (_state == _STARTED || _state == _STARTING) { return; } setStarting(); // 將具體的 start 邏輯推遲到具體的子類實現 doStart(); setStarted(); } catch (...) { ... } } }
An AggregateLifeCycle is an LifeCycle implementation for a collection of contained beans
AggregateLifeCycle 繼承自 AbstractLifeCycle,管理一組 Bean 的生命週期oop
public class AggregateLifeCycle extends AbstractLifeCycle implements Destroyable, Dumpable { private final List<Bean> _beans = new CopyOnWriteArrayList<>(); private boolean _started = false; @Override protected void doStart() throws Exception { for (Bean b: beans) { if (b._managed && b._bean instanceof LifeCycle) { LifeCycle l = (LefeCycle) b._bean; if (!l.isRunning()) { // start managed bean l.start(); } } } _started =true; super.doStart(); } }
類層次this
AbstractLifeCycle AggregateLifeCycle AbstractHandler AbstractHandlerContainer HandlerWrapper Server
Server 類是 Jetty 的《門面》類,包含:spa
經過 Server 類提供的各類 set 屬性方法能夠定製 Connector, ThreadPool,若是沒有特殊指定 Server 類會建立默認實現,好比默認的 Connector 爲 SelectChannelConnector,默認的 ThreadPool 爲 QueuedThreadPool線程
(主要)類層次:code
Connector AbstractConnector AbstractNIOConnector SelectChannelConnector
Connector 接口及其實現類用於 接收客戶端請求,(HTTP/SPDY)協議解析,最終調用 Server 中的(Request)Handler 處理請求,SelectChannelConnector 是 Server 默認使用的 Connectorcomponent
public Server(int port) { setServer(this); Connector connector = new SelectChannelConnector(); connector.setPort(port); setConnectors(new Connector[]{ connector }); }
Handler 或者叫做 Request Handler,用於處理客戶端請求
public interface Handler extends LifeCycle, Destroyable { ... void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response); ... }
handle 方法的簽名已經很接近 Servlet service 方法,這裏的 target 一般是請求的 uri,用於根據 Servlet 配置規則找到具體的 Servlet 並調用其 service 方法
Jetty Server 使用線程池來處理客戶端請求,ThreadPool 接口定義了線程池基本操做,Server 默認使用 QueuedThreadPool 類,若是須要,能夠經過 Server.setThreadPool 方法手動設置線程池
注:Jetty 9.x 線程模型比較複雜,Jetty 8.x 比較簡單,代碼好讀一些~
QueuedThreadPool 類層次結構
ThreadPool SizedThreadPool QueuedThreadPool
線程池核心問題:
經過最小線程個數,最大線程個數,線程最大空閒時間 來動態建立,銷燬線程
// QueuedThreadPool.java public class QueuedThreadPool { // 默認最大線程數 private int _maxThreads = 254; // 默認最小線程數 private int _minThreads = 8; ... @Override protected void doStart() throws Exception { ... int threads = _threadsStarted.get(); // 啓動 _minThreads 個線程 while (isRunning() && threads < _minThreads) { startThread(threads); threads = _threadStarted.get(); } } }
當線程空閒時間超過設定的閾值 _maxIdleTimeMs 並且線程池的線程個數高於 _minThreads 時,線程將從 Runnable run 方法中退出
# QueuedThreadPool.java private Runnable _runnable = new Runnable() { public void run() { boolean shrink = false; ... try { Runnable job = _jobs.poll() while (isRunning()) { // job loop:從隊列中拉取 job 並執行直到隊列爲空 while (job != null && isRunning()) { runJob(job); job = _jobs.poll(); } // idle loop try { _threadsIdle.incrementAndGet(); while (isRunning() && job == null) { // 若是 _maxIdleTimeMs < 0 就進入阻塞模式,直到成功拉取到 job if (_maxIdleTimeMs <= 0) { job = _jobs.take(); } else { final int size = _threadsStarted.get(); // 若是線程個數少於 _minThreads 那麼不須要回收線程 if (size > _minThreads) { long last = _lastShrink.get(); long now = System.currentTimeMillis(); if (last == 0 || (now - last) > _maxIdleTimeMs) { shrink = _lastShrink.compareAndSet(last, now) && _threadsStarted.compareAndSet(size, size - 1); if (shringk) { // 退出 while 循環,即終止線程 return; } } } // 帶 timeout 的 poll job = idleJobPoll(); } } } } } catch (...) { } finally { } } }
使用阻塞隊列,工做線程從阻塞隊列中主動拉取(poll)任務(job),QueuedThreadPool 默認使用 ArrayBlockingQueue,若是須要能夠經過構造方法手動指定阻塞隊列的具體實現
public class QueuedThreadPool { public QueuedThreadPool(BlockingQueue<Runnable> jobQ) { this(); _jobs = jobQ; _jobs.clear(); } public boolean dispatch(Runnable job) { if (isRunning()) { final int jobQ = _jobs.size(); final int idle = getIdleThreads(); // 將 job 放入隊列中 if (_jobs.offer(job)) { // 若是當前沒有 idle 的線程,或者 隊列中堆積的 job 比 idle 線程多那麼嘗試啓動新的線程 if (idle == 0 || jobQ > idle) { int threads = _threadsStarted.get(); if (threads < _maxThreads) { startThread(threads); } } return true; } } return false; } }
Server 的啓動開始於 start 方法的調用,如上文所述,Server 類繼承自 AggregateServer,父類的 start 方法最終調用 Server 的 doStart 方法
@Override protected void doStart() throws Exception { ... // 建立默認的 ThreadPool if (_threadPool == null) { setThreadPool(new QueuedThreadPool()) } try { super.doStart(); } catch(Throwable e) {...} // 啓動 Connector,接收,處理請求 if (_connectors != null && mex.size == 0) { for (int i = 0; i < _connectors.length; i++) { try { _connectors[i].start(); } catch (Throwable e) { mex.add(e); } } } ... }