Mina、Netty、Twisted一塊兒學(十):線程模型

要想開發一個高性能的TCP服務器,熟悉所使用框架的線程模型很是重要。MINA、Netty、Twisted自己都是高性能的網絡框架,若是再搭配上高效率的代碼,才能實現一個高大上的服務器。可是若是不瞭解它們的線程模型,就很難寫出高性能的代碼。框架自己效率再高,程序寫的太差,那麼服務器總體的性能也不會過高。就像一個電腦,CPU再好,內存小硬盤慢散熱差,總體的性能也不會過高。react

玩過Android開發的同窗會知道,在Android應用中有一個很是重要線程:UI線程(即主線程)。UI線程是負責一個Android的界面顯示以及和用戶交互。Activity的一些方法,例如onCreate、onStop、onDestroy都是運行在UI線程中的。可是在編寫Activity代碼的時候有一點須要很是注意,就是絕對不能把阻塞的或者耗時的任務寫在這些方法中,若是寫在這些方法中,則會阻塞UI線程,致使用戶操做的界面反應遲鈍,體驗不好。因此在Android開發中,耗時或者阻塞的任務會另外開線程去作。git

一樣在MINA、Netty、Twisted中,也有一個很是重要的線程:IO線程github

傳統的BIO實現的TCP服務器,特別對於TCP長鏈接,一般都要爲每一個鏈接開啓一個線程,線程也是操做系統的一種資源,因此很難實現高性能高併發。而異步IO實現的TCP服務器,因爲IO操做都是異步的,能夠用一個線程或者少許線程來處理大量鏈接的IO操做,因此只須要少許的IO線程就能夠實現高併發的服務器。數據庫

在網絡編程過程當中,一般有一些業務邏輯是比較耗時、阻塞的,例如數據庫操做,若是網絡很差,加上數據庫性能差,SQL不夠優化,數據量大,一條SQL可能會執行好久。因爲IO線程自己數量就很少,一般只有一個或幾個,而若是這種耗時阻塞的代碼在IO線程中運行的話,IO線程的其餘事情,例如網絡read和write,就沒法進行了,會影響IO性能以及整個服務器的性能。編程

因此,不管是使用MINA、Netty、Twisted,若是有耗時的任務,就絕對不能在IO線程中運行,而是要另外開啓線程來處理。安全

MINA:服務器

在MINA中,有三種很是重要的線程:Acceptor thread、Connector thread、I/O processor thread。網絡

下面是官方文檔的介紹:session

In MINA, there are three kinds of I/O worker threads in the NIO socket implementation.
Acceptor thread accepts incoming connections, and forwards the connection to the I/O processor thread for read and write operations.
Each SocketAcceptor creates one acceptor thread. You can't configure the number of the acceptor threads.
Connector thread attempts connections to a remote peer, and forwards the succeeded connection to the I/O processor thread for read and write operations.
Each SocketConnector creates one connector thread. You can't configure the number of the connector threads, either.
I/O processor thread performs the actual read and write operation until the connection is closed.
Each SocketAcceptor or SocketConnector creates its own I/O processor thread(s). You can configure the number of the I/O processor threads. The default maximum number of the I/O processor threads is the number of CPU cores + 1.
併發

Acceptor thread:

這個線程用於TCP服務器接收新的鏈接,並將鏈接分配到I/O processor thread,由I/O processor thread來處理IO操做。每一個NioSocketAcceptor建立一個Acceptor thread,線程數量不可配置。

Connector thread:

用於處理TCP客戶端鏈接到服務器,並將鏈接分配到I/O processor thread,由I/O processor thread來處理IO操做。每一個NioSocketConnector建立一個Connector thread,線程數量不可配置。

I/O processor thread:

用於處理TCP鏈接的I/O操做,如read、write。I/O processor thread的線程數量可經過NioSocketAcceptor或NioSocketConnector構造方法來配置,默認是CPU核心數+1。

因爲本文主要介紹TCP服務器的線程模型,因此就沒有Connector thread什麼事了。下面說下Acceptor thread和I/O processor thread處理TCP鏈接的流程:

MINA的TCP服務器包含一個Acceptor thread和多個I/O processor thread,當有新的客戶端鏈接到服務器,首先會由Acceptor thread獲取到這個鏈接,同時將這個鏈接分配給多個I/O processor thread中的一個線程,當客戶端發送數據給服務器,對應的I/O processor thread負責讀取這個數據,並執行IoFilterChain中的IoFilter以及IoHandle。

因爲I/O processor thread自己數量有限,一般就那麼幾個,可是又要處理成千上萬個鏈接的IO操做,包括read、write、協議的編碼解碼、各類Filter以及IoHandle中的業務邏輯,特別是業務邏輯,好比IoHandle的messageReceived,若是有耗時、阻塞的任務,例如查詢數據庫,那麼就會阻塞I/O processor thread,致使沒法及時處理其餘IO事件,服務器性能降低。

針對這個問題,MINA中提供了一個ExecutorFilter,用於將須要執行很長時間的會阻塞I/O processor thread的業務邏輯放到另外的線程中,這樣就不會阻塞I/O processor thread,不會影響IO操做。ExecutorFilter中包含一個線程池,默認是OrderedThreadPoolExecutor,這個線程池保證同一個鏈接的多個事件按順序依次執行,另外還可使用UnorderedThreadPoolExecutor,它不會保證同一鏈接的事件的執行順序,而且可能會併發執行。兩者之間能夠根據須要來選擇。

public class TcpServer {

    public static void main(String[] args) throws IOException {
        IoAcceptor acceptor = new NioSocketAcceptor(4); // 配置I/O processor thread線程數量
        acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory()));
        acceptor.getFilterChain().addLast("executor", new ExecutorFilter()); // 將TcpServerHandle中的業務邏輯拿到ExecutorFilter的線程池中執行
        acceptor.setHandler(new TcpServerHandle());
        acceptor.bind(new InetSocketAddress(8080));
    }

}

class TcpServerHandle extends IoHandlerAdapter {

    @Override
    public void messageReceived(IoSession session, Object message)
            throws Exception {
        
        // 假設這裏有個變態的SQL要執行3秒
        Thread.sleep(3000);
    }
}

Netty:

Netty的TCP服務器啓動時,會建立兩個NioEventLoopGroup,一個boss,一個worker

EventLoopGroup bossGroup = new NioEventLoopGroup();  
EventLoopGroup workerGroup = new NioEventLoopGroup();

NioEventLoopGroup其實是一個線程組,能夠經過構造方法設置線程數量,默認爲CPU核心數*2。boss用於服務器接收新的TCP鏈接,boss線程接收到新的鏈接後將鏈接註冊到worker線程。worker線程用於處理IO操做,例如read、write。

Netty中的boss線程相似於MINA的Acceptor thread,work線程和MINA的I/O processor thread相似。不一樣的一點是MINA的Acceptor thread是單個線程,而Netty的boss是一個線程組。實際上Netty的ServerBootstrap能夠監聽多個端口號,若是隻監聽一個端口號,那麼只須要一個boss線程便可,推薦將bossGroup的線程數量設置成1。

EventLoopGroup bossGroup = new NioEventLoopGroup(1);

當有新的TCP客戶端鏈接到服務器,將由boss線程來接收鏈接,而後將鏈接註冊到worker線程,當客戶端發送數據到服務器,worker線程負責接收數據,並執行ChannelPipeline中的ChannelHandler。

和MINA的I/O processor thread 相似,Netty的worker線程自己數量很少,並且要實時處理IO事件,若是有耗時的業務邏輯阻塞住worker線程,例如在channelRead中執行一個耗時的數據庫查詢,會致使IO操做沒法進行,服務器總體性能就會降低。

在Netty 3中,存在一個ExecutionHandler,它是ChannelHandler的一個實現類,用於處理耗時的業務邏輯,相似於MINA的ExecutorFilter,可是在Netty 4中被刪除了。因此這裏再也不介紹ExecutionHandler。

Netty 4中可使用EventExecutorGroup來處理耗時的業務邏輯:

public class TcpServer {

    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1); // 服務器監聽一個端口號,boss線程數建議設置成1
        EventLoopGroup workerGroup = new NioEventLoopGroup(4); // worker線程數設置成4
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        
                        // 建立一個16個線程的線程組來處理耗時的業務邏輯
                        private EventExecutorGroup group = new DefaultEventExecutorGroup(16);
                        
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new LineBasedFrameDecoder(80));
                            pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
                            
                            // 將TcpServerHandler中的業務邏輯放到EventExecutorGroup線程組中執行
                            pipeline.addLast(group, new TcpServerHandler());
                        }
                    });
            ChannelFuture f = b.bind(8080).sync();
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

}

class TcpServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws InterruptedException {
        
        // 假設這裏有個變態的SQL要執行3秒
        Thread.sleep(3000);

    }
}

Twisted:

Twisted的線程模型是最簡單粗暴的:單線程,即reactor線程。也就是,全部的IO操做、編碼解碼、業務邏輯等都是在一個線程中執行。實際上,即便是單線程,其性能也是很是高的,能夠同時處理大量的鏈接。在單線程的環境下編程,不須要考慮線程安全的問題。不過,單線程帶來一個問題,就是耗時的業務邏輯,若是運行在reactor線程中,那麼其餘事情,例如網絡IO,就要等到reactor線程空閒時才能繼續作,會影響到服務器的性能。

下面的代碼,經過reactor.callInThread將耗時的業務邏輯放到單獨的線程池中執行,而不在reactor線程中運行。這樣就不會影響到reactor線程的網絡IO了。能夠經過reactor.suggestThreadPoolSize設置這個線程池的線程數量。

# -*- coding:utf-8 –*-

import time
from twisted.internet.protocol import Protocol
from twisted.internet.protocol import Factory
from twisted.internet import reactor

# 耗時、阻塞的業務邏輯
def logic(data):
    print data
    time.sleep(3) # 假設這裏有個變態的SQL要執行3秒    

class TcpServerHandle(Protocol):
    
    def dataReceived(self, data):
        reactor.callInThread(logic, data) # 在線程池中運行logic(data)耗時任務,不在reactor線程中運行

reactor.suggestThreadPoolSize(8) # 設置線程池的線程數量爲8

factory = Factory()
factory.protocol = TcpServerHandle
reactor.listenTCP(8080, factory)
reactor.run()

因爲Twisted的reactor的單線程設計,它的不少代碼都不是線程安全的。因此在非reactor線程中執行的代碼須要注意線程安全問題。例如transport.write就不是線程安全的。不過在非reactor線程中能夠調用reactor.callFromThread方法,這個方法功能和callInThread相反,將一個函數從別的線程放到reactor線程中運行。不過仍是要注意,reactor.callFromThread調用的函數因爲運行在reactor線程中,若是運行耗時,一樣會阻塞reactor線程,影響IO。

# -*- coding:utf-8 –*-

import time
from twisted.internet.protocol import Protocol
from twisted.internet.protocol import Factory
from twisted.internet import reactor

# 非線程安全的代碼
def notThreadSafe():
    print "notThreadSafe"

# 耗時、阻塞的業務邏輯
def logic(data):
    print data
    time.sleep(3) # 假設這裏有個變態的SQL要執行3秒
    reactor.callFromThread(notThreadSafe) # 在reactor線程中運行notThreadSafe()
    

class TcpServerHandle(Protocol):
    
    def dataReceived(self, data):
        reactor.callInThread(logic, data) # 在線程池中運行logic(data)耗時任務,不在reactor線程中運行

reactor.suggestThreadPoolSize(8) # 設置線程池的線程數量爲8

factory = Factory()
factory.protocol = TcpServerHandle
reactor.listenTCP(8080, factory)
reactor.run()

此外,twisted.internet.threads中提供了許多很方便的函數。例如threads.deferToThread用於將一個耗時任務放在線程池中執行,與reactor.callInThread不一樣的是,它的返回值是Deferred類型,能夠經過添加回調函數,處理耗時任務完成後的結果(返回值)。

# -*- coding:utf-8 –*-

import time
from twisted.internet.protocol import Protocol
from twisted.internet.protocol import Factory
from twisted.internet import reactor, threads

# 耗時、阻塞的業務邏輯
def logic(data):
    print data
    time.sleep(3) # 假設這裏有個變態的SQL要執行3秒
    return "success"

# 回調函數
def logicSuccess(result):
    # result即爲logic函數的返回值,即"success"
    print result

class TcpServerHandle(Protocol):
    
    def dataReceived(self, data):
        d = threads.deferToThread(logic, data) # 將耗時的業務邏輯logic(data)放到線程池中運行,deferToThread返回值類型是Deferred
        d.addCallback(logicSuccess) # 添加回調函數

reactor.suggestThreadPoolSize(8) # 設置線程池的線程數量爲8

factory = Factory()
factory.protocol = TcpServerHandle
reactor.listenTCP(8080, factory)
reactor.run()

 

MINA、Netty、Twisted一塊兒學系列

MINA、Netty、Twisted一塊兒學(一):實現簡單的TCP服務器

MINA、Netty、Twisted一塊兒學(二):TCP消息邊界問題及按行分割消息

MINA、Netty、Twisted一塊兒學(三):TCP消息固定大小的前綴(Header)

MINA、Netty、Twisted一塊兒學(四):定製本身的協議

MINA、Netty、Twisted一塊兒學(五):整合protobuf

MINA、Netty、Twisted一塊兒學(六):session

MINA、Netty、Twisted一塊兒學(七):發佈/訂閱(Publish/Subscribe)

MINA、Netty、Twisted一塊兒學(八):HTTP服務器

MINA、Netty、Twisted一塊兒學(九):異步IO和回調函數

MINA、Netty、Twisted一塊兒學(十):線程模型

MINA、Netty、Twisted一塊兒學(十一):SSL/TLS

MINA、Netty、Twisted一塊兒學(十二):HTTPS

源碼

https://github.com/wucao/mina-netty-twisted

相關文章
相關標籤/搜索