1、Tornado異步非阻塞
通常的web框架,能夠分爲兩類:
阻塞式:(Django,Flask,Tornado,Bottle)
一個請求到來未處理完成,後續一直等待
解決方案:多線程或多進程
異步非阻塞(存在IO請求):Tornado (單進程+單線程)
- 使用
- @gen.coroutine
- yield Future對象
1.簡單的異步例子
import tornado.ioloop import tornado.web from tornado.web import RequestHandler from tornado import gen from tornado.concurrent import Future import time class IndexHandler(RequestHandler): @gen.coroutine def get(self): print('開始') future = Future() #從當前時間開始夯住10S,當時間結束執行 self.doing方法 tornado.ioloop.IOLoop.current().add_timeout(time.time() + 10, self.doing) yield future def doing(self, *args, **kwargs): self.write('async') #關閉鏈接 self.finish() application = tornado.web.Application([ (r"/index", IndexHandler), ]) if __name__ == "__main__": application.listen(8888) tornado.ioloop.IOLoop.instance().start()
在內部中,Select循環檢測Future,當有返回值時,Future內部會做出變化,此時便可釋放向本身發送請求的連接html
2.做爲中轉,接收用戶對本身的請求,而本身須要向另外一個API發送請求處理,在發送請求的過程當中,佔用大量時間,這時候就可使用異步非阻塞的形式。python
好比一個購物網站,用戶向本身發來購買訂單,本身再向預約的後臺處理API發送處理請求,在這段時間內能夠接收其它用戶發來的訂單請求,這將大大提高處理的效率。mysql
import tornado.ioloop import tornado.web from tornado.web import RequestHandler from tornado import gen from tornado.concurrent import Future import time from tornado import httpclient class IndexHandler(RequestHandler): @gen.coroutine def get(self): print('收到訂單') #開啓一個非阻塞HTTP客戶端對象 http = httpclient.AsyncHTTPClient() # 執行請求,異步返回HTTPResponse。在向另外一url請求的過程當中,能夠繼續接收其它對本身的請求 yield http.fetch("http://www.github.com", self.done,) def done(self, response): #異步方法接收HTTPResponse,並能夠繼續執行其它代碼 print(response) self.write('訂單成功') #關閉此鏈接 self.finish() application = tornado.web.Application([ (r"/index", IndexHandler), ]) if __name__ == "__main__": application.listen(8888) tornado.ioloop.IOLoop.instance().start()
fetch
(request,callback = None,raise_error = True,** kwargs )[source]git
執行請求,異步返回HTTPResponse
。github
請求能夠是字符串URL或HTTPRequest
對象。若是它是一個字符串,咱們構造一個HTTPRequest
使用任何額外的kwargs:HTTPRequest(request, **kwargs)
web
該方法return一個Future
結果是一個 HTTPResponse
。默認狀況下,Future
將引起一個HTTPError
,若是該請求返回一個非200響應代碼(其餘錯誤也能夠獲得提高,若是服務器沒法聯繫)。相反,若是raise_error
設置爲False,則不管響應代碼如何,都將始終返回響應。sql
若是callback
給出一個,它將被調用HTTPResponse
。在回調界面中,HTTPError
不會自動提出。相反,您必須檢查響應的error
屬性或調用其rethrow
方法。數據庫
3.模擬異步非阻塞執行的流程:服務器
import tornado.ioloop import tornado.web from tornado.web import RequestHandler from tornado import gen from tornado.concurrent import Future import time from tornado import httpclient fu = None class IndexHandler(RequestHandler): @gen.coroutine def get(self): #將fu設置爲全局變量 global fu print('瘋狂的追求') fu = Future() #添加一個回調,若是完成運行而且其結果可用時,它將做爲其參數被調用 fu.add_done_callback(self.done) yield fu def done(self, response): self.write('終於等到你') self.finish() class TestHandler(RequestHandler): def get(self):
#手動設置Future結果 fu.set_result(666) self.write('我只能幫你到這裏了') application = tornado.web.Application([ (r"/index", IndexHandler), (r"/test", TestHandler), ]) if __name__ == "__main__": application.listen(8888) tornado.ioloop.IOLoop.instance().start()
當return一個Future時,能夠添加一個回調函數,而這個回調函數,只有在請求成功接收到結果時纔會執行,不然將一直夯住,而Future內部則是經過set_result()方法使其獲得一個成功返回的信號從而執行回調。正常狀況下set_result()的值是正確的返回結果,這裏經過僞造結果達到異步的效果。done()方法中的response即set_result()的值。cookie
3.1 多線程自動執行:
import tornado.ioloop import tornado.web from tornado.web import RequestHandler from tornado import gen from tornado.concurrent import Future import time from threading import Thread def waiting(future): import time time.sleep(10) #10S後執行set_result使回調生效。 future.set_result(666) class IndexHandler(RequestHandler): @gen.coroutine def get(self): global fu print('瘋狂的追求') fu = Future() fu.add_done_callback(self.done) #開啓線程執行waiting函數, thread = Thread(target=waiting,args=(fu,)) thread.start() yield fu def done(self, response): self.write('終於等到你') self.finish() application = tornado.web.Application([ (r"/index", IndexHandler), ]) if __name__ == "__main__": application.listen(8888) tornado.ioloop.IOLoop.instance().start()
4.使用Tornado異步非阻塞模擬用戶登陸
import tornado.web from tornado import gen import tornado_mysql #必需要使用支持的類庫,如tornado_mysql ,sqlalchemy以及pymysql都不支持 @gen.coroutine def get_user(user): #鏈接時會有IO耗時 conn = yield tornado_mysql.connect(host='127.0.0.1', port=3306, user='root', passwd='123', db='cmdb', charset='utf8') cur = conn.cursor() #查詢時也會有IO耗時 # yield cur.execute("SELECT name,email FROM web_models_userprofile where name=%s", (user,)) yield cur.execute("select sleep(10)") row = cur.fetchone() cur.close() conn.close() #gen.Task特有的return方式,經過捕捉異常獲取數據庫查詢的值 raise gen.Return(row) class LoginHandler(tornado.web.RequestHandler): def get(self, *args, **kwargs): self.render('login.html') @gen.coroutine def post(self, *args, **kwargs): user = self.get_argument('user') #yield一個Task任務,封裝數據庫查詢的函數,當查詢遇到IO操做時不會阻塞,能夠接收其它請求。 data = yield gen.Task(get_user, user) if data: print(data) self.redirect('http://www.baidu.com') else: self.render('login.html')
2、自定義簡易非阻塞框架
1.socket服務端基本(阻塞式):
import socket sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.bind(("127.0.0.1", 9999,)) sock.listen(5) while True: client, address = sock.accept() data = client.recv(8096) print(data) # /r/n/r/n # 請求頭中URL # 請求體中數據 # URL去路由關係中匹配 (請求體中數據) # response = func() import time time.sleep(10) client.sendall(b'uuuuuuuuuuuuuu') client.close()
2.應用select模擬非阻塞web框架的基本流程:
import socket import select def f1(request): return "內容1" def f2(request): return "內容2" #路由匹配系統 urls = [ ('/index.html',f1), ('/home.html',f2), ] sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) #設置非阻塞 sock.setblocking(False) sock.bind(("127.0.0.1", 9999,)) sock.listen(5) input_list = [sock,] while True: """ 客戶端第一次創建鏈接以及後續的請求發來數據都會對rlist有變更,對rlist進行循環, 若是等於sock便是創建鏈接的請求,不然就是客戶端後續發來數據,所以進行不一樣的操做 """ rlist,wlist,elist = select.select(input_list,[],[],0.005) for sk in rlist: # 有新人來鏈接:sock 進行accept,接收鏈接,等待客戶來鏈接 if sk == sock: client, address = sock.accept() client.setblocking(False) input_list.append(client) #將客戶端鏈接添加到循環列表中 # 老人發來數據: client 創建鏈接並接收數據 else: data = sk.recv(8096) # /r/n/r/n (分割請求頭跟請求體) # 請求頭中URL # 請求體中數據 (提取URL跟數據) # URL去路由關係中匹配 (請求體中數據) # response = func() #根據數據匹配URL,服務器作相應的視圖處理,處理後的數據返回給客戶端 response = f1(data) sk.sendall(response.encode('utf-8')) sk.close()
3.簡單的web框架基本實現原理
import re import socket import select import time class Future(object): """ 異步非阻塞模式時封裝回調函數以及是否準備就緒 """ def __init__(self, callback): self.callback = callback self._ready = False self.value = None def set_result(self, value=None): self.value = value self._ready = True @property def ready(self): return self._ready class HttpResponse(object): """ 封裝響應信息 """ def __init__(self, content=''): self.content = content self.headers = {} self.cookies = {} def response(self): return bytes(self.content, encoding='utf-8') class HttpNotFound(HttpResponse): """ 404時的錯誤提示 """ def __init__(self): super(HttpNotFound, self).__init__('404 Not Found') class HttpRequest(object): """ 用戶封裝用戶請求信息 """ def __init__(self, conn): """ :param conn: 客戶端的鏈接 """ self.conn = conn self.header_bytes = bytes() self.body_bytes = bytes() self.header_dict = {} self.method = "" self.url = "" self.protocol = "" self.initialize() self.initialize_headers() def initialize(self): header_flag = False while True: try: received = self.conn.recv(8096) except Exception as e: received = None if not received: break if header_flag: self.body_bytes += received continue temp = received.split(b'\r\n\r\n', 1) if len(temp) == 1: self.header_bytes += temp else: h, b = temp self.header_bytes += h self.body_bytes += b header_flag = True @property def header_str(self): return str(self.header_bytes, encoding='utf-8') def initialize_headers(self): headers = self.header_str.split('\r\n') first_line = headers[0].split(' ') if len(first_line) == 3: self.method, self.url, self.protocol = headers[0].split(' ') for line in headers: kv = line.split(':') if len(kv) == 2: k, v = kv self.header_dict[k] = v class Snow(object): """ 微型Web框架類, """ def __init__(self, routes): """ :param routes: url路由匹配 """ self.routes = routes self.inputs = set() self.request = None self.async_request_handler = {} def run(self, host='localhost', port=9999): """ 事件循環 :param host: IP地址 :param port: 端口 :return: """ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.bind((host, port,)) sock.setblocking(False) sock.listen(128) sock.setblocking(0) self.inputs.add(sock) try: while True: readable_list, writeable_list, error_list = select.select(self.inputs, [], self.inputs,0.005) for conn in readable_list: if sock == conn: # 鏈接 client, address = conn.accept() client.setblocking(False) self.inputs.add(client) else: # 發送請求 # HttpResponse # 生成器 """ gen=HttpResponse對象,執行了三件事, 1.HttpRequest類,將客戶端拿到,分割請求頭與請求體,將請求頭信息處理成header_dict {'method':xxx,'url':xxx} 2.循環路由,實例化HttpRequest類, .url與其進行匹配,匹配成功將self.fun 賦值稱爲路由系統填寫的func函數。 3.處理func視圖,返回2種類型,HttpResponse對象或者Future對象。若是是HttpResponse()對象,封裝了將字符串轉換 成字節的方法,即仍是一種阻塞式,即通常的如Django類web框架的處理流程, """ gen = self.process(conn) if isinstance(gen,HttpResponse): conn.sendall(gen.response()) self.inputs.remove(conn) conn.close() else: yielded = next(gen) self.async_request_handler[conn] = yielded for conn,fur in self.async_request_handler.items(): if not fur.ready: continue response = fur.callback(fur.value) conn.sendall(response) conn.close() del self.async_request_handler[conn] except Exception as e: pass finally: sock.close() def process(self, conn): """ 處理路由系統以及執行函數 :param conn: :return: """ self.request = HttpRequest(conn) func = None for route in self.routes: if re.match(route[0], self.request.url): func = route[1] break if not func: return HttpNotFound() else: return func(self.request) def index(request): return HttpResponse('OK') def asynccccc(request): fur = Future(callback=done) yield fur def done(arg): pass routes = [ (r'/index/', index), (r'/async/', asynccccc), ] if __name__ == '__main__': app = Snow(routes) app.run(port=8012)
gen對象中,若是返回的是一個Future對象,則內部會將封裝的Future對象yield給回調函數,或者說是交給中轉站,由中轉站向基礎平臺的API發送IO請求,這時不會阻塞,會在循環中繼續yield新的請求,並生成一個字典,{conn:yield Future的對象},而在外層的循環中監聽這個對象,一旦對象發送的IO請求處理完成,將執行set_result方法,將ready設置成True,並拿到請求接收的值即賦值給value,再把value傳遞給回調函數中進行相應處理, 得到一個最後的值,將conn從字典中移除,將處理後的值返回給用戶。
4.最後整理後的簡單異步非阻塞web框架
import re import socket import select import time class HttpResponse(object): """ 封裝響應信息 """ def __init__(self, content=''): self.content = content self.headers = {} self.cookies = {} def response(self): return bytes(self.content, encoding='utf-8') class HttpNotFound(HttpResponse): """ 404時的錯誤提示 """ def __init__(self): super(HttpNotFound, self).__init__('404 Not Found') class HttpRequest(object): """ 用戶封裝用戶請求信息 """ def __init__(self, conn): self.conn = conn self.header_bytes = bytes() self.header_dict = {} self.body_bytes = bytes() self.method = "" self.url = "" self.protocol = "" self.initialize() self.initialize_headers() def initialize(self): header_flag = False while True: try: received = self.conn.recv(8096) except Exception as e: received = None if not received: break if header_flag: self.body_bytes += received continue temp = received.split(b'\r\n\r\n', 1) if len(temp) == 1: self.header_bytes += temp else: h, b = temp self.header_bytes += h self.body_bytes += b header_flag = True @property def header_str(self): return str(self.header_bytes, encoding='utf-8') def initialize_headers(self): headers = self.header_str.split('\r\n') first_line = headers[0].split(' ') if len(first_line) == 3: self.method, self.url, self.protocol = headers[0].split(' ') for line in headers: kv = line.split(':') if len(kv) == 2: k, v = kv self.header_dict[k] = v class Future(object): """ 異步非阻塞模式時封裝回調函數以及是否準備就緒 """ def __init__(self, callback): self.callback = callback self._ready = False self.value = None def set_result(self, value=None): self.value = value self._ready = True @property def ready(self): return self._ready class TimeoutFuture(Future): """ 異步非阻塞超時 """ def __init__(self, timeout): super(TimeoutFuture, self).__init__(callback=None) self.timeout = timeout self.start_time = time.time() @property def ready(self): current_time = time.time() if current_time > self.start_time + self.timeout: self._ready = True return self._ready class Snow(object): """ 微型Web框架類 """ def __init__(self, routes): self.routes = routes self.inputs = set() self.request = None self.async_request_handler = {} def run(self, host='localhost', port=9999): """ 事件循環 :param host: :param port: :return: """ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.bind((host, port,)) sock.setblocking(False) sock.listen(128) sock.setblocking(0) self.inputs.add(sock) try: while True: readable_list, writeable_list, error_list = select.select(self.inputs, [], self.inputs,0.005) for conn in readable_list: if sock == conn: client, address = conn.accept() client.setblocking(False) self.inputs.add(client) else: gen = self.process(conn) if isinstance(gen, HttpResponse): conn.sendall(gen.response()) self.inputs.remove(conn) conn.close() else: yielded = next(gen) self.async_request_handler[conn] = yielded self.polling_callback() except Exception as e: pass finally: sock.close() def polling_callback(self): """ 遍歷觸發異步非阻塞的回調函數 :return: """ for conn in list(self.async_request_handler.keys()): yielded = self.async_request_handler[conn] if not yielded.ready: continue if yielded.callback: ret = yielded.callback(self.request, yielded) conn.sendall(ret.response()) self.inputs.remove(conn) del self.async_request_handler[conn] conn.close() def process(self, conn): """ 處理路由系統以及執行函數 :param conn: :return: """ self.request = HttpRequest(conn) func = None for route in self.routes: if re.match(route[0], self.request.url): func = route[1] break if not func: return HttpNotFound() else: return func(self.request)
使用:
基本使用(沒有使用異步非阻塞):
from s4 import Snow from s4 import HttpResponse def index(request): return HttpResponse('OK') routes = [ (r'/index/', index), ] app = Snow(routes) app.run(port=8012)
手動的異步非阻塞使用
from s4 import Snow from s4 import HttpResponse from s4 import Future request_list = [] def callback(request, future): return HttpResponse(future.value) def req(request): print('請求到來') obj = Future(callback=callback) request_list.append(obj) yield obj def stop(request): obj = request_list[0] obj.set_result('done') del request_list[0] return HttpResponse('stop') routes = [ (r'/req/', req), (r'/stop/', stop), ] app = Snow(routes) app.run(port=8012)
因爲咱們在最外層循環了一個{conn:future}的字典,因此須要手動去對stop視圖函數發送請求,由它進行set_result,若是是自動的方法,就是將這個conn添加到select中,由select進行監聽並set_result,
達到程序自動返回的效果。