做爲縱橫情場多年的老手,憲程在把到妹子後一般有如下策略 (假設憲程是影流之主
的第1024代傳人而且只剩下了分身的能力)java
將妹子存到隊列中,不時發微信去撩一下,若是有意向的話憲程會使用分身能力再建立一個憲程
去把妹git
憲程本身執行把妹的操做,若是期間又有新的妹子看上他咋辦呢,那就將該妹子交給本身的分身憲程
去輪詢處理,而且憲程在把完妹子以後會嘗試去把分身憲程
的輪詢任務給接過來,畢竟本體老是要掌握主動權的,若是沒有接過來咋辦?只能選擇成爲分身了,畢竟此時分身憲程
已經接過了本體的工做,某種意義上他已經成爲了本體
。github
建議在閱讀以前先了解如下Tomcat的NIO模型,沒有對比就沒有傷害,你會發現Jetty NIO模型的有趣之處web
若是時間充足的話,我建議你直接閱讀附錄,瞭解如何Debug Jetty NIO功能算法
既然要了解Jetty的NIO模型,從線程的角度來講能夠分爲如下幾類spring
空閒線程
此角色會根據提交到線程池中的任務,將本身轉變爲I/O線程或者輪詢線程tomcat
Acceptor線程
該角色主要負責接收來自客戶端的鏈接並對其進行封裝以後,選擇一個Selector來提交此任務微信
輪詢線程
此角色主要負責輪詢事件,並處理其餘角色提交給此角色的任務,另外此角色能夠根據所設定的策略將輪詢任務交給其餘線程,在執行完I/O任務以後歸還到線程池中成爲空閒線程
網絡
主要參與的類有多線程
Connector
該角色主要負責JettyNIO模型中各個組件的啓動和,協調工做
SelectorManager
此角色主要對ManagedSelector
進行管理,想要和Selector進行交互可使用此類
ManagedSelector
封裝了JDK原生的selector
, 並對外提供對selector
執行操做的內部類、接口以及方法
重點 全部線程共用一個線程池
關鍵類
org.eclipse.jetty.server.ServerConnector
Connector即鏈接器,是Jetty對於網絡I/O模型的一個抽象,主要負責組裝,啓動Jetty NIO模型中所須要用到的組件。所以,咱們主要注意力集中到其實現上也就是ServerConnector
上。
初始化Connector鏈接器,咱們須要向其提供如下關鍵參數(隱去了和本文無關的參數,有興趣的可自行了解)
回收
以及提供
ByteBuffer給I/O線程使用accept
操做線程的數量selector
線程數量可是,大部分的初始化工做並非在ServerConnector
中執行的,而是在其父類中執行的操做,所以咱們將目光轉移到 org.eclipse.jetty.server.AbstractConnector
該類的初始化代碼以下,其主要作了如下工做
max(1,min(4,CPU核心數÷8))
進行計算,也就是說默認的Acceptor數量最少有一個,最多有4個想象一下,若是ServerSocketChannel被設置爲阻塞狀態以便多個線程同時執行accept操做,那麼大多數狀況下多數線程將會陷入阻塞狀態,而且線程從阻塞態恢復是有線程上下文切換的成本的所以Acceptor線程並非越多越好
public AbstractConnector( Server server, Executor executor, Scheduler scheduler, ByteBufferPool pool, int acceptors, ConnectionFactory... factories) {
_server = server;
//檢查是否設置線程池,若是沒有則使用Server的
_executor = executor != null ? executor : _server.getThreadPool();
if (scheduler == null)
scheduler = _server.getBean(Scheduler.class);
_scheduler = scheduler != null ? scheduler : new ScheduledExecutorScheduler(String.format("Connector-Scheduler-%x", hashCode()), false);
// 檢查是否指定ByteBufferPool,若是沒有則本身建立一個
if (pool == null)
pool = _server.getBean(ByteBufferPool.class);
_byteBufferPool = pool != null ? pool : new ArrayByteBufferPool();
// 將這些對象交給Jetty統一管理(不在本文討論範圍內,不展開)
addBean(_server, false);
addBean(_executor);
if (executor == null)
unmanage(_executor); // inherited from server
addBean(_scheduler);
addBean(_byteBufferPool);
// ConnectionFactory主要使用來處理對應的HTTP協議
for (ConnectionFactory factory : factories)
{
addConnectionFactory(factory);
}
// 若是未指定Acceptor的數量則根據CPU核數執行計算
int cores = ProcessorUtils.availableProcessors();
if (acceptors < 0)
//根據此式能夠推出Acceptor數量最大是4最小是1
acceptors = Math.max(1, Math.min(4, cores / 8));
// Acceptor數量大於CPU核心數
// 將會引發大量的線程陷入阻塞狀態
// 沒有東西能夠accept不就阻塞了嗎
// 而要激活阻塞的線程則須要切換線程上下文會引發性能的浪費
if (acceptors > cores)
LOG.warn("Acceptors should be <= availableProcessors: " + this);
_acceptors = new Thread[acceptors];
}
複製代碼
以下圖所示個人電腦爲4核心
的i5CPU,那麼默認的Acceptor線程應該只有一個
Acceptor是一個定義在AbstractConnector
中的內部類, 其主要工做不斷調用在子類中實現accept方法,也就是接收鏈接的實現延遲到了子類中。
其代以下,能夠學到很多小技巧, 若是你不想看代碼,其總結以下
public void run() {
// 給線程起給名字
final Thread thread = Thread.currentThread();
String name = thread.getName();
_name = String.format("%s-acceptor-%d@%x-%s", name, _id, hashCode(), AbstractConnector.this.toString());
thread.setName(_name);
// 設置優先級
int priority = thread.getPriority();
if (_acceptorPriorityDelta != 0)
thread.setPriority(Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY, priority + _acceptorPriorityDelta)));
// 保存對此線程的引用
_acceptors[_id] = thread;
try
{
while (isRunning())
{
// 加鎖,等待來自其餘線程的信號說能夠開始幹活了
try (Locker.Lock lock = _locker.lock())
{
if (!_accepting && isRunning())
{
_setAccepting.await();
continue;
}
}
catch (InterruptedException e)
{
continue;
}
try
{
//調用子類的accept方法
accept(_id);
}
catch (Throwable x)
{
if (!handleAcceptFailure(x))
break;
}
}
}
finally
{
// 發生異常了,則將線程的名稱以及優先級調回原來的值
thread.setName(name);
if (_acceptorPriorityDelta != 0)
thread.setPriority(priority);
//釋放引用
synchronized (AbstractConnector.this)
{
_acceptors[_id] = null;
}
CountDownLatch stopping = _stopping;
if (stopping != null)
stopping.countDown();
}
}
複製代碼
在子類ServerConnector
中,accept
主要執行如下操做
阻塞
的形式接收來自客戶端的鏈接SocketChannel
爲非阻塞模式
,並禁用nagle算法
SocketChannel
封裝成一個Accept
事件,交給輪詢線程
處理 ServerConnector
中的代碼@Override
public void accept(int acceptorID) throws IOException {
ServerSocketChannel serverChannel = _acceptChannel;
if (serverChannel != null && serverChannel.isOpen())
{
SocketChannel channel = serverChannel.accept();
accepted(channel);
}
}
private void accepted(SocketChannel channel) throws IOException {
channel.configureBlocking(false);
Socket socket = channel.socket();
configure(socket); // socket.setTcpNoDelay(true);
_manager.accept(channel);
}
複製代碼
SelectorManager中最終被調用的代碼
public void accept(SelectableChannel channel, Object attachment) {
final ManagedSelector selector = chooseSelector();
selector.submit(selector.new Accept(channel, attachment));
}
複製代碼
輪詢線程
主要負責輪詢I/O事件以及處理其餘線程提交到本線程任務。而且咱們能夠爲輪詢線程
指定執行策略, 在後面咱們能夠看到執行策略將如何影響輪詢線程
行爲。
首先,咱們須要先明確哪些類會參與到輪詢線程的工做中,也就是說咱們要先理清楚輪詢線程的調用鏈。
如上圖堆棧跟蹤圖紅框所標註的部分所示,參與到輪詢線程主要堆棧結構以下圖所示。
ManagedSelector
此類主要封裝了JDK的selector
類,並對外暴露操做此Selector的方法和類EatWhatYouKill
此類即輪詢線程執行策略,該類會不斷調用SelectorProducer.produce 方法產生封裝好的I/O任務,並根據其策略來決定執行這個I/O任務的方式SelectorProducer
此類爲ManagedSelector
的內部類,實現線程執行策略裏面的ExecutionStrategy.Producer
接口,該類專門用於生成供輪詢線程處理的I/O任務Jetty將JDK原生的Selector
類封裝成爲ManagedSelector
,該類主要功能是對外暴露對其封裝的selector
執行操做的接口和內部類. 其關鍵方法和內部類以下
SelectorUpdate接口 若是要對ManagedSelector
所管理的selector
進行更新(如執行註冊感興趣的I/O事件)能夠實現此接口,該接口定義以下
public interface SelectorUpdate {
void update(Selector selector);
}
複製代碼
submit方法 該方法主要用於外界將SelectorUpdate
提交到輪詢線程中以便執行對Selector
的更新操做,簡單來講此方法會執行如下操做
public void submit(SelectorUpdate update) {
if (LOG.isDebugEnabled())
LOG.debug("Queued change {} on {}", update, this);
Selector selector = null;
synchronized (ManagedSelector.this)
{
//加事件加入處理隊列
_updates.offer(update);
//檢查是否正在輪詢,若是正在輪詢,則會執行喚醒操做
//所以在此處須要將selecting置爲false
if (_selecting)
{
selector = _selector;
// To avoid the extra select wakeup.
_selecting = false;
}
}
if (selector != null)
{
//執行喚醒操做,以便對selector執行更新操做
if (LOG.isDebugEnabled())
LOG.debug("Wakeup on submit {}", this);
selector.wakeup();
}
}
複製代碼
SelectorProducer
是ManagedSelector
的內部類,該類實現了輪詢線程執行策略的ExecutionStrategy.Producer
接口
interface Producer {
// 返回一個Runnable任務供輪詢線程執行
Runnable produce();
}
複製代碼
所以SelectorProducer
須要不斷調用selector
去輪詢看有無新的I/O事件以供處理,除此以外它還須要處理外部類向ManagedSelector
經過調用submit
方法提交的SelectorUpdate
任務
其向線程執行策略類所提供produce
方法代以下所示,總的來講主要完成如下幾項工做
processUpdates
)@Override
public Runnable produce() {
while (true)
{
//處理以前查詢到事件
Runnable task = processSelected();
if (task != null)
return task;
//處理外部類所提交的update任務
//該方法最終會致使提交的SelectorUpdate.update被調用
processUpdates();
//此方法的調用可能會
//致使客戶端SocketChannel感興趣的事件發生變動
updateKeys();
//執行select操做,並將查詢到事件保存起來
if (!select())
return null;
}
}
複製代碼
processUpdates 此方法主要是處理外部類提交的SelectorUpdate
任務,經過複製引用很是巧妙的避免了併發問題
private void processUpdates() {
synchronized (ManagedSelector.this)
{
//倒騰數據,將要處理隊列的引用保存
//到另外一個變量上,原有的引用能夠繼續對外提供服務
//整個數據倒騰過程很是短,性能影響較小
Deque<SelectorUpdate> updates = _updates;
_updates = _updateable;
_updateable = updates;
}
if (LOG.isDebugEnabled())
LOG.debug("updateable {}", _updateable.size());
//遍歷事件隊列,處理update方法
for (SelectorUpdate update : _updateable)
{
if (_selector == null)
break;
try
{
if (LOG.isDebugEnabled())
LOG.debug("update {}", update);
//調用事件的update方法,並傳入selector
update.update(_selector);
}
catch (Throwable ex)
{
LOG.warn(ex);
}
}
_updateable.clear();
Selector selector;
int updates;
//再次檢查是否有新的事件被提交,若是有則執行喚醒操做
synchronized (ManagedSelector.this)
{
//外部類提交的任務會保存到updates中
updates = _updates.size();
_selecting = updates == 0;
selector = _selecting ? null : _selector;
}
if (LOG.isDebugEnabled())
LOG.debug("updates {}", updates);
if (selector != null)
{
if (LOG.isDebugEnabled())
LOG.debug("wakeup on updates {}", this);
selector.wakeup();
}
}
複製代碼
select() 該方法主要執行輪詢操做,並將輪詢到事件保存起來以供下一次循環的時候返回,在這個方法中展示jetty如何處理空輪詢
事件(空輪詢
是指selector在執行select操做時,沒有查詢到任何事件卻返回了,這個BUG一般會形成CPU100%
的使用率,從而使系統崩潰)
private boolean select() {
try
{
Selector selector = _selector;
if (selector != null && selector.isOpen())
{
if (LOG.isDebugEnabled())
LOG.debug("Selector {} waiting with {} keys", selector, selector.keys().size());
int selected = selector.select();
//沒查詢到事件, 空輪詢事件處理
if (selected == 0)
{
if (LOG.isDebugEnabled())
LOG.debug("Selector {} woken with none selected", selector);
//若是線程被中斷,而且標誌位被設置了不在運行則執行推出邏輯
if (Thread.interrupted() && !isRunning())
throw new ClosedSelectorException();
//開啓了此參數則當即執行一次select操做
if (FORCE_SELECT_NOW)
selected = selector.selectNow();
}
if (LOG.isDebugEnabled())
LOG.debug("Selector {} woken up from select, {}/{}/{} selected", selector, selected, selector.selectedKeys().size(), selector.keys().size());
int updates;
synchronized (ManagedSelector.this)
{
// 完成了select操做則設置標誌位
_selecting = false;
updates = _updates.size();
}
_keys = selector.selectedKeys();
_cursor = _keys.isEmpty() ? Collections.emptyIterator() : _keys.iterator();
if (LOG.isDebugEnabled())
LOG.debug("Selector {} processing {} keys, {} updates", selector, _keys.size(), updates);
return true;
}
}
catch (Throwable x)
{
_selector = null;
if (isRunning())
LOG.warn(x);
else
{
LOG.warn(x.toString());
LOG.debug(x);
}
closeNoExceptions(_selector);
}
return false;
}
複製代碼
與Netty的空輪詢處理策略不一樣,Jetty的處理策略是再select一次並當即返回,但這樣彷佛並不能解決空輪詢的BUG問題詳情
EatWhatYouKill
是線程執行策略的一種,也是Jetty默認的指策略,其思想來源於若是獵人殺死一隻獵物,那麼獵人就應該吃掉它
(若是你吃過新鮮的蝦你就會對這種哲學
深有體會),換種說法就是輪詢線程若是查詢到一次I/O事件就應該直接處理它
(想起引子了嗎)
P.S. 關鍵代碼
org.eclipse.jetty.util.thread.strategy.EatWhatYouKill
之因此這樣作的緣由是由於切換線程是一件比較費時操做(相對來講),所以在這種策略下輪詢線程A若是獲取到一個事件會有如下策略
標誌爲非阻塞任務
,那麼線程A會當即執行
此任務若是任務阻塞類型未知或者被標記爲阻塞狀態
若是線程池中的線程都處於繁忙
狀態,則將其提交到線程池種等待執行
若是線程池種有空閒線程B,則嘗試將線程A負責輪詢功
能交給線程B,若是當即獲取到線程B
成功,則線程A會直接執行獲取到的任務, 任務執行完成後,線程A會嘗試奪回
交給線程B的輪詢任務,若是奪回失敗則變爲空閒線程等待分配任務。(想起引子了嗎?)
除此以外,線程A還會嘗試直接執行任務而且不會交出輪詢工做 (代碼太長,只摘出關鍵代碼)
case BLOCKING:
synchronized (this)
{
if (_pending)
{
//輪詢工做陷入了停滯,所以是IDLE狀態
_state = State.IDLE;
mode = Mode.EXECUTE_PRODUCE_CONSUME;
}
//tryExecute 若是當即分配到了線程則返回true
//this的run方法也就是實現輪詢線程核心的方法
//所以此行代碼至關於將輪詢的工做轉移給了其餘線程
else if (_tryExecutor.tryExecute(this))
{
_pending = true;
//因爲輪詢工做的轉移
//所以當前輪詢工做至關於陷入空閒狀態
//因此須要將此對象的狀態至爲IDLE
//(輪詢線程和當前線程使用同一個對象)
_state = State.IDLE;
mode = Mode.EXECUTE_PRODUCE_CONSUME;
}else
{
//前二者均不知足則將任務提交到線程池
mode = Mode.PRODUCE_EXECUTE_CONSUME;
}
}
break;
複製代碼
任務的執行策略
case EXECUTE_PRODUCE_CONSUME:
_epcMode.increment();
//直接在當前線程調用
runTask(task);
// 嘗試奪回輪詢任務
synchronized (this)
{
// 若是State還處於空閒狀態
// 說明
// 線程B還未開始執行輪詢任務,能夠直接奪回
// 若是線程B已經開始輪詢
// 則選擇離開
if (_state == State.IDLE)
{
// 返回true則繼續輪詢
return true;
}
}
//返回false則結束輪詢任務,變爲空閒線程
return false;
複製代碼
相較於循規蹈矩的Tomcat,Jetty的設計更爲激進,更富有冒險主義者的精神,從我的角度來講更喜歡Jetty的設計,但從業務的角度來講仍是選擇Tomcat較爲穩妥畢竟穩定是業務的基本需求,而且Tomcat的性能也不會太差。
以線程的類別來進行劃分的話, Jetty的NIO模型以下圖所示
Acceptor
線程負責接收來自客戶端的新鏈接,並將其封裝成一個事件提交給輪詢線程處理輪詢線程
輪詢線程處理負責輪詢I/O事件以外,還須要處理外部線程所提交的selector
更新任務,而且根據設定的執行策略,輪詢線程可能會在本線程直接執行I/O任務,並將輪詢任務移交給其餘空閒的線程,或者選擇一個空閒的線程來執行I/O操做I/O線程
主要負責處理I/O操做從線程類別的角度來看Jetty的NIO模型相對簡單,但其引入的輪詢線程執行策略使得線程之間身份能夠發生轉變, 得益於此Jetty能夠直接輪詢線程直接執行I/O任務減小了線程上下文切換所帶來的性能消耗,提高了性能。
切換線程是有成本的 Jetty經過直接在輪詢線程執行I/O任務來提高性能,來減小線程上下文的切換,除此以外,咱們還能夠實現協程的機制來減小線程上下文切換所帶來的成本(參考Go語言)
Acceptor線程應適量 若是將ServerSocket設置爲阻塞模式,那麼accept操做將致使線程陷入阻塞,從accept方法返回時將引發線程上下的切換,所以並非越多越好
咱們使用SpringBoot來Debug Jetty,所以咱們須要在pom.xml
中引入Jetty,因爲SpringBoot默認使用Tomcat所以咱們須要將其替換掉,依賴以下所示.
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-tomcat</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jetty</artifactId>
</dependency>
複製代碼
使用的SpringBoot版本是2.2.0其所依賴的Jetty版本號是9.4.20
若是你要了解Connector是如何工做的請關注如下類 org.eclipse.jetty.server.ServerConnector
若是你想要了解Jetty NIO 如何輪詢以及處理事件,那麼請關注如下類 org.eclipse.jetty.io.ManagedSelector
並在其內部類 SelectorProducer
的produce
方法打上斷點,以下圖所示,你將瞭解到整個輪詢過程當中都發生了什麼
右鍵小紅點,選擇Thread,以免進入不了斷點的狀況,畢竟咱們調試的是多線程程序
org.eclipse.jetty.util.thread.strategy.EatWhatYouKill