tornado常見的異步非堵塞寫法

非堵塞和異步有什麼區別?

 

非堵塞

在tornado的框架中非堵塞通常指得是網絡I/O層面的socket數據接收模式(select或者epoll),不論用哪一個模式,最終程序都會收到數據並處理數據(這個數據要麼被轉發、要麼被解析和處理)。html

非堵塞的弊端:   若是處理一個密集計算的請求須要花費10秒鐘(就是堵塞了10秒鐘),當兩個或多個請求同時到達時,只要第一個被接受處理沒結束,其餘所有請求都要等,而且挨個挨個等到被輪詢結束。這就是單線程事件還回機制(非堵塞機制), 對堵塞零容忍, 任何一個地方堵住了還回線程,其餘所有請求都被堵住。python

也就是說採用了非堵塞模式以後,最好不要用堵塞(常規解析數據的函數)的代碼塊來解析數據。mysql

 

異步

異步的做用是將堵塞代碼錯開來,不放在當前接受數據的線程中處理,git

要麼丟到rabbitmq/zeromq/activemq中交給另一個進程去處理,要麼用其餘線程或進程來處理。github

讓監聽數據的這個socket收到數據後直接拋給其餘程序來處理,而後立馬保持監聽狀態,這樣子程序的循環能力就很是強。web

再就是要提的一點,tornado自己的ioloop就採用epool/select/kqueue來完成非堵塞動做,我們使用tornado只要把異步的代碼寫好就能夠很好的發揮出tornado的優點了。sql

 

堵塞模式編程流程:

傳統的I/O(socket)堵塞編程模式流程:數據庫

while True:編程

    1. socket accept (等待)windows

    2. socket receive (接受數據)

    3. handle data (處理數據)

    4. socket send (返回結果)

 

非堵塞模式編程流程:

while True:

     1. events = epoll poll (主動拉取列表)

     2. for file_descriptor, event in events: (查找是否有新的請求)

     3. async handle data

            3.1 標註狀態(running)

            3.2 異步丟給其餘函數經過線程的方式執行.

            3.3 線程執行完畢後修改狀態爲(finish), 而且經過回掉的方式註冊進 ioloop中(ioloop.add_done_callback或者ioloop.add_future)

     4. socket send (返回結果)

 

Python環境準備

    1. python  >= 2.7  < 3.x

    2. pip install requests tornado futures

 

Server環境準備

centos 7 blockingServer.py 192.168.1.100 構建用於測試的堵塞環節
windows 8

blockingClient.py

nbAsync.py

nbFuture.py

nbCoroutine.py

nbGenTask.py

192.168.1.101 驗證經常使用異步非堵塞寫法
centos 7 siege、ab 192.168.1.102 併發環境

 

啓動Server

在192.168.1.100服務器上運行用於測試的堵塞服務器(其實是非堵塞模式,只不過是每一個鏈接都要等待5秒鐘).

# 目的是提供一個堵塞的環境用來證實tornado結合經常使用的異步寫法都是非堵塞高效模式.

python blockingServer.py   

# -.- coding:utf-8 -.-
import tornado.web
import tornado.gen
import tornado.ioloop
import tornado.options
import tornado.httpserver


class BlockingHandler(tornado.web.RequestHandler):

    @tornado.gen.coroutine
    def get(self, *args, **kwargs):
        # 若是這條命令沒看懂的話,請參考這個連接: http://www.tornadoweb.org/en/stable/faq.html
        yield tornado.gen.sleep(5)
        self.write('ok')


class Application(tornado.web.Application):
    def __init__(self):
        handlers = [
            ('/blocking', BlockingHandler),
        ]
        super(Application, self).__init__(handlers)


if __name__ == "__main__":
    tornado.options.define("port", default=88, help="run on the given port", type=int)
    tornado.options.parse_command_line()
    http_server = tornado.httpserver.HTTPServer(Application())
    http_server.listen(tornado.options.options.port)
    tornado.ioloop.IOLoop.current().start()

 

1. tornado + 非異步代碼(堵塞的代碼)

代碼:

# 文件名: blockingClient.py

# -.- coding:utf-8 -.-
# __author__ = 'zhengtong'
import tornado.ioloop
import tornado.web
import tornado.options
import tornado.httpserver
import requests


class Application(tornado.web.Application):
    def __init__(self):
        handlers = [
            ('/blocking', BlockHandler),
            ('/non_blocking', NonBlockHandler),
        ]
        super(Application, self).__init__(handlers)


class BlockHandler(tornado.web.RequestHandler):

    def get(self, *args, **kwargs):
        response = requests.get('http://192.168.1.100:88/blocking')     # blocked here.
        result = dict(response.headers)
        result.update({'content': response.content})
        self.write(result)


class NonBlockHandler(tornado.web.RequestHandler):

    def get(self, *args, **kwargs):
        self.write('non_blocking')


if __name__ == "__main__":
    tornado.options.define("port", default=80, help="run on the given port", type=int)
    tornado.options.parse_command_line()
    http_server = tornado.httpserver.HTTPServer(Application())
    http_server.listen(tornado.options.options.port)
    tornado.ioloop.IOLoop.current().start()

測試方法:

    1. 在192.168.1.102壓力測試服務器上運行以下併發測試命令.

# 發起10個併發,持續60秒鐘.
[root@localhost ~]# siege http://192.168.1.101/blocking -c10 -t60s

    2. 在 windows 8 (192.168.1.101)上用瀏覽器來訪問以下連接.

http://192.168.1.101/non_blocking

測試結果:

siege:

** SIEGE 4.0.2
** Preparing 10 concurrent users for battle.
The server is now under siege...
HTTP/1.1 200     5.07 secs:     212 bytes ==> GET  /blocking
HTTP/1.1 200    10.15 secs:     212 bytes ==> GET  /blocking
HTTP/1.1 200    15.23 secs:     212 bytes ==> GET  /blocking
HTTP/1.1 200    20.31 secs:     212 bytes ==> GET  /blocking
HTTP/1.1 200    25.38 secs:     212 bytes ==> GET  /blocking
HTTP/1.1 200    30.47 secs:     212 bytes ==> GET  /blocking
HTTP/1.1 200    35.55 secs:     212 bytes ==> GET  /blocking
HTTP/1.1 200    40.63 secs:     212 bytes ==> GET  /blocking
HTTP/1.1 200    45.71 secs:     212 bytes ==> GET  /blocking
HTTP/1.1 200    45.53 secs:     212 bytes ==> GET  /blocking
HTTP/1.1 200    45.51 secs:     212 bytes ==> GET  /blocking

Lifting the server siege...
Transactions:		          11 hits
Availability:		      100.00 %
Elapsed time:		       59.65 secs
Data transferred:	        0.00 MB
Response time:		       29.05 secs
Transaction rate:	        0.18 trans/sec
Throughput:		        0.00 MB/sec
Concurrency:		        5.36
Successful transactions:          11
Failed transactions:	           0
Longest transaction:	       45.71
Shortest transaction:	        5.07

瀏覽器:

        non_block也是等待狀態,必需要等block執行完成後,纔會執行non_block.

結論:

    siege在60秒鐘內,只獲得了11個結果,證實堵塞很是嚴重, 而且瀏覽器也是出於一直等待的狀態.

    也就是說在tornado中若是寫堵塞代碼,只有單線程在運行的tornado,會死的很難看,剛接觸tornado的同窗甚至都不知道爲何會這樣,根本沒有像據說那樣tornado是一個極其高效的web框架。

    經過結果能夠看出,不採用異步的方式就沒法發揮出它的能力。

 

2. tornado.web.asynchronous

代碼:

# 文件名: nbAysnc.py

# -.- coding:utf-8 -.-
# __author__ = 'zhengtong'
import tornado.ioloop
import tornado.web
import tornado.options
import tornado.httpserver
# import requests                       # 不用requests, 後面再討論用requests也能異步非堵塞.
import tornado.httpclient               # 採用tornado自帶的異步httpclient客戶端


class Application(tornado.web.Application):
    def __init__(self):
        handlers = [
            ('/blocking', BlockHandler),
            ('/non_blocking', NonBlockHandler),
        ]
        super(Application, self).__init__(handlers)


class BlockHandler(tornado.web.RequestHandler):

    @tornado.web.asynchronous
    def get(self, *args, **kwargs):
        client = tornado.httpclient.AsyncHTTPClient()
        client.fetch('http://192.168.1.100:88/blocking', callback=self.on_response)

    def on_response(self, content):
        result = dict(content.headers)
        result.update({'content': content.body})
        self.write(result)
        self.finish()


class NonBlockHandler(tornado.web.RequestHandler):

    def get(self, *args, **kwargs):
        self.write('non_blocking')


if __name__ == "__main__":
    tornado.options.define("port", default=80, help="run on the given port", type=int)
    tornado.options.parse_command_line()
    http_server = tornado.httpserver.HTTPServer(Application())
    http_server.listen(tornado.options.options.port)
    tornado.ioloop.IOLoop.current().start()

# 這裏提供一個不採用任何裝飾器的寫法, 比較raw ioloop, 運行結果是一致的,效率也是一致的.

# 文件名: nbAsync_NoAsyncDecorator.py

# -.- coding:utf-8 -.-
# __author__ = 'zhengtong'
import tornado.ioloop
import tornado.web
import tornado.options
import tornado.httpserver
import tornado.concurrent
# import requests                        # 不僅用requests
import tornado.httpclient               # 採用tornado自帶的異步httpclient客戶端


class Application(tornado.web.Application):
    def __init__(self):
        handlers = [
            ('/blocking', BlockHandler),
            ('/non_blocking', NonBlockHandler),
        ]
        super(Application, self).__init__(handlers)


class BlockHandler(tornado.web.RequestHandler):

    def get(self, *args, **kwargs):                          # def get上方移除了tornado.web.asynchonous裝飾器
        self._auto_finish = False
        client = tornado.httpclient.AsyncHTTPClient()
        future = client.fetch('http://192.168.1.100:88/blocking')                    # 在這裏添加callback也行
        tornado.ioloop.IOLoop.current().add_future(future, callback=self.on_response)

    def on_response(self, content):
        result = dict(content.headers)
        result.update({'content': content.body})
        self.write(result)
        self.finish()


class NonBlockHandler(tornado.web.RequestHandler):

    def get(self, *args, **kwargs):
        self.write('non_blocking')


if __name__ == "__main__":
    # 經過define 能夠爲options增長變量.
    tornado.options.define("port", default=80, help="run on the given port", type=int)
    tornado.options.parse_command_line()
    http_server = tornado.httpserver.HTTPServer(Application())
    http_server.listen(tornado.options.options.port)
    tornado.ioloop.IOLoop.current().start()

測試方法:

        參考<1. tornado + 非異步代碼(堵塞的代碼) >章節的測試方法.

測試結果:

siege:

Lifting the server siege...
Transactions:		         100 hits
Availability:		      100.00 %
Elapsed time:		       59.13 secs
Data transferred:	        0.02 MB
Response time:		        5.61 secs
Transaction rate:	        1.69 trans/sec
Throughput:		        0.00 MB/sec
Concurrency:		        9.48
Successful transactions:         100
Failed transactions:	           0
Longest transaction:	       10.62
Shortest transaction:	        5.07

瀏覽器:

        訪問non_blocking頁面正常並且響應很快。

 

結論

        siege一直在持續併發請求的同時用瀏覽器來訪問non_blocking和blocking頁面都可以獲得響應,也就證實tornado已經開始發揮它的功效了。

        採用了異步非堵塞模式後,被命中只有110次,落差很大,心理很是不平衡。其實這並非問題,這裏面有多重限制因此纔會致使這個結果。

        1. AsyncHttpClient自己的限制(默認狀況下只容許同時發起10個客戶端).  詳情請參考tornado源碼的 simple_httpclient.py文件

        2. ioloop自己的限制(爲了保證線程的穩定性,默認只開啓了10個線程來支持併發). 詳情請參考tornado源碼的 netutil.py文件

        能夠經過設定參數來提升併發能力(將 tornado.httpclient.AsyncHTTPClient() 改成 tornado.httpclient.AsyncHTTPClient(max_clients=100)).

        max_clients由默認的10改成100後,測試結果的hits也隨之增長了十倍.

Lifting the server siege...
Transactions:		        1099 hits
Availability:		      100.00 %
Elapsed time:		       59.71 secs
Data transferred:	        0.22 MB
Response time:		        5.10 secs
Transaction rate:	       18.41 trans/sec
Throughput:		        0.00 MB/sec
Concurrency:		       93.81
Successful transactions:        1099
Failed transactions:	           0
Longest transaction:	        6.34
Shortest transaction:	        5.06

 

3. tornado.concurrent.futures

代碼:

# 文件名: nbFuture.py

# 備註: 在第二章節中的移除tornado.web.asynchonous裝飾器的寫法一樣適合futures. 詳情請參考源碼文件: nbFuture_NoAsyncDecorator.py

# -.- coding:utf-8 -.-
# __author__ = 'zhengtong'
import tornado.ioloop
import tornado.web
import tornado.options
import tornado.httpserver
import tornado.concurrent
# import requests                        # 不僅用requests
import tornado.httpclient               # 採用tornado自帶的異步httpclient客戶端


class Application(tornado.web.Application):
    def __init__(self):
        handlers = [
            ('/blocking', BlockHandler),
            ('/non_blocking', NonBlockHandler),
        ]
        super(Application, self).__init__(handlers)


class BlockHandler(tornado.web.RequestHandler):

    @tornado.web.asynchronous
    def get(self, *args, **kwargs):
        client = tornado.httpclient.AsyncHTTPClient()
        future = tornado.concurrent.Future()
        fetch_future = client.fetch('http://192.168.1.100:88/blocking', callback=self.on_response)
        fetch_future.add_done_callback(lambda x: future.set_result(x.result()))

    def on_response(self, content):
        result = dict(content.headers)
        result.update({'content': content.body})
        self.write(result)
        self.finish()


class NonBlockHandler(tornado.web.RequestHandler):

    def get(self, *args, **kwargs):
        self.write('non_blocking')


if __name__ == "__main__":
    # 經過define 能夠爲options增長變量.
    tornado.options.define("port", default=80, help="run on the given port", type=int)
    tornado.options.parse_command_line()
    http_server = tornado.httpserver.HTTPServer(Application())
    http_server.listen(tornado.options.options.port)
    tornado.ioloop.IOLoop.current().start()

測試方法:

         參考<1. tornado + 非異步代碼(堵塞的代碼) >章節的測試方法.

測試結果:

        於<2. tornado.web.asynchronous >的測試結果基本一致.

結論

        future是官方特別推薦用來練習的一種編碼方式,由於這樣會比較深刻的瞭解tornado的運做原理。

        future的add_done_callback方法,是告訴ioloop當future的狀態變動爲完成的時候,就調用包裹在add_done_callback中的函數(或匿名函數).

        future還提供了一組produce方法和consumer方法, 用於管理future的狀態.

 

4. tornado.gen.Task

代碼:

# 文件名: nbGenTask.py

# -.- coding:utf-8 -.-
# __author__ = 'zhengtong'
import tornado.ioloop
import tornado.web
import tornado.options
import tornado.httpserver
import tornado.concurrent
import tornado.gen                      # 導入tornado.gen模塊
# import requests                        # 不僅用requests
import tornado.httpclient               # 採用tornado自帶的異步httpclient客戶端


class Application(tornado.web.Application):
    def __init__(self):
        handlers = [
            ('/blocking', BlockHandler),
            ('/non_blocking', NonBlockHandler),
        ]
        super(Application, self).__init__(handlers)


class BlockHandler(tornado.web.RequestHandler):

    @tornado.gen.coroutine
    def get(self, *args, **kwargs):
        client = tornado.httpclient.AsyncHTTPClient()
        content = yield tornado.gen.Task(client.fetch, ('http://192.168.1.100:88/blocking'))
        result = dict(content.headers)
        result.update({'content': content.body})
        self.write(result)
        self.finish()


class NonBlockHandler(tornado.web.RequestHandler):

    def get(self, *args, **kwargs):
        self.write('non_blocking')


if __name__ == "__main__":
    tornado.options.define("port", default=80, help="run on the given port", type=int)
    tornado.options.parse_command_line()
    http_server = tornado.httpserver.HTTPServer(Application())
    http_server.listen(tornado.options.options.port)
    tornado.ioloop.IOLoop.current().start()

測試方法:

         參考<1. tornado + 非異步代碼(堵塞的代碼) >章節的測試方法.

測試結果:

         於<2. tornado.web.asynchronous >的測試結果基本一致.

結論

        tornado.gen.Task須要配合tornado.gen.coroutine裝飾器來完成代碼的運行,由於Task利用了yield,它的隱藏方法run()利用了gen.send()方法,因此gen模塊必需要用coroutine裝飾器.

        利用coroutine的方式比較明顯的一個地方是,代碼不用再分開了, 這個是Python語言的一個特性,yield關鍵字能夠賦值給一個變量, 所以就不須要callback了.

        這樣有什麼好處? 本地變量和全局變量不用傳遞了,默認就是共享的,這個算不算很爽?

 

5. tornado.gen.coroutine + ThreadPool/ProcessPool

代碼:

# 文件名: nbFuture.py

# -.- coding:utf-8 -.-
# __author__ = 'zhengtong'
import tornado.ioloop
import tornado.web
import tornado.options
import tornado.httpserver
import tornado.concurrent
import tornado.gen                      
import requests
import tornado.concurrent                # 導入 tornado.concurrent 併發模塊


class Application(tornado.web.Application):
    def __init__(self):
        handlers = [
            ('/blocking', BlockHandler),
            ('/non_blocking', NonBlockHandler),
        ]
        super(Application, self).__init__(handlers)

        # 建議設定爲CPU核心數量 * 4或8或16也是能夠接受的, 取決於計算量,計算量越大設定的值應該越小.
        self.executor = tornado.concurrent.futures.ThreadPoolExecutor(16)


class BlockHandler(tornado.web.RequestHandler):

    @property
    def executor(self):
        return self.application.executor

    @tornado.gen.coroutine
    def get(self, *args, **kwargs):
        print dir(self)
        content = yield self.executor.submit(requests.get, ('http://192.168.1.100:88/blocking'))
        result = dict(content.headers)
        result.update({'content': content.content})
        self.write(result)


class NonBlockHandler(tornado.web.RequestHandler):

    def get(self, *args, **kwargs):
        self.write('non_blocking')


if __name__ == "__main__":
    # 經過define 能夠爲options增長變量.
    tornado.options.define("port", default=80, help="run on the given port", type=int)
    tornado.options.parse_command_line()
    http_server = tornado.httpserver.HTTPServer(Application())
    http_server.listen(tornado.options.options.port)
    tornado.ioloop.IOLoop.current().start()

測試方法:

         參考<1. tornado + 非異步代碼(堵塞的代碼) >章節的測試方法.

測試結果:

         於<2. tornado.web.asynchronous >的測試結果基本一致.

結論

        自從了coroutine、threadpool、processpool以後,tornado算是一個里程碑式的解放了對異步的要求, 緣由是tornado的異步庫只針對httpclient, 沒有針對mysql或者其餘數據庫的異步庫(本身寫一個異步庫難度過高,由於展轉十幾個源碼文件的重度調用以及每一個類中的狀態控制)。

        coroutine結合threadpool讓編寫異步代碼再也不拆成多個函數,變量可以共享,堵塞的代碼(例如 requests、mysql.connect、密集計算)能夠不影響ioloop,造成真正的閉合.

       

 

參考:

           https://github.com/tornadoweb/tornado/wiki

           http://scotdoyle.com/python-epoll-howto.html

           tornado源碼

相關文章
相關標籤/搜索