要想開發一個高性能的TCP服務器,熟悉所使用框架的線程模型很是重要。MINA、Netty、Twisted自己都是高性能的網絡框架,若是再搭配上高效率的代碼,才能實現一個高大上的服務器。可是若是不瞭解它們的線程模型,就很難寫出高性能的代碼。框架自己效率再高,程序寫的太差,那麼服務器總體的性能也不會過高。就像一個電腦,CPU再好,內存小硬盤慢散熱差,總體的性能也不會過高。react
玩過Android開發的同窗會知道,在Android應用中有一個很是重要線程:UI線程(即主線程)。UI線程是負責一個Android的界面顯示以及和用戶交互。Activity的一些方法,例如onCreate、onStop、onDestroy都是運行在UI線程中的。可是在編寫Activity代碼的時候有一點須要很是注意,就是絕對不能把阻塞的或者耗時的任務寫在這些方法中,若是寫在這些方法中,則會阻塞UI線程,致使用戶操做的界面反應遲鈍,體驗不好。因此在Android開發中,耗時或者阻塞的任務會另外開線程去作。git
一樣在MINA、Netty、Twisted中,也有一個很是重要的線程:IO線程。github
傳統的BIO實現的TCP服務器,特別對於TCP長鏈接,一般都要爲每一個鏈接開啓一個線程,線程也是操做系統的一種資源,因此很難實現高性能高併發。而異步IO實現的TCP服務器,因爲IO操做都是異步的,能夠用一個線程或者少許線程來處理大量鏈接的IO操做,因此只須要少許的IO線程就能夠實現高併發的服務器。數據庫
在網絡編程過程當中,一般有一些業務邏輯是比較耗時、阻塞的,例如數據庫操做,若是網絡很差,加上數據庫性能差,SQL不夠優化,數據量大,一條SQL可能會執行好久。因爲IO線程自己數量就很少,一般只有一個或幾個,而若是這種耗時阻塞的代碼在IO線程中運行的話,IO線程的其餘事情,例如網絡read和write,就沒法進行了,會影響IO性能以及整個服務器的性能。編程
因此,不管是使用MINA、Netty、Twisted,若是有耗時的任務,就絕對不能在IO線程中運行,而是要另外開啓線程來處理。安全
MINA:服務器
在MINA中,有三種很是重要的線程:Acceptor thread、Connector thread、I/O processor thread。網絡
下面是官方文檔的介紹:session
In MINA, there are three kinds of I/O worker threads in the NIO socket implementation.
Acceptor thread accepts incoming connections, and forwards the connection to the I/O processor thread for read and write operations.
Each SocketAcceptor creates one acceptor thread. You can't configure the number of the acceptor threads.
Connector thread attempts connections to a remote peer, and forwards the succeeded connection to the I/O processor thread for read and write operations.
Each SocketConnector creates one connector thread. You can't configure the number of the connector threads, either.
I/O processor thread performs the actual read and write operation until the connection is closed.
Each SocketAcceptor or SocketConnector creates its own I/O processor thread(s). You can configure the number of the I/O processor threads. The default maximum number of the I/O processor threads is the number of CPU cores + 1.併發
Acceptor thread:
這個線程用於TCP服務器接收新的鏈接,並將鏈接分配到I/O processor thread,由I/O processor thread來處理IO操做。每一個NioSocketAcceptor建立一個Acceptor thread,線程數量不可配置。
Connector thread:
用於處理TCP客戶端鏈接到服務器,並將鏈接分配到I/O processor thread,由I/O processor thread來處理IO操做。每一個NioSocketConnector建立一個Connector thread,線程數量不可配置。
I/O processor thread:
用於處理TCP鏈接的I/O操做,如read、write。I/O processor thread的線程數量可經過NioSocketAcceptor或NioSocketConnector構造方法來配置,默認是CPU核心數+1。
因爲本文主要介紹TCP服務器的線程模型,因此就沒有Connector thread什麼事了。下面說下Acceptor thread和I/O processor thread處理TCP鏈接的流程:
MINA的TCP服務器包含一個Acceptor thread和多個I/O processor thread,當有新的客戶端鏈接到服務器,首先會由Acceptor thread獲取到這個鏈接,同時將這個鏈接分配給多個I/O processor thread中的一個線程,當客戶端發送數據給服務器,對應的I/O processor thread負責讀取這個數據,並執行IoFilterChain中的IoFilter以及IoHandle。
因爲I/O processor thread自己數量有限,一般就那麼幾個,可是又要處理成千上萬個鏈接的IO操做,包括read、write、協議的編碼解碼、各類Filter以及IoHandle中的業務邏輯,特別是業務邏輯,好比IoHandle的messageReceived,若是有耗時、阻塞的任務,例如查詢數據庫,那麼就會阻塞I/O processor thread,致使沒法及時處理其餘IO事件,服務器性能降低。
針對這個問題,MINA中提供了一個ExecutorFilter,用於將須要執行很長時間的會阻塞I/O processor thread的業務邏輯放到另外的線程中,這樣就不會阻塞I/O processor thread,不會影響IO操做。ExecutorFilter中包含一個線程池,默認是OrderedThreadPoolExecutor,這個線程池保證同一個鏈接的多個事件按順序依次執行,另外還可使用UnorderedThreadPoolExecutor,它不會保證同一鏈接的事件的執行順序,而且可能會併發執行。兩者之間能夠根據須要來選擇。
public class TcpServer { public static void main(String[] args) throws IOException { IoAcceptor acceptor = new NioSocketAcceptor(4); // 配置I/O processor thread線程數量 acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory())); acceptor.getFilterChain().addLast("executor", new ExecutorFilter()); // 將TcpServerHandle中的業務邏輯拿到ExecutorFilter的線程池中執行 acceptor.setHandler(new TcpServerHandle()); acceptor.bind(new InetSocketAddress(8080)); } } class TcpServerHandle extends IoHandlerAdapter { @Override public void messageReceived(IoSession session, Object message) throws Exception { // 假設這裏有個變態的SQL要執行3秒 Thread.sleep(3000); } }
Netty:
Netty的TCP服務器啓動時,會建立兩個NioEventLoopGroup,一個boss,一個worker:
EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup();
NioEventLoopGroup其實是一個線程組,能夠經過構造方法設置線程數量,默認爲CPU核心數*2。boss用於服務器接收新的TCP鏈接,boss線程接收到新的鏈接後將鏈接註冊到worker線程。worker線程用於處理IO操做,例如read、write。
Netty中的boss線程相似於MINA的Acceptor thread,work線程和MINA的I/O processor thread相似。不一樣的一點是MINA的Acceptor thread是單個線程,而Netty的boss是一個線程組。實際上Netty的ServerBootstrap能夠監聽多個端口號,若是隻監聽一個端口號,那麼只須要一個boss線程便可,推薦將bossGroup的線程數量設置成1。
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
當有新的TCP客戶端鏈接到服務器,將由boss線程來接收鏈接,而後將鏈接註冊到worker線程,當客戶端發送數據到服務器,worker線程負責接收數據,並執行ChannelPipeline中的ChannelHandler。
和MINA的I/O processor thread 相似,Netty的worker線程自己數量很少,並且要實時處理IO事件,若是有耗時的業務邏輯阻塞住worker線程,例如在channelRead中執行一個耗時的數據庫查詢,會致使IO操做沒法進行,服務器總體性能就會降低。
在Netty 3中,存在一個ExecutionHandler,它是ChannelHandler的一個實現類,用於處理耗時的業務邏輯,相似於MINA的ExecutorFilter,可是在Netty 4中被刪除了。因此這裏再也不介紹ExecutionHandler。
Netty 4中可使用EventExecutorGroup來處理耗時的業務邏輯:
public class TcpServer { public static void main(String[] args) throws InterruptedException { EventLoopGroup bossGroup = new NioEventLoopGroup(1); // 服務器監聽一個端口號,boss線程數建議設置成1 EventLoopGroup workerGroup = new NioEventLoopGroup(4); // worker線程數設置成4 try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { // 建立一個16個線程的線程組來處理耗時的業務邏輯 private EventExecutorGroup group = new DefaultEventExecutorGroup(16); @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new LineBasedFrameDecoder(80)); pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8)); // 將TcpServerHandler中的業務邏輯放到EventExecutorGroup線程組中執行 pipeline.addLast(group, new TcpServerHandler()); } }); ChannelFuture f = b.bind(8080).sync(); f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } } class TcpServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws InterruptedException { // 假設這裏有個變態的SQL要執行3秒 Thread.sleep(3000); } }
Twisted:
Twisted的線程模型是最簡單粗暴的:單線程,即reactor線程。也就是,全部的IO操做、編碼解碼、業務邏輯等都是在一個線程中執行。實際上,即便是單線程,其性能也是很是高的,能夠同時處理大量的鏈接。在單線程的環境下編程,不須要考慮線程安全的問題。不過,單線程帶來一個問題,就是耗時的業務邏輯,若是運行在reactor線程中,那麼其餘事情,例如網絡IO,就要等到reactor線程空閒時才能繼續作,會影響到服務器的性能。
下面的代碼,經過reactor.callInThread將耗時的業務邏輯放到單獨的線程池中執行,而不在reactor線程中運行。這樣就不會影響到reactor線程的網絡IO了。能夠經過reactor.suggestThreadPoolSize設置這個線程池的線程數量。
# -*- coding:utf-8 –*- import time from twisted.internet.protocol import Protocol from twisted.internet.protocol import Factory from twisted.internet import reactor # 耗時、阻塞的業務邏輯 def logic(data): print data time.sleep(3) # 假設這裏有個變態的SQL要執行3秒 class TcpServerHandle(Protocol): def dataReceived(self, data): reactor.callInThread(logic, data) # 在線程池中運行logic(data)耗時任務,不在reactor線程中運行 reactor.suggestThreadPoolSize(8) # 設置線程池的線程數量爲8 factory = Factory() factory.protocol = TcpServerHandle reactor.listenTCP(8080, factory) reactor.run()
因爲Twisted的reactor的單線程設計,它的不少代碼都不是線程安全的。因此在非reactor線程中執行的代碼須要注意線程安全問題。例如transport.write就不是線程安全的。不過在非reactor線程中能夠調用reactor.callFromThread方法,這個方法功能和callInThread相反,將一個函數從別的線程放到reactor線程中運行。不過仍是要注意,reactor.callFromThread調用的函數因爲運行在reactor線程中,若是運行耗時,一樣會阻塞reactor線程,影響IO。
# -*- coding:utf-8 –*- import time from twisted.internet.protocol import Protocol from twisted.internet.protocol import Factory from twisted.internet import reactor # 非線程安全的代碼 def notThreadSafe(): print "notThreadSafe" # 耗時、阻塞的業務邏輯 def logic(data): print data time.sleep(3) # 假設這裏有個變態的SQL要執行3秒 reactor.callFromThread(notThreadSafe) # 在reactor線程中運行notThreadSafe() class TcpServerHandle(Protocol): def dataReceived(self, data): reactor.callInThread(logic, data) # 在線程池中運行logic(data)耗時任務,不在reactor線程中運行 reactor.suggestThreadPoolSize(8) # 設置線程池的線程數量爲8 factory = Factory() factory.protocol = TcpServerHandle reactor.listenTCP(8080, factory) reactor.run()
此外,twisted.internet.threads中提供了許多很方便的函數。例如threads.deferToThread用於將一個耗時任務放在線程池中執行,與reactor.callInThread不一樣的是,它的返回值是Deferred類型,能夠經過添加回調函數,處理耗時任務完成後的結果(返回值)。
# -*- coding:utf-8 –*- import time from twisted.internet.protocol import Protocol from twisted.internet.protocol import Factory from twisted.internet import reactor, threads # 耗時、阻塞的業務邏輯 def logic(data): print data time.sleep(3) # 假設這裏有個變態的SQL要執行3秒 return "success" # 回調函數 def logicSuccess(result): # result即爲logic函數的返回值,即"success" print result class TcpServerHandle(Protocol): def dataReceived(self, data): d = threads.deferToThread(logic, data) # 將耗時的業務邏輯logic(data)放到線程池中運行,deferToThread返回值類型是Deferred d.addCallback(logicSuccess) # 添加回調函數 reactor.suggestThreadPoolSize(8) # 設置線程池的線程數量爲8 factory = Factory() factory.protocol = TcpServerHandle reactor.listenTCP(8080, factory) reactor.run()
MINA、Netty、Twisted一塊兒學(一):實現簡單的TCP服務器
MINA、Netty、Twisted一塊兒學(二):TCP消息邊界問題及按行分割消息
MINA、Netty、Twisted一塊兒學(三):TCP消息固定大小的前綴(Header)
MINA、Netty、Twisted一塊兒學(四):定製本身的協議
MINA、Netty、Twisted一塊兒學(五):整合protobuf
MINA、Netty、Twisted一塊兒學(六):session
MINA、Netty、Twisted一塊兒學(七):發佈/訂閱(Publish/Subscribe)
MINA、Netty、Twisted一塊兒學(八):HTTP服務器
MINA、Netty、Twisted一塊兒學(九):異步IO和回調函數
MINA、Netty、Twisted一塊兒學(十):線程模型
MINA、Netty、Twisted一塊兒學(十一):SSL/TLS
MINA、Netty、Twisted一塊兒學(十二):HTTPS