也是由於以前本身的不謹慎,在寫Java編程方法論-Reactor與Webflux
的時候,因以爲tomcat關於connector部分已經有不錯的博文了,草草參考了下,並無對源碼進行深刻分析,致使本身在錄製分享視頻的時候,發現本身文章內容展示的和源碼並不一致,又經過搜索引擎搜索了一些中文博客的文章,並不盡如人意,索性,本身的就經過最新的源碼來從新梳理一下關於tomcat connector部份內容,也是給本身一個警醒,凡事務必仔細仔細再仔細! 參考源碼地址: github.com/apache/tomc…java
關於Java編程方法論-Reactor與Webflux
的視頻分享,已經完成了Rxjava 與 Reactor,b站地址以下:git
Rxjava源碼解讀與分享:www.bilibili.com/video/av345…github
Reactor源碼解讀與分享:www.bilibili.com/video/av353…web
在Linux系統下,啓動和關閉Tomcat使用命令操做。shell
進入Tomcat下的bin目錄:apache
cd /java/tomcat/bin
複製代碼
啓動Tomcat命令:編程
./startup.sh
複製代碼
中止Tomcat服務命令:api
./shutdown.sh
複製代碼
執行tomcat 的./shutdown.sh
後,雖然tomcat服務不能正常訪問了,可是ps -ef | grep tomcat
後,發現tomcat
對應的java
進程未隨web容器關閉而銷燬,進而存在殭屍java
進程。網上看了下致使殭屍進程的緣由多是有非守護線程(即User Thread)存在,jvm不會退出(當JVM中全部的線程都是守護線程的時候,JVM就能夠退出了;若是還有一個或以上的非守護線程則JVM不會退出)。經過一下命令查看Tomcat進程是否結束:數組
ps -ef|grep tomcat
複製代碼
若是存在用戶線程,給kill掉就行了即便用kill -9 pid
緩存
咱們接着從startup.sh
這個shell腳本中能夠發現,其最終調用了catalina.sh start
,因而,咱們找到catalina.sh
裏,在elif [ "$1" = "start" ] ;
處,咱們往下走,能夠發現,其調用了org.apache.catalina.startup.Bootstrap.java
這個類下的start()
方法:
/** * org.apache.catalina.startup.Bootstrap * Start the Catalina daemon. * @throws Exception Fatal start error */
public void start() throws Exception {
if( catalinaDaemon==null ) init();
Method method = catalinaDaemon.getClass().getMethod("start", (Class [] )null);
method.invoke(catalinaDaemon, (Object [])null);
}
複製代碼
這裏,在服務器第一次啓動的時候,會調用其init()
,其主要用於建立org.apache.catalina.startup.Catalina.java
的類實例:
/** * org.apache.catalina.startup.Bootstrap * Initialize daemon. * @throws Exception Fatal initialization error */
public void init() throws Exception {
initClassLoaders();
Thread.currentThread().setContextClassLoader(catalinaLoader);
SecurityClassLoad.securityClassLoad(catalinaLoader);
// Load our startup class and call its process() method
if (log.isDebugEnabled())
log.debug("Loading startup class");
Class<?> startupClass = catalinaLoader.loadClass("org.apache.catalina.startup.Catalina");
Object startupInstance = startupClass.getConstructor().newInstance();
// Set the shared extensions class loader
if (log.isDebugEnabled())
log.debug("Setting startup class properties");
String methodName = "setParentClassLoader";
Class<?> paramTypes[] = new Class[1];
paramTypes[0] = Class.forName("java.lang.ClassLoader");
Object paramValues[] = new Object[1];
paramValues[0] = sharedLoader;
Method method =
startupInstance.getClass().getMethod(methodName, paramTypes);
method.invoke(startupInstance, paramValues);
catalinaDaemon = startupInstance;
}
複製代碼
接着,在Bootstrap的start()方法中會調用Catalina實例的start方法:
/** * org.apache.catalina.startup.Catalina * Start a new server instance. */
public void start() {
if (getServer() == null) {
load();
}
if (getServer() == null) {
log.fatal(sm.getString("catalina.noServer"));
return;
}
long t1 = System.nanoTime();
// Start the new server
try {
getServer().start();
} catch (LifecycleException e) {
log.fatal(sm.getString("catalina.serverStartFail"), e);
try {
getServer().destroy();
} catch (LifecycleException e1) {
log.debug("destroy() failed for failed Server ", e1);
}
return;
}
long t2 = System.nanoTime();
if(log.isInfoEnabled()) {
log.info(sm.getString("catalina.startup", Long.valueOf((t2 - t1) / 1000000)));
}
// Register shutdown hook
if (useShutdownHook) {
if (shutdownHook == null) {
shutdownHook = new CatalinaShutdownHook();
}
Runtime.getRuntime().addShutdownHook(shutdownHook);
// If JULI is being used, disable JULI's shutdown hook since
// shutdown hooks run in parallel and log messages may be lost
// if JULI's hook completes before the CatalinaShutdownHook()
LogManager logManager = LogManager.getLogManager();
if (logManager instanceof ClassLoaderLogManager) {
((ClassLoaderLogManager) logManager).setUseShutdownHook(
false);
}
}
if (await) {
await();
stop();
}
}
複製代碼
在這裏面,咱們主要關心load()
,getServer().start()
,對於後者,在它的先後咱們看到有啓動時間的計算,這也是平時咱們在啓動tomcat過程當中所看到的日誌打印輸出所在,後面的我這裏就不提了。
首先咱們來看load(),這裏,其會經過createStartDigester()
建立並配置咱們將用來啓動的Digester,而後獲取咱們所配置的ServerXml文件,依次對裏面屬性進行配置,最後調用getServer().init()
:
/** * org.apache.catalina.startup.Catalina * Start a new server instance. */
public void load() {
if (loaded) {
return;
}
loaded = true;
long t1 = System.nanoTime();
initDirs();
// Before digester - it may be needed
initNaming();
// Set configuration source
ConfigFileLoader.setSource(new CatalinaBaseConfigurationSource(Bootstrap.getCatalinaBaseFile(), getConfigFile()));
File file = configFile();
// Create and execute our Digester
Digester digester = createStartDigester();
try (ConfigurationSource.Resource resource = ConfigFileLoader.getSource().getServerXml()) {
InputStream inputStream = resource.getInputStream();
InputSource inputSource = new InputSource(resource.getURI().toURL().toString());
inputSource.setByteStream(inputStream);
digester.push(this);
digester.parse(inputSource);
} catch (Exception e) {
if (file == null) {
log.warn(sm.getString("catalina.configFail", getConfigFile() + "] or [server-embed.xml"), e);
} else {
log.warn(sm.getString("catalina.configFail", file.getAbsolutePath()), e);
if (file.exists() && !file.canRead()) {
log.warn(sm.getString("catalina.incorrectPermissions"));
}
}
return;
}
getServer().setCatalina(this);
getServer().setCatalinaHome(Bootstrap.getCatalinaHomeFile());
getServer().setCatalinaBase(Bootstrap.getCatalinaBaseFile());
// Stream redirection
initStreams();
// Start the new server
try {
getServer().init();
} catch (LifecycleException e) {
if (Boolean.getBoolean("org.apache.catalina.startup.EXIT_ON_INIT_FAILURE")) {
throw new java.lang.Error(e);
} else {
log.error(sm.getString("catalina.initError"), e);
}
}
long t2 = System.nanoTime();
if(log.isInfoEnabled()) {
log.info(sm.getString("catalina.init", Long.valueOf((t2 - t1) / 1000000)));
}
}
複製代碼
這裏,這個server從哪裏來,咱們從digester.addObjectCreate("Server", "org.apache.catalina.core.StandardServer", "className");
中能夠知道,其使用了這個類的實例,咱們再回到digester.push(this); digester.parse(inputSource);
這兩句代碼上來,可知,未開始解析時先調用Digester.push(this),此時棧頂元素是Catalina,這個用來爲catalina設置server,這裏,要對digester
的解析來涉及下:
如解析到<Server>
時就會建立StandardServer
類的實例並反射調用Digester
的stack
棧頂對象的setter
方法(調用的方法經過傳入的name
值肯定)。 digester
中涉及的IntrospectionUtils.setProperty(top, name, value)
方法,即top
爲棧頂對象,name
爲這個棧頂對象要設置的屬性名,value
爲要設置的屬性值。 剛開始時棧頂元素是Catalina
,即調用Catalina.setServer(Server object)
方法設置Server
爲後面調用Server.start()
作準備,而後將StandardServer
對象實例放入Digester
的stack
對象棧中。
接下來,咱們來看getServer().init()
,由上知,咱們去找org.apache.catalina.core.StandardServer.java
這個類,其繼承LifecycleMBeanBase
並實現了Server
,經過LifecycleMBeanBase
此類,說明這個StandardServer
管理的生命週期,即經過LifecycleMBeanBase
父類LifecycleBase
實現的init()
方法:
//org.apache.catalina.util.LifecycleBase.java
@Override
public final synchronized void init() throws LifecycleException {
if (!state.equals(LifecycleState.NEW)) {
invalidTransition(Lifecycle.BEFORE_INIT_EVENT);
}
try {
setStateInternal(LifecycleState.INITIALIZING, null, false);
initInternal();
setStateInternal(LifecycleState.INITIALIZED, null, false);
} catch (Throwable t) {
handleSubClassException(t, "lifecycleBase.initFail", toString());
}
}
複製代碼
因而,咱們關注 initInternal()
在StandardServer
中的實現,代碼過多,這裏就把過程講下: 一、調用父類org.apache.catalina.util.LifecycleMBeanBase#initInternal方法,註冊MBean
二、註冊本類的其它屬性的MBean
三、NamingResources初始化 : globalNamingResources.init();
四、從common ClassLoader開始往上查看,直到SystemClassLoader,遍歷各個classLoader對應的查看路徑,找到jar結尾的文件,讀取Manifest信息,加入到ExtensionValidator#containerManifestResources屬性中。
五、初始化service,默認實現是StandardService。
i) 調用super.initInternal()方法
ii) container初始化,這裏container實例是StandardEngine。
iii) Executor初始化
iv)Connector初始化:
a)org.apache.catalina.connector.Connector Connector[HTTP/1.1-8080]
b) org.apache.catalina.connector.Connector Connector[AJP/1.3-8009]
這裏,咱們能夠看到StandardServer
的父類org.apache.catalina.util.LifecycleBase.java
的實現:
@Override
public final synchronized void start() throws LifecycleException {
if (LifecycleState.STARTING_PREP.equals(state) || LifecycleState.STARTING.equals(state) ||
LifecycleState.STARTED.equals(state)) {
if (log.isDebugEnabled()) {
Exception e = new LifecycleException();
log.debug(sm.getString("lifecycleBase.alreadyStarted", toString()), e);
} else if (log.isInfoEnabled()) {
log.info(sm.getString("lifecycleBase.alreadyStarted", toString()));
}
return;
}
if (state.equals(LifecycleState.NEW)) {
init();
} else if (state.equals(LifecycleState.FAILED)) {
stop();
} else if (!state.equals(LifecycleState.INITIALIZED) &&
!state.equals(LifecycleState.STOPPED)) {
invalidTransition(Lifecycle.BEFORE_START_EVENT);
}
try {
setStateInternal(LifecycleState.STARTING_PREP, null, false);
startInternal();
if (state.equals(LifecycleState.FAILED)) {
// This is a 'controlled' failure. The component put itself into the
// FAILED state so call stop() to complete the clean-up.
stop();
} else if (!state.equals(LifecycleState.STARTING)) {
// Shouldn't be necessary but acts as a check that sub-classes are
// doing what they are supposed to.
invalidTransition(Lifecycle.AFTER_START_EVENT);
} else {
setStateInternal(LifecycleState.STARTED, null, false);
}
} catch (Throwable t) {
// This is an 'uncontrolled' failure so put the component into the
// FAILED state and throw an exception.
handleSubClassException(t, "lifecycleBase.startFail", toString());
}
}
複製代碼
對於StandardServer
,咱們關注的是其對於startInternal();
的實現,源碼不貼了,具體過程以下: 一、觸發CONFIGURE_START_EVENT事件。
二、設置本對象狀態爲STARTING
三、NameingResource啓動:globalNamingResources.start(); 四、StandardService啓動。
i) 設置狀態爲STARTING
ii) container啓動,即StandardEngine啓動
iii) Executor 啓動
iv) Connector啓動:
a)org.apache.catalina.connector.Connector Connector[HTTP/1.1-8080]
b) org.apache.catalina.connector.Connector Connector[AJP/1.3-8009]
終於,咱們探究到了我要講的主角Connector
。
咱們由apache-tomcat-9.0.14\conf
目錄(此處請自行下載相應版本的tomcat)下的server.xml中的Connector
配置可知,其默認8080端口的配置協議爲HTTP/1.1
。
<Connector port="8080" protocol="HTTP/1.1" connectionTimeout="20000" redirectPort="8443" />
<!-- Define an AJP 1.3 Connector on port 8009 -->
<Connector port="8009" protocol="AJP/1.3" redirectPort="8443" />
複製代碼
知道了這些,咱們去看它的代碼中的實現:
public Connector() {
this("org.apache.coyote.http11.Http11NioProtocol");
}
public Connector(String protocol) {
boolean aprConnector = AprLifecycleListener.isAprAvailable() &&
AprLifecycleListener.getUseAprConnector();
if ("HTTP/1.1".equals(protocol) || protocol == null) {
if (aprConnector) {
protocolHandlerClassName = "org.apache.coyote.http11.Http11AprProtocol";
} else {
protocolHandlerClassName = "org.apache.coyote.http11.Http11NioProtocol";
}
} else if ("AJP/1.3".equals(protocol)) {
if (aprConnector) {
protocolHandlerClassName = "org.apache.coyote.ajp.AjpAprProtocol";
} else {
protocolHandlerClassName = "org.apache.coyote.ajp.AjpNioProtocol";
}
} else {
protocolHandlerClassName = protocol;
}
// Instantiate protocol handler
ProtocolHandler p = null;
try {
Class<?> clazz = Class.forName(protocolHandlerClassName);
p = (ProtocolHandler) clazz.getConstructor().newInstance();
} catch (Exception e) {
log.error(sm.getString(
"coyoteConnector.protocolHandlerInstantiationFailed"), e);
} finally {
this.protocolHandler = p;
}
// Default for Connector depends on this system property
setThrowOnFailure(Boolean.getBoolean("org.apache.catalina.startup.EXIT_ON_INIT_FAILURE"));
}
複製代碼
對於tomcat8.5以上,其默認就是Http11NioProtocol
協議,這裏,咱們給其設定了HTTP/1.1
,但根據上面的if語句的判斷,是相等的,也就是最後仍是選擇的Http11NioProtocol
。
一樣,由上一節可知,咱們會涉及到Connector初始化,也就是其也會繼承LifecycleMBeanBase
,那麼,咱們來看其相關initInternal()
實現:
@Override
protected void initInternal() throws LifecycleException {
super.initInternal();
if (protocolHandler == null) {
throw new LifecycleException(
sm.getString("coyoteConnector.protocolHandlerInstantiationFailed"));
}
// Initialize adapter
adapter = new CoyoteAdapter(this);
protocolHandler.setAdapter(adapter);
if (service != null) {
protocolHandler.setUtilityExecutor(service.getServer().getUtilityExecutor());
}
// Make sure parseBodyMethodsSet has a default
if (null == parseBodyMethodsSet) {
setParseBodyMethods(getParseBodyMethods());
}
if (protocolHandler.isAprRequired() && !AprLifecycleListener.isAprAvailable()) {
throw new LifecycleException(sm.getString("coyoteConnector.protocolHandlerNoApr",
getProtocolHandlerClassName()));
}
if (AprLifecycleListener.isAprAvailable() && AprLifecycleListener.getUseOpenSSL() &&
protocolHandler instanceof AbstractHttp11JsseProtocol) {
AbstractHttp11JsseProtocol<?> jsseProtocolHandler =
(AbstractHttp11JsseProtocol<?>) protocolHandler;
if (jsseProtocolHandler.isSSLEnabled() &&
jsseProtocolHandler.getSslImplementationName() == null) {
// OpenSSL is compatible with the JSSE configuration, so use it if APR is available
jsseProtocolHandler.setSslImplementationName(OpenSSLImplementation.class.getName());
}
}
try {
protocolHandler.init();
} catch (Exception e) {
throw new LifecycleException(
sm.getString("coyoteConnector.protocolHandlerInitializationFailed"), e);
}
}
複製代碼
這裏涉及的過程以下: 一、註冊MBean
二、CoyoteAdapter實例化,CoyoteAdapter是請求的入口。當有請求時,CoyoteAdapter對狀態進行了處理,結尾處對請求進行回收,中間過程交由pipeline來處理。
三、protocolHandler 初始化(org.apache.coyote.http11.Http11Protocol)
在這一步中,完成了endpoint的初始化
關於啓動就不說了,其設定本對象狀態爲STARTING,同時調用protocolHandler.start();
,接下來,就要進入咱們的核心節奏了。
@Override
protected void startInternal() throws LifecycleException {
// Validate settings before starting
if (getPortWithOffset() < 0) {
throw new LifecycleException(sm.getString(
"coyoteConnector.invalidPort", Integer.valueOf(getPortWithOffset())));
}
setState(LifecycleState.STARTING);
try {
protocolHandler.start();
} catch (Exception e) {
throw new LifecycleException(
sm.getString("coyoteConnector.protocolHandlerStartFailed"), e);
}
}
複製代碼
這裏,咱們直接從其抽象實現org.apache.coyote.AbstractProtocol.java
來看,其也是遵循生命週期的,因此其也要繼承LifecycleMBeanBase
並實現本身的init()
與start()
等生命週期方法,其內部都是由相應的自實現的endpoint
來執行具體邏輯:
//org.apache.coyote.AbstractProtocol.java
@Override
public void init() throws Exception {
if (getLog().isInfoEnabled()) {
getLog().info(sm.getString("abstractProtocolHandler.init", getName()));
logPortOffset();
}
if (oname == null) {
// Component not pre-registered so register it
oname = createObjectName();
if (oname != null) {
Registry.getRegistry(null, null).registerComponent(this, oname, null);
}
}
if (this.domain != null) {
rgOname = new ObjectName(domain + ":type=GlobalRequestProcessor,name=" + getName());
Registry.getRegistry(null, null).registerComponent(
getHandler().getGlobal(), rgOname, null);
}
String endpointName = getName();
endpoint.setName(endpointName.substring(1, endpointName.length()-1));
endpoint.setDomain(domain);
endpoint.init();
}
@Override
public void start() throws Exception {
if (getLog().isInfoEnabled()) {
getLog().info(sm.getString("abstractProtocolHandler.start", getName()));
logPortOffset();
}
endpoint.start();
monitorFuture = getUtilityExecutor().scheduleWithFixedDelay(
new Runnable() {
@Override
public void run() {
if (!isPaused()) {
startAsyncTimeout();
}
}
}, 0, 60, TimeUnit.SECONDS);
}
複製代碼
拿org.apache.coyote.http11.Http11NioProtocol
這個類來說,其接收的是NioEndpoint
來進行構造器的實現,其內部的方法的具體實現也經由此NioEndpoint
來實現其邏輯:
public class Http11NioProtocol extends AbstractHttp11JsseProtocol<NioChannel> {
private static final Log log = LogFactory.getLog(Http11NioProtocol.class);
public Http11NioProtocol() {
super(new NioEndpoint());
}
@Override
protected Log getLog() { return log; }
// -------------------- Pool setup --------------------
public void setPollerThreadCount(int count) {
((NioEndpoint)getEndpoint()).setPollerThreadCount(count);
}
public int getPollerThreadCount() {
return ((NioEndpoint)getEndpoint()).getPollerThreadCount();
}
public void setSelectorTimeout(long timeout) {
((NioEndpoint)getEndpoint()).setSelectorTimeout(timeout);
}
public long getSelectorTimeout() {
return ((NioEndpoint)getEndpoint()).getSelectorTimeout();
}
public void setPollerThreadPriority(int threadPriority) {
((NioEndpoint)getEndpoint()).setPollerThreadPriority(threadPriority);
}
public int getPollerThreadPriority() {
return ((NioEndpoint)getEndpoint()).getPollerThreadPriority();
}
// ----------------------------------------------------- JMX related methods
@Override
protected String getNamePrefix() {
if (isSSLEnabled()) {
return "https-" + getSslImplementationShortName()+ "-nio";
} else {
return "http-nio";
}
}
}
複製代碼
這裏,EndPoint
用於處理具體鏈接和傳輸數據,即用來實現網絡鏈接和控制,它是服務器對外I/O
操做的接入點。主要任務是管理對外的socket
鏈接,同時將創建好的socket
鏈接交到合適的工做線程中去。 裏面兩個主要的屬性類是Acceptor
和Poller
、SocketProcessor
。 咱們以NioEndpoint
爲例,其內部請求處理具體的流程以下:
結合上一節最後,咱們主要仍是關注其對於Protocol
有關生命週期方法的具體實現:
//org.apache.tomcat.util.net.AbstractEndpoint.java
public final void init() throws Exception {
if (bindOnInit) {
bindWithCleanup();
bindState = BindState.BOUND_ON_INIT;
}
if (this.domain != null) {
// Register endpoint (as ThreadPool - historical name)
oname = new ObjectName(domain + ":type=ThreadPool,name=\"" + getName() + "\"");
Registry.getRegistry(null, null).registerComponent(this, oname, null);
ObjectName socketPropertiesOname = new ObjectName(domain +
":type=ThreadPool,name=\"" + getName() + "\",subType=SocketProperties");
socketProperties.setObjectName(socketPropertiesOname);
Registry.getRegistry(null, null).registerComponent(socketProperties, socketPropertiesOname, null);
for (SSLHostConfig sslHostConfig : findSslHostConfigs()) {
registerJmx(sslHostConfig);
}
}
}
public final void start() throws Exception {
if (bindState == BindState.UNBOUND) {
bindWithCleanup();
bindState = BindState.BOUND_ON_START;
}
startInternal();
}
//org.apache.tomcat.util.net.AbstractEndpoint.java
private void bindWithCleanup() throws Exception {
try {
bind();
} catch (Throwable t) {
// Ensure open sockets etc. are cleaned up if something goes
// wrong during bind
ExceptionUtils.handleThrowable(t);
unbind();
throw t;
}
}
複製代碼
這兩個方法主要調用bind
(此處能夠查閱bindWithCleanup()
的具體實現) 和startlntemal
方法,它們是模板方法,能夠自行根據需求實現,這裏,咱們參考NioEndpoint
中的實現, bind
方法代碼以下:
//org.apache.tomcat.util.net.NioEndpoint.java
@Override
public void bind() throws Exception {
initServerSocket();
// Initialize thread count defaults for acceptor, poller
if (acceptorThreadCount == 0) {
// FIXME: Doesn't seem to work that well with multiple accept threads
acceptorThreadCount = 1;
}
if (pollerThreadCount <= 0) {
//minimum one poller thread
pollerThreadCount = 1;
}
setStopLatch(new CountDownLatch(pollerThreadCount));
// Initialize SSL if needed
initialiseSsl();
selectorPool.open();
}
複製代碼
這裏的bind 方法中首先初始化了ServerSocket
(這個東西咱們在jdk網絡編程裏都接觸過,就很少說了,這裏是封裝了一個工具類,看下面實現),而後檢查了表明Acceptor
和Poller
初始化的線程數量的acceptorThreadCount
屬性和pollerThreadCount
屬性,它們的值至少爲1。
// Separated out to make it easier for folks that extend NioEndpoint to
// implement custom [server]sockets
protected void initServerSocket() throws Exception {
if (!getUseInheritedChannel()) {
serverSock = ServerSocketChannel.open();
socketProperties.setProperties(serverSock.socket());
InetSocketAddress addr = new InetSocketAddress(getAddress(), getPortWithOffset());
serverSock.socket().bind(addr,getAcceptCount());
} else {
// Retrieve the channel provided by the OS
Channel ic = System.inheritedChannel();
if (ic instanceof ServerSocketChannel) {
serverSock = (ServerSocketChannel) ic;
}
if (serverSock == null) {
throw new IllegalArgumentException(sm.getString("endpoint.init.bind.inherited"));
}
}
serverSock.configureBlocking(true); //mimic APR behavior
}
複製代碼
這裏,Acceptor
用於接收請求,將接收到請求交給Poller
處理,它們都是啓動線程來處理的。另外還進行了初始化SSL
等內容。NioEndpoint
的startInternal
方法代碼以下:
/** * The socket pollers. */
private Poller[] pollers = null;
/** * Start the NIO endpoint, creating acceptor, poller threads. */
@Override
public void startInternal() throws Exception {
if (!running) {
running = true;
paused = false;
processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
socketProperties.getProcessorCache());
eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
socketProperties.getEventCache());
nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
socketProperties.getBufferPool());
// Create worker collection
if ( getExecutor() == null ) {
createExecutor();
}
initializeConnectionLatch();
// Start poller threads
pollers = new Poller[getPollerThreadCount()];
for (int i=0; i<pollers.length; i++) {
pollers[i] = new Poller();
Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);
pollerThread.setPriority(threadPriority);
pollerThread.setDaemon(true);
pollerThread.start();
}
startAcceptorThreads();
}
}
複製代碼
這裏首先初始化了一些屬性,初始化的屬性中的processorCache
是SynchronizedStack<SocketProcessor>
類型, SocketProcessor
是NioEndpoint
的一個內部類, Poller
接收到請求後就會交給它處理, SocketProcessor
又會將請求傳遞到Handler
。 而後啓動了Poller
和Acceptor
來處理請求,這裏咱們要注意的的是,pollers
是一個數組,其管理了一堆Runnable
,由前面可知,假如咱們並無對其進行設定,那就是1,也就是說,其默認狀況下只是一個單線程。這個線程建立出來後就將其設定爲守護線程,直到tomcat容器結束,其天然也會跟着結束。 這裏,咱們想要對其進行配置的話,能夠在server.xml
中進行相應設定:
<Connector port="8080" protocol="org.apache.coyote.http11.Http11NioProtocol" connectionTimeout="20000" maxHeaderCount="64" maxParameterCount="64" maxHttpHeaderSize="8192" URIEncoding="UTF-8" useBodyEncodingForURI="false" maxThreads="128" minSpareThreads="12" acceptCount="1024" connectionLinger="-1" keepAliveTimeout="60" maxKeepAliveRequests="32" maxConnections="10000" acceptorThreadCount="1" pollerThreadCount="2" selectorTimeout="1000" useSendfile="true" selectorPool.maxSelectors="128" redirectPort="8443" />
複製代碼
啓動Acceptor
的startAcceptorThreads
方法在 AbstractEndpoint
中,代碼以下:
protected void startAcceptorThreads() {
int count = getAcceptorThreadCount();
acceptors = new ArrayList<>(count);
for (int i = 0; i < count; i++) {
Acceptor<U> acceptor = new Acceptor<>(this);
String threadName = getName() + "-Acceptor-" + i;
acceptor.setThreadName(threadName);
acceptors.add(acceptor);
Thread t = new Thread(acceptor, threadName);
t.setPriority(getAcceptorThreadPriority());
t.setDaemon(getDaemon());
t.start();
}
}
複製代碼
這裏的getAcceptorThreadCount
方法就是獲取的init 方法中處理過的acceptorThreadCount屬性,獲取到後就會啓動相應數量的Acceptor 線程來接收請求。默認一樣是1,其建立線程的方式和Poller一致,就很少說了。
這裏,咱們再來看下webapps/docs/config/http.xml的文檔說明:
<attribute name="acceptorThreadCount" required="false">
<p>The number of threads to be used to accept connections. Increase this
value on a multi CPU machine, although you would never really need more
than <code>2</code>. Also, with a lot of non keep alive connections, you
might want to increase this value as well. Default value is
<code>1</code>.</p>
</attribute>
<attribute name="pollerThreadCount" required="false">
<p>(int)The number of threads to be used to run for the polling events.
Default value is <code>1</code> per processor but not more than 2.<br/>
When accepting a socket, the operating system holds a global lock. So the benefit of
going above 2 threads diminishes rapidly. Having more than one thread is for
system that need to accept connections very rapidly. However usually just
increasing <code>acceptCount</code> will solve that problem.
Increasing this value may also be beneficial when a large amount of send file
operations are going on.
</p>
</attribute>
複製代碼
由此可知,acceptorThreadCount
用於設定接受鏈接的線程數。 在多CPU機器上增長這個值,雖然你可能真的不須要超過2個。哪怕有不少非keep alive鏈接,你也可能想要增長這個值。 其默認值爲1。 pollerThreadCount
用於爲輪詢事件運行的線程數。默認值爲每一個處理器1個但不要超過2個(上面的優化配置裏的設定爲2)。接受socket時,操做系統將保持全局鎖定。 所以,超過2個線程的好處迅速減小。 當系統擁有多個該類型線程,它能夠很是快速地接受鏈接。 因此增長acceptCount就能夠解決這個問題。當正在進行大量發送文件操做時,增長此值也多是有益的。
咱們先來看一張NioEndpoint處理的的時序圖:
咱們由前面可知,Acceptor和Poller都實現了Runnable接口,因此其主要工做流程就在其實現的run方法內,這裏咱們先來看Acceptor對於run方法的實現:
//org.apache.tomcat.util.net.NioEndpoint.java
@Override
protected SocketChannel serverSocketAccept() throws Exception {
return serverSock.accept();
}
//org.apache.tomcat.util.net.Acceptor.java
public class Acceptor<U> implements Runnable {
private static final Log log = LogFactory.getLog(Acceptor.class);
private static final StringManager sm = StringManager.getManager(Acceptor.class);
private static final int INITIAL_ERROR_DELAY = 50;
private static final int MAX_ERROR_DELAY = 1600;
private final AbstractEndpoint<?,U> endpoint;
private String threadName;
protected volatile AcceptorState state = AcceptorState.NEW;
public Acceptor(AbstractEndpoint<?,U> endpoint) {
this.endpoint = endpoint;
}
public final AcceptorState getState() {
return state;
}
final void setThreadName(final String threadName) {
this.threadName = threadName;
}
final String getThreadName() {
return threadName;
}
@Override
public void run() {
int errorDelay = 0;
// Loop until we receive a shutdown command
while (endpoint.isRunning()) {
// Loop if endpoint is paused
while (endpoint.isPaused() && endpoint.isRunning()) {
state = AcceptorState.PAUSED;
try {
Thread.sleep(50);
} catch (InterruptedException e) {
// Ignore
}
}
if (!endpoint.isRunning()) {
break;
}
state = AcceptorState.RUNNING;
try {
//if we have reached max connections, wait
endpoint.countUpOrAwaitConnection();
// Endpoint might have been paused while waiting for latch
// If that is the case, don't accept new connections
if (endpoint.isPaused()) {
continue;
}
U socket = null;
try {
// Accept the next incoming connection from the server
// socket
// 建立一個socketChannel,接收下一個從服務器進來的鏈接
socket = endpoint.serverSocketAccept();
} catch (Exception ioe) {
// We didn't get a socket
endpoint.countDownConnection();
if (endpoint.isRunning()) {
// Introduce delay if necessary
errorDelay = handleExceptionWithDelay(errorDelay);
// re-throw
throw ioe;
} else {
break;
}
}
// Successful accept, reset the error delay
errorDelay = 0;
// Configure the socket
// 若是EndPoint處於running狀態而且沒有沒暫停
if (endpoint.isRunning() && !endpoint.isPaused()) {
// setSocketOptions() will hand the socket off to
// an appropriate processor if successful
if (!endpoint.setSocketOptions(socket)) {
endpoint.closeSocket(socket);
}
} else {
endpoint.destroySocket(socket);
}
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
String msg = sm.getString("endpoint.accept.fail");
// APR specific.
// Could push this down but not sure it is worth the trouble.
if (t instanceof Error) {
Error e = (Error) t;
if (e.getError() == 233) {
// Not an error on HP-UX so log as a warning
// so it can be filtered out on that platform
// See bug 50273
log.warn(msg, t);
} else {
log.error(msg, t);
}
} else {
log.error(msg, t);
}
}
}
state = AcceptorState.ENDED;
}
...
public enum AcceptorState {
NEW, RUNNING, PAUSED, ENDED
}
}
複製代碼
由上面run方法能夠看到,Acceptor
使用serverSock.accept()
阻塞的監聽端口,若是有鏈接進來,拿到了socket
,而且EndPoint
處於正常運行狀態,則調用NioEndPoint
的setSocketOptions
方法,對於setSocketOptions
,歸納來說就是根據socket
構建一個NioChannel
,而後把這個的NioChannel
註冊到Poller
的事件列表裏面,等待poller
輪詢:
/** * org.apache.tomcat.util.net.NioEndpoint.java * Process the specified connection. * 處理指定的鏈接 * @param socket The socket channel * @return <code>true</code> if the socket was correctly configured * and processing may continue, <code>false</code> if the socket needs to be * close immediately * 若是socket配置正確,而且可能會繼續處理,返回true * 若是socket須要當即關閉,則返回false */
@Override
protected boolean setSocketOptions(SocketChannel socket) {
// Process the connection
try {
//disable blocking, APR style, we are gonna be polling it
socket.configureBlocking(false);
Socket sock = socket.socket();
socketProperties.setProperties(sock);
//從緩存中拿一個nioChannel 若沒有,則建立一個。將socket傳進去
NioChannel channel = nioChannels.pop();
if (channel == null) {
SocketBufferHandler bufhandler = new SocketBufferHandler(
socketProperties.getAppReadBufSize(),
socketProperties.getAppWriteBufSize(),
socketProperties.getDirectBuffer());
if (isSSLEnabled()) {
channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);
} else {
channel = new NioChannel(socket, bufhandler);
}
} else {
channel.setIOChannel(socket);
channel.reset();
}
//從pollers數組中獲取一個Poller對象,註冊這個nioChannel
getPoller0().register(channel);
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
try {
log.error(sm.getString("endpoint.socketOptionsError"), t);
} catch (Throwable tt) {
ExceptionUtils.handleThrowable(tt);
}
// Tell to close the socket
return false;
}
return true;
}
/** * Return an available poller in true round robin fashion. * * @return The next poller in sequence */
public Poller getPoller0() {
int idx = Math.abs(pollerRotater.incrementAndGet()) % pollers.length;
return pollers[idx];
}
複製代碼
關於getPoller0()
,默認狀況下, 由前面可知,這個pollers數組裏只有一個元素,這點要注意。咱們來看NioEndPoint中的Poller實現的register方法,主要作的就是在Poller註冊新建立的套接字。
/** * Registers a newly created socket with the poller. * * @param socket The newly created socket */
public void register(final NioChannel socket) {
socket.setPoller(this);
NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this);
socket.setSocketWrapper(ka);
ka.setPoller(this);
ka.setReadTimeout(getConnectionTimeout());
ka.setWriteTimeout(getConnectionTimeout());
ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
ka.setSecure(isSSLEnabled());
//從緩存中取出一個PollerEvent對象,若沒有則建立一個。將socket和NioSocketWrapper設置進去
PollerEvent r = eventCache.pop();
ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
else r.reset(socket,ka,OP_REGISTER);
//添到到該Poller的事件列表
addEvent(r);
}
複製代碼
對以上過程進行一下總結:
從Acceptor接收到請求,它作了以下工做:
在這裏,會調用NioEndPoint的setSocketOptions方法,處理指定的鏈接:
其中最後一步註冊的過程,是調用Poller的register()方法:
由前面可知,poller也實現了Runnable接口,並在start的這部分生命週期執行的過程當中建立對應工做線程並加入其中,因此,咱們來經過其run方法來看下其工做機制。
其實上面已經提到了Poller將一個事件註冊到事件隊列的過程。接下來Poller線程要作的事情其實就是如何處理這些事件。
Poller在run方法中會輪詢事件隊列events,將每一個PollerEvent中的SocketChannel的interestOps註冊到Selector中,而後將PollerEvent從隊列裏移除。以後就是SocketChanel經過Selector調度來進行非阻塞的讀寫數據了。
/** * Poller class. */
public class Poller implements Runnable {
private Selector selector;
private final SynchronizedQueue<PollerEvent> events =
new SynchronizedQueue<>();
private volatile boolean close = false;
private long nextExpiration = 0;//optimize expiration handling
private AtomicLong wakeupCounter = new AtomicLong(0);
private volatile int keyCount = 0;
public Poller() throws IOException {
this.selector = Selector.open();
}
public int getKeyCount() { return keyCount; }
public Selector getSelector() { return selector;}
/** * The background thread that adds sockets to the Poller, checks the * poller for triggered events and hands the associated socket off to an * appropriate processor as events occur. */
@Override
public void run() {
// Loop until destroy() is called
// 循環直到 destroy() 被調用
while (true) {
boolean hasEvents = false;
try {
if (!close) {
//遍歷events,將每一個事件中的Channel的interestOps註冊到Selector中
hasEvents = events();
if (wakeupCounter.getAndSet(-1) > 0) {
//if we are here, means we have other stuff to do
//do a non blocking select
//若是走到了這裏,表明已經有就緒的IO Channel
//調用非阻塞的select方法,直接返回就緒Channel的數量
keyCount = selector.selectNow();
} else {
//阻塞等待操做系統返回 數據已經就緒的Channel,而後被喚醒
keyCount = selector.select(selectorTimeout);
}
wakeupCounter.set(0);
}
if (close) {
events();
timeout(0, false);
try {
selector.close();
} catch (IOException ioe) {
log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe);
}
break;
}
} catch (Throwable x) {
ExceptionUtils.handleThrowable(x);
log.error(sm.getString("endpoint.nio.selectorLoopError"), x);
continue;
}
//either we timed out or we woke up, process events first
//若是上面select方法超時,或者被喚醒,先將events隊列中的Channel註冊到Selector上。
if ( keyCount == 0 ) hasEvents = (hasEvents | events());
Iterator<SelectionKey> iterator =
keyCount > 0 ? selector.selectedKeys().iterator() : null;
// Walk through the collection of ready keys and dispatch
// any active event.
// 遍歷已就緒的Channel,並調用processKey來處理該Socket的IO。
while (iterator != null && iterator.hasNext()) {
SelectionKey sk = iterator.next();
NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment();
// Attachment may be null if another thread has called
// cancelledKey()
// 若是其它線程已調用,則Attachment可能爲空
if (attachment == null) {
iterator.remove();
} else {
iterator.remove();
//建立一個SocketProcessor,放入Tomcat線程池去執行
processKey(sk, attachment);
}
}//while
//process timeouts
timeout(keyCount,hasEvents);
}//while
getStopLatch().countDown();
}
...
}
複製代碼
上面讀取已就緒Channel的部分,是十分常見的Java NIO的用法,即 Selector調用selectedKeys(),獲取IO數據已經就緒的Channel,遍歷並調用processKey方法來處理每個Channel就緒的事件。而processKey方法會建立一個SocketProcessor,而後丟到Tomcat線程池中去執行。
這裏還須要注意的一個點是,events()方法,用來處理PollerEvent事件,執行PollerEvent.run(),而後將PollerEvent重置再次放入緩存中,以便對象複用。
/** * Processes events in the event queue of the Poller. * * @return <code>true</code> if some events were processed, * <code>false</code> if queue was empty */
public boolean events() {
boolean result = false;
PollerEvent pe = null;
for (int i = 0, size = events.size(); i < size && (pe = events.poll()) != null; i++ ) {
result = true;
try {
//把SocketChannel的interestOps註冊到Selector中
pe.run();
pe.reset();
if (running && !paused) {
eventCache.push(pe);
}
} catch ( Throwable x ) {
log.error(sm.getString("endpoint.nio.pollerEventError"), x);
}
}
return result;
}
複製代碼
因此,PollerEvent.run()方法纔是咱們關注的重點:
/** * PollerEvent, cacheable object for poller events to avoid GC */
public static class PollerEvent implements Runnable {
private NioChannel socket;
private int interestOps;
private NioSocketWrapper socketWrapper;
public PollerEvent(NioChannel ch, NioSocketWrapper w, int intOps) {
reset(ch, w, intOps);
}
public void reset(NioChannel ch, NioSocketWrapper w, int intOps) {
socket = ch;
interestOps = intOps;
socketWrapper = w;
}
public void reset() {
reset(null, null, 0);
}
@Override
public void run() {
//Acceptor調用Poller.register()方法時,建立的PollerEvent的interestOps爲OP_REGISTER,所以走這個分支
if (interestOps == OP_REGISTER) {
try {
socket.getIOChannel().register(
socket.getPoller().getSelector(), SelectionKey.OP_READ, socketWrapper);
} catch (Exception x) {
log.error(sm.getString("endpoint.nio.registerFail"), x);
}
} else {
final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
try {
if (key == null) {
// The key was cancelled (e.g. due to socket closure)
// and removed from the selector while it was being
// processed. Count down the connections at this point
// since it won't have been counted down when the socket
// closed.
socket.socketWrapper.getEndpoint().countDownConnection();
((NioSocketWrapper) socket.socketWrapper).closed = true;
} else {
final NioSocketWrapper socketWrapper = (NioSocketWrapper) key.attachment();
if (socketWrapper != null) {
//we are registering the key to start with, reset the fairness counter.
int ops = key.interestOps() | interestOps;
socketWrapper.interestOps(ops);
key.interestOps(ops);
} else {
socket.getPoller().cancelledKey(key);
}
}
} catch (CancelledKeyException ckx) {
try {
socket.getPoller().cancelledKey(key);
} catch (Exception ignore) {}
}
}
}
@Override
public String toString() {
return "Poller event: socket [" + socket + "], socketWrapper [" + socketWrapper +
"], interestOps [" + interestOps + "]";
}
}
複製代碼
至此,能夠看出Poller線程的做用
剩下的事情,就是SocketProcessor怎麼適配客戶端發來請求的數據、而後怎樣交給Servlet容器去處理了。
即Poller的run方法中最後調用的processKey(sk, attachment);
:
protected void processKey(SelectionKey sk, NioSocketWrapper attachment) {
try {
if ( close ) {
cancelledKey(sk);
} else if ( sk.isValid() && attachment != null ) {
if (sk.isReadable() || sk.isWritable() ) {
if ( attachment.getSendfileData() != null ) {
processSendfile(sk,attachment, false);
} else {
unreg(sk, attachment, sk.readyOps());
boolean closeSocket = false;
// Read goes before write
if (sk.isReadable()) {
if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) {
closeSocket = true;
}
}
if (!closeSocket && sk.isWritable()) {
if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) {
closeSocket = true;
}
}
if (closeSocket) {
cancelledKey(sk);
}
}
}
} else {
//invalid key
cancelledKey(sk);
}
} catch ( CancelledKeyException ckx ) {
cancelledKey(sk);
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
log.error(sm.getString("endpoint.nio.keyProcessingError"), t);
}
}
複製代碼
即從processSocket
這個方法中會用到SocketProcessor
來處理請求:
/** * Process the given SocketWrapper with the given status. Used to trigger * processing as if the Poller (for those endpoints that have one) * selected the socket. * * @param socketWrapper The socket wrapper to process * @param event The socket event to be processed * @param dispatch Should the processing be performed on a new * container thread * * @return if processing was triggered successfully */
public boolean processSocket(SocketWrapperBase<S> socketWrapper, SocketEvent event, boolean dispatch) {
try {
if (socketWrapper == null) {
return false;
}
SocketProcessorBase<S> sc = processorCache.pop();
if (sc == null) {
sc = createSocketProcessor(socketWrapper, event);
} else {
sc.reset(socketWrapper, event);
}
Executor executor = getExecutor();
if (dispatch && executor != null) {
executor.execute(sc);
} else {
sc.run();
}
} catch (RejectedExecutionException ree) {
getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree);
return false;
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
// This means we got an OOM or similar creating a thread, or that
// the pool and its queue are full
getLog().error(sm.getString("endpoint.process.fail"), t);
return false;
}
return true;
}
複製代碼
這裏簡單提一下SocketProcessor
的處理過程,幫助你們對接到Servlet容器處理上。經過上面能夠知道,具體處理一個請求,是在SocketProcessor經過線程池去執行的,這裏,咱們來看其執行一次請求的時序圖:
由圖中能夠看到,SocketProcessor
中經過Http11ConnectionHandler
,拿到Htpp11Processor
,而後Htpp11Processor
會調用prepareRequest
方法來準備好請求數據。接着調用CoyoteAdapter
的service
方法進行request
和response
的適配,以後交給Tomcat
容器進行處理。
下面經過一個系列調用來表示下過程:
connector.getService().getContainer().getPipeline().getFirst().invoke(request,response);
這裏首先從Connector 中獲取到Service ( Connector 在initInternal 方法中建立CoyoteAdapter的時候已經將本身設置到了CoyoteAdapter 中),而後從Service 中獲取Container ,接着獲取管道,再獲取管道的第一個Value,最後調用invoke 方法執行請求。Service 中保存的是最頂層的容器,當調用最頂層容器管道的invoke 方法時,管道將逐層調用各層容器的管道中Value 的invoke 方法,直到最後調用Wrapper 的管道中的BaseValue ( StandardWrapperValve)來處理Filter 和Servlet。
即將請求交給Tomcat容器處理後,而後將請求一層一層傳遞到Engine、Host、Context、Wrapper,最終通過一系列Filter,來到了Servlet,執行咱們本身具體的代碼邏輯。
至此關於Connector的一些東西就算涉及差很少了,剩下的假如之後有精力的話,繼續探究下,接着分享Webflux的解讀去。
補充: 感謝零度大佬(博客:www.jiangxinlingdu.com)的提問,這裏我將本身的一些額外的問題理解進行內容補充:
這裏對於其中NioEndpoint
中其有關生命週期部分的實現所涉及的initServerSocket()
再來關注下細節:
// Separated out to make it easier for folks that extend NioEndpoint to
// implement custom [server]sockets
protected void initServerSocket() throws Exception {
if (!getUseInheritedChannel()) {
serverSock = ServerSocketChannel.open();
socketProperties.setProperties(serverSock.socket());
InetSocketAddress addr = new InetSocketAddress(getAddress(), getPortWithOffset());
serverSock.socket().bind(addr,getAcceptCount());
} else {
// Retrieve the channel provided by the OS
Channel ic = System.inheritedChannel();
if (ic instanceof ServerSocketChannel) {
serverSock = (ServerSocketChannel) ic;
}
if (serverSock == null) {
throw new IllegalArgumentException(sm.getString("endpoint.init.bind.inherited"));
}
}
serverSock.configureBlocking(true); //mimic APR behavior
}
複製代碼
其最後一句,爲何tomcat這個不設置非阻塞?這會兒是剛初始化的時候,設定爲阻塞狀態,阻塞也只是阻塞在這個線程上,即Acceptor
在一條線程內執行其run方法的時候,會調用endpoint.serverSocketAccept()
來建立一個socketChannel
,接收下一個從服務器進來的鏈接。當成功接收到,從新對此socket
進行配置,即會調用endpoint.setSocketOptions(socket)
,在這個方法內,會調用 socket.configureBlocking(false);
,此時,會開啓SocketChannel
在非阻塞模式,具體代碼請回顧本文前面細節。