在tornado的框架中非堵塞通常指得是網絡I/O層面的socket數據接收模式(select或者epoll),不論用哪一個模式,最終程序都會收到數據並處理數據(這個數據要麼被轉發、要麼被解析和處理)。html
非堵塞的弊端: 若是處理一個密集計算的請求須要花費10秒鐘(就是堵塞了10秒鐘),當兩個或多個請求同時到達時,只要第一個被接受處理沒結束,其餘所有請求都要等,而且挨個挨個等到被輪詢結束。這就是單線程事件還回機制(非堵塞機制), 對堵塞零容忍, 任何一個地方堵住了還回線程,其餘所有請求都被堵住。python
也就是說採用了非堵塞模式以後,最好不要用堵塞(常規解析數據的函數)的代碼塊來解析數據。mysql
異步的做用是將堵塞代碼錯開來,不放在當前接受數據的線程中處理,git
要麼丟到rabbitmq/zeromq/activemq中交給另一個進程去處理,要麼用其餘線程或進程來處理。github
讓監聽數據的這個socket收到數據後直接拋給其餘程序來處理,而後立馬保持監聽狀態,這樣子程序的循環能力就很是強。web
再就是要提的一點,tornado自己的ioloop就採用epool/select/kqueue來完成非堵塞動做,我們使用tornado只要把異步的代碼寫好就能夠很好的發揮出tornado的優點了。sql
傳統的I/O(socket)堵塞編程模式流程:數據庫
while True:編程
1. socket accept (等待)windows
2. socket receive (接受數據)
3. handle data (處理數據)
4. socket send (返回結果)
while True:
1. events = epoll poll (主動拉取列表)
2. for file_descriptor, event in events: (查找是否有新的請求)
3. async handle data
3.1 標註狀態(running)
3.2 異步丟給其餘函數經過線程的方式執行.
3.3 線程執行完畢後修改狀態爲(finish), 而且經過回掉的方式註冊進 ioloop中(ioloop.add_done_callback或者ioloop.add_future)
4. socket send (返回結果)
1. python >= 2.7 < 3.x
2. pip install requests tornado futures
centos 7 | blockingServer.py | 192.168.1.100 | 構建用於測試的堵塞環節 |
windows 8 | blockingClient.py nbAsync.py nbFuture.py nbCoroutine.py nbGenTask.py |
192.168.1.101 | 驗證經常使用異步非堵塞寫法 |
centos 7 | siege、ab | 192.168.1.102 | 併發環境 |
在192.168.1.100服務器上運行用於測試的堵塞服務器(其實是非堵塞模式,只不過是每一個鏈接都要等待5秒鐘).
# 目的是提供一個堵塞的環境用來證實tornado結合經常使用的異步寫法都是非堵塞高效模式.
python blockingServer.py
# -.- coding:utf-8 -.- import tornado.web import tornado.gen import tornado.ioloop import tornado.options import tornado.httpserver class BlockingHandler(tornado.web.RequestHandler): @tornado.gen.coroutine def get(self, *args, **kwargs): # 若是這條命令沒看懂的話,請參考這個連接: http://www.tornadoweb.org/en/stable/faq.html yield tornado.gen.sleep(5) self.write('ok') class Application(tornado.web.Application): def __init__(self): handlers = [ ('/blocking', BlockingHandler), ] super(Application, self).__init__(handlers) if __name__ == "__main__": tornado.options.define("port", default=88, help="run on the given port", type=int) tornado.options.parse_command_line() http_server = tornado.httpserver.HTTPServer(Application()) http_server.listen(tornado.options.options.port) tornado.ioloop.IOLoop.current().start()
# 文件名: blockingClient.py
# -.- coding:utf-8 -.- # __author__ = 'zhengtong' import tornado.ioloop import tornado.web import tornado.options import tornado.httpserver import requests class Application(tornado.web.Application): def __init__(self): handlers = [ ('/blocking', BlockHandler), ('/non_blocking', NonBlockHandler), ] super(Application, self).__init__(handlers) class BlockHandler(tornado.web.RequestHandler): def get(self, *args, **kwargs): response = requests.get('http://192.168.1.100:88/blocking') # blocked here. result = dict(response.headers) result.update({'content': response.content}) self.write(result) class NonBlockHandler(tornado.web.RequestHandler): def get(self, *args, **kwargs): self.write('non_blocking') if __name__ == "__main__": tornado.options.define("port", default=80, help="run on the given port", type=int) tornado.options.parse_command_line() http_server = tornado.httpserver.HTTPServer(Application()) http_server.listen(tornado.options.options.port) tornado.ioloop.IOLoop.current().start()
1. 在192.168.1.102壓力測試服務器上運行以下併發測試命令.
# 發起10個併發,持續60秒鐘. [root@localhost ~]# siege http://192.168.1.101/blocking -c10 -t60s
2. 在 windows 8 (192.168.1.101)上用瀏覽器來訪問以下連接.
http://192.168.1.101/non_blocking
siege:
** SIEGE 4.0.2 ** Preparing 10 concurrent users for battle. The server is now under siege... HTTP/1.1 200 5.07 secs: 212 bytes ==> GET /blocking HTTP/1.1 200 10.15 secs: 212 bytes ==> GET /blocking HTTP/1.1 200 15.23 secs: 212 bytes ==> GET /blocking HTTP/1.1 200 20.31 secs: 212 bytes ==> GET /blocking HTTP/1.1 200 25.38 secs: 212 bytes ==> GET /blocking HTTP/1.1 200 30.47 secs: 212 bytes ==> GET /blocking HTTP/1.1 200 35.55 secs: 212 bytes ==> GET /blocking HTTP/1.1 200 40.63 secs: 212 bytes ==> GET /blocking HTTP/1.1 200 45.71 secs: 212 bytes ==> GET /blocking HTTP/1.1 200 45.53 secs: 212 bytes ==> GET /blocking HTTP/1.1 200 45.51 secs: 212 bytes ==> GET /blocking Lifting the server siege... Transactions: 11 hits Availability: 100.00 % Elapsed time: 59.65 secs Data transferred: 0.00 MB Response time: 29.05 secs Transaction rate: 0.18 trans/sec Throughput: 0.00 MB/sec Concurrency: 5.36 Successful transactions: 11 Failed transactions: 0 Longest transaction: 45.71 Shortest transaction: 5.07
瀏覽器:
non_block也是等待狀態,必需要等block執行完成後,纔會執行non_block.
siege在60秒鐘內,只獲得了11個結果,證實堵塞很是嚴重, 而且瀏覽器也是出於一直等待的狀態.
也就是說在tornado中若是寫堵塞代碼,只有單線程在運行的tornado,會死的很難看,剛接觸tornado的同窗甚至都不知道爲何會這樣,根本沒有像據說那樣tornado是一個極其高效的web框架。
經過結果能夠看出,不採用異步的方式就沒法發揮出它的能力。
# 文件名: nbAysnc.py
# -.- coding:utf-8 -.- # __author__ = 'zhengtong' import tornado.ioloop import tornado.web import tornado.options import tornado.httpserver # import requests # 不用requests, 後面再討論用requests也能異步非堵塞. import tornado.httpclient # 採用tornado自帶的異步httpclient客戶端 class Application(tornado.web.Application): def __init__(self): handlers = [ ('/blocking', BlockHandler), ('/non_blocking', NonBlockHandler), ] super(Application, self).__init__(handlers) class BlockHandler(tornado.web.RequestHandler): @tornado.web.asynchronous def get(self, *args, **kwargs): client = tornado.httpclient.AsyncHTTPClient() client.fetch('http://192.168.1.100:88/blocking', callback=self.on_response) def on_response(self, content): result = dict(content.headers) result.update({'content': content.body}) self.write(result) self.finish() class NonBlockHandler(tornado.web.RequestHandler): def get(self, *args, **kwargs): self.write('non_blocking') if __name__ == "__main__": tornado.options.define("port", default=80, help="run on the given port", type=int) tornado.options.parse_command_line() http_server = tornado.httpserver.HTTPServer(Application()) http_server.listen(tornado.options.options.port) tornado.ioloop.IOLoop.current().start()
# 這裏提供一個不採用任何裝飾器的寫法, 比較raw ioloop, 運行結果是一致的,效率也是一致的.
# 文件名: nbAsync_NoAsyncDecorator.py
# -.- coding:utf-8 -.- # __author__ = 'zhengtong' import tornado.ioloop import tornado.web import tornado.options import tornado.httpserver import tornado.concurrent # import requests # 不僅用requests import tornado.httpclient # 採用tornado自帶的異步httpclient客戶端 class Application(tornado.web.Application): def __init__(self): handlers = [ ('/blocking', BlockHandler), ('/non_blocking', NonBlockHandler), ] super(Application, self).__init__(handlers) class BlockHandler(tornado.web.RequestHandler): def get(self, *args, **kwargs): # def get上方移除了tornado.web.asynchonous裝飾器 self._auto_finish = False client = tornado.httpclient.AsyncHTTPClient() future = client.fetch('http://192.168.1.100:88/blocking') # 在這裏添加callback也行 tornado.ioloop.IOLoop.current().add_future(future, callback=self.on_response) def on_response(self, content): result = dict(content.headers) result.update({'content': content.body}) self.write(result) self.finish() class NonBlockHandler(tornado.web.RequestHandler): def get(self, *args, **kwargs): self.write('non_blocking') if __name__ == "__main__": # 經過define 能夠爲options增長變量. tornado.options.define("port", default=80, help="run on the given port", type=int) tornado.options.parse_command_line() http_server = tornado.httpserver.HTTPServer(Application()) http_server.listen(tornado.options.options.port) tornado.ioloop.IOLoop.current().start()
參考<1. tornado + 非異步代碼(堵塞的代碼) >章節的測試方法.
siege:
Lifting the server siege... Transactions: 100 hits Availability: 100.00 % Elapsed time: 59.13 secs Data transferred: 0.02 MB Response time: 5.61 secs Transaction rate: 1.69 trans/sec Throughput: 0.00 MB/sec Concurrency: 9.48 Successful transactions: 100 Failed transactions: 0 Longest transaction: 10.62 Shortest transaction: 5.07
瀏覽器:
訪問non_blocking頁面正常並且響應很快。
siege一直在持續併發請求的同時用瀏覽器來訪問non_blocking和blocking頁面都可以獲得響應,也就證實tornado已經開始發揮它的功效了。
採用了異步非堵塞模式後,被命中只有110次,落差很大,心理很是不平衡。其實這並非問題,這裏面有多重限制因此纔會致使這個結果。
1. AsyncHttpClient自己的限制(默認狀況下只容許同時發起10個客戶端). 詳情請參考tornado源碼的 simple_httpclient.py文件
2. ioloop自己的限制(爲了保證線程的穩定性,默認只開啓了10個線程來支持併發). 詳情請參考tornado源碼的 netutil.py文件
能夠經過設定參數來提升併發能力(將 tornado.httpclient.AsyncHTTPClient() 改成 tornado.httpclient.AsyncHTTPClient(max_clients=100)).
max_clients由默認的10改成100後,測試結果的hits也隨之增長了十倍.
Lifting the server siege... Transactions: 1099 hits Availability: 100.00 % Elapsed time: 59.71 secs Data transferred: 0.22 MB Response time: 5.10 secs Transaction rate: 18.41 trans/sec Throughput: 0.00 MB/sec Concurrency: 93.81 Successful transactions: 1099 Failed transactions: 0 Longest transaction: 6.34 Shortest transaction: 5.06
# 文件名: nbFuture.py
# 備註: 在第二章節中的移除tornado.web.asynchonous裝飾器的寫法一樣適合futures. 詳情請參考源碼文件: nbFuture_NoAsyncDecorator.py
# -.- coding:utf-8 -.- # __author__ = 'zhengtong' import tornado.ioloop import tornado.web import tornado.options import tornado.httpserver import tornado.concurrent # import requests # 不僅用requests import tornado.httpclient # 採用tornado自帶的異步httpclient客戶端 class Application(tornado.web.Application): def __init__(self): handlers = [ ('/blocking', BlockHandler), ('/non_blocking', NonBlockHandler), ] super(Application, self).__init__(handlers) class BlockHandler(tornado.web.RequestHandler): @tornado.web.asynchronous def get(self, *args, **kwargs): client = tornado.httpclient.AsyncHTTPClient() future = tornado.concurrent.Future() fetch_future = client.fetch('http://192.168.1.100:88/blocking', callback=self.on_response) fetch_future.add_done_callback(lambda x: future.set_result(x.result())) def on_response(self, content): result = dict(content.headers) result.update({'content': content.body}) self.write(result) self.finish() class NonBlockHandler(tornado.web.RequestHandler): def get(self, *args, **kwargs): self.write('non_blocking') if __name__ == "__main__": # 經過define 能夠爲options增長變量. tornado.options.define("port", default=80, help="run on the given port", type=int) tornado.options.parse_command_line() http_server = tornado.httpserver.HTTPServer(Application()) http_server.listen(tornado.options.options.port) tornado.ioloop.IOLoop.current().start()
參考<1. tornado + 非異步代碼(堵塞的代碼) >章節的測試方法.
於<2. tornado.web.asynchronous >的測試結果基本一致.
future是官方特別推薦用來練習的一種編碼方式,由於這樣會比較深刻的瞭解tornado的運做原理。
future的add_done_callback方法,是告訴ioloop當future的狀態變動爲完成的時候,就調用包裹在add_done_callback中的函數(或匿名函數).
future還提供了一組produce方法和consumer方法, 用於管理future的狀態.
# 文件名: nbGenTask.py
# -.- coding:utf-8 -.- # __author__ = 'zhengtong' import tornado.ioloop import tornado.web import tornado.options import tornado.httpserver import tornado.concurrent import tornado.gen # 導入tornado.gen模塊 # import requests # 不僅用requests import tornado.httpclient # 採用tornado自帶的異步httpclient客戶端 class Application(tornado.web.Application): def __init__(self): handlers = [ ('/blocking', BlockHandler), ('/non_blocking', NonBlockHandler), ] super(Application, self).__init__(handlers) class BlockHandler(tornado.web.RequestHandler): @tornado.gen.coroutine def get(self, *args, **kwargs): client = tornado.httpclient.AsyncHTTPClient() content = yield tornado.gen.Task(client.fetch, ('http://192.168.1.100:88/blocking')) result = dict(content.headers) result.update({'content': content.body}) self.write(result) self.finish() class NonBlockHandler(tornado.web.RequestHandler): def get(self, *args, **kwargs): self.write('non_blocking') if __name__ == "__main__": tornado.options.define("port", default=80, help="run on the given port", type=int) tornado.options.parse_command_line() http_server = tornado.httpserver.HTTPServer(Application()) http_server.listen(tornado.options.options.port) tornado.ioloop.IOLoop.current().start()
參考<1. tornado + 非異步代碼(堵塞的代碼) >章節的測試方法.
於<2. tornado.web.asynchronous >的測試結果基本一致.
tornado.gen.Task須要配合tornado.gen.coroutine裝飾器來完成代碼的運行,由於Task利用了yield,它的隱藏方法run()利用了gen.send()方法,因此gen模塊必需要用coroutine裝飾器.
利用coroutine的方式比較明顯的一個地方是,代碼不用再分開了, 這個是Python語言的一個特性,yield關鍵字能夠賦值給一個變量, 所以就不須要callback了.
這樣有什麼好處? 本地變量和全局變量不用傳遞了,默認就是共享的,這個算不算很爽?
# 文件名: nbFuture.py
# -.- coding:utf-8 -.- # __author__ = 'zhengtong' import tornado.ioloop import tornado.web import tornado.options import tornado.httpserver import tornado.concurrent import tornado.gen import requests import tornado.concurrent # 導入 tornado.concurrent 併發模塊 class Application(tornado.web.Application): def __init__(self): handlers = [ ('/blocking', BlockHandler), ('/non_blocking', NonBlockHandler), ] super(Application, self).__init__(handlers) # 建議設定爲CPU核心數量 * 4或8或16也是能夠接受的, 取決於計算量,計算量越大設定的值應該越小. self.executor = tornado.concurrent.futures.ThreadPoolExecutor(16) class BlockHandler(tornado.web.RequestHandler): @property def executor(self): return self.application.executor @tornado.gen.coroutine def get(self, *args, **kwargs): print dir(self) content = yield self.executor.submit(requests.get, ('http://192.168.1.100:88/blocking')) result = dict(content.headers) result.update({'content': content.content}) self.write(result) class NonBlockHandler(tornado.web.RequestHandler): def get(self, *args, **kwargs): self.write('non_blocking') if __name__ == "__main__": # 經過define 能夠爲options增長變量. tornado.options.define("port", default=80, help="run on the given port", type=int) tornado.options.parse_command_line() http_server = tornado.httpserver.HTTPServer(Application()) http_server.listen(tornado.options.options.port) tornado.ioloop.IOLoop.current().start()
參考<1. tornado + 非異步代碼(堵塞的代碼) >章節的測試方法.
於<2. tornado.web.asynchronous >的測試結果基本一致.
自從了coroutine、threadpool、processpool以後,tornado算是一個里程碑式的解放了對異步的要求, 緣由是tornado的異步庫只針對httpclient, 沒有針對mysql或者其餘數據庫的異步庫(本身寫一個異步庫難度過高,由於展轉十幾個源碼文件的重度調用以及每一個類中的狀態控制)。
coroutine結合threadpool讓編寫異步代碼再也不拆成多個函數,變量可以共享,堵塞的代碼(例如 requests、mysql.connect、密集計算)能夠不影響ioloop,造成真正的閉合.
https://github.com/tornadoweb/tornado/wiki
http://scotdoyle.com/python-epoll-howto.html
tornado源碼