線上有一個相關百科的服務,返回一個query中說起的百科詞條。該服務是用python實現的,之前經過thrift接口訪問,現要將其改成經過HTTP訪問。以前沒有搭建HTTPServer的經驗,所以想用python的web Framework來作這件事,因而有了下面的工做。第一部分是框架選擇,這一部分沒有太仔細考慮,只是大概看了一些文章。第二部分是根據所須要的功能,學習及測試在框架上應該如何實現。第三部分是實際的代碼。第四部分是下一步的學習。html
python有不少開源的web framework。從知乎上找了幾篇綜述型的簡介,大致包括:Django、Bottle、Flask、web2py、Tornado。看中了介紹中說起Tornado的速度與併發量,因而打算用tornado來實現。因此按目前的瞭解,或許Tornado並不是實現本工做的最佳方案,只是一個可行方案。python
tornado具備web framework的功能,所以用它開發web服務很是方便:web
- 實現處理請求的Handler,該類繼承自
tornado.web.RequestHandler
,實現用於處理請求的對應方法如:get、post等。返回內容用self.write
方法輸出。- 實例化一個Application。構造函數的參數是一個Handlers列表,經過正則表達式,將請求與Handler對應起來。經過dict將Handler須要的其餘對象以參數的方式傳遞給Handler的initialize方法。
- 初始化一個
tornado.httpserver.HTTPServer
對象,構造函數的參數是上一步的Application對象。- 爲HTTPServer對象綁定一個端口。
- 開始IOLoop。
原服務是一個內存佔用大,IO密集,計算量適中的服務。正則表達式
- 內存佔用大。須要加載一個比較大的詞表,其中每一個詞對應一個id列表,這一部分是C++實現的,經過boost.python封裝爲python可調用的so。原服務單進程佔用內存超過5G。
- IO密集。計算過程當中大量訪問redis讀取term及baikeid的屬性信息,用於過濾及rank計算。也訪問在線分詞服務,獲取各term的NLP分析。
- 計算量適中。劃詞匹配、rank計算有必定計算量,可是整體來看計算量不是特別大。python單進程天天500多萬的訪問量,單CPU利用率也就40%-50%之間。
關於服務的分析:redis
- 內存佔用大。內存佔用大,但絕大部分是隻讀的。不適合獨立啓動多個進程,適合多線程或用子進程。
- IO密集。適合將IO操做都變爲異步請求,或者用多線程模型。
- 計算量適中。因爲python解釋器使用GIL,多線程只能提升IO的併發能力,不能提升計算的併發能力。所以能夠考慮經過子進程的方式,適當增長提供服務的進程數,提升整個系統服務能力的上限。
因爲tornado的亮點是異步請求,因此這裏首先想到的是將全部請求都改造爲異步的。可是這裏遇到一個問題,就是異步函數內必定不能有阻塞調用出現,不然整個IOLoop都會被卡住。這就要求完全地去改造服務,將全部IO或是用時較長的請求都改造爲異步函數。這個工程量是很是大的,須要去修改已有的代碼。所以,咱們考慮用線程池的方式去實現。當一個線程阻塞在某個請求或IO時,其餘線程或IOLoop會繼續執行。json
另一個瓶頸就是GIL限制了CPU的併發數量,所以考慮用子進程的方式增長進程數,提升服務能力上限。瀏覽器
綜合上面的分析,大體用如下方案:多線程
- 經過子進程的方式複製多個進程,使子進程中的只讀頁指向同一個物理頁。
- 線程池。迴避異步改造的工做量,增長IO的併發量。
首先測試線程池,測試用例爲:併發
對sleep頁面同時發出兩個請求:app
- 在線程池中運行的函數(這裏是
self.block_task
)可以同時執行。表現爲在控制檯交替打印出數字。- 兩個get請求幾乎同時返回,在瀏覽器上顯示返回的內容。
線程池的測試代碼以下:
import os import sys import time import tornado.httpserver import tornado.ioloop import tornado.options import tornado.web import tornado.gen from tornado.concurrent import run_on_executor from concurrent.futures import ThreadPoolExecutor from tornado.options import define, options class HasBlockTaskHandler(tornado.web.RequestHandler): executor = ThreadPoolExecutor(20) #起線程池,由當前RequestHandler持有 @tornado.gen.coroutine def get(self): strTime = time.strftime("%Y-%m-%d %H:%M:%S") print "in get before block_task %s" % strTime result = yield self.block_task(strTime) print "in get after block_task" self.write("%s" % (result)) @run_on_executor def block_task(self, strTime): print "in block_task %s" % strTime for i in range(1, 16): time.sleep(1) print "step %d : %s" % (i, strTime) return "Finish %s" % strTime if __name__ == "__main__": tornado.options.parse_command_line() app = tornado.web.Application(handlers=[(r"/sleep", HasBlockTaskHandler)], autoreload=False, debug=False) http_server = tornado.httpserver.HTTPServer(app) http_server.bind(8888) tornado.ioloop.IOLoop.instance().start()
整個代碼裏有幾個位置值得關注:
executor = ThreadPoolExecutor(20)
。這是給Handler類初始化了一個線程池。其中concurrent.futures
不屬於tornado,是python的一個獨立模塊,在python3中是內置模塊,python2.7須要本身安裝。- 修飾符
@run_on_executor
。這個修飾符將同步函數改造爲在executor(這裏是線程池)上運行的異步函數,內部實現是將被修飾的函數submit到executor,返回一個Future對象。- 修飾符
@tornado.gen.coroutine
。被這個修飾符修飾的函數,是一個以同步函數方式編寫的異步函數。本來經過callback方式編寫的異步代碼,有了這個修飾符,能夠經過yield一個Future的方式來寫。被修飾的函數在yield了一個Future對象後將會被掛起,Future對象的結果返回後繼續執行。
運行代碼後,在兩個不一樣瀏覽器上訪問sleep頁面,獲得了想要的效果。這裏有一個小插曲,就是若是在同一瀏覽器的兩個tab上進行測試,是沒法看到想要的效果。第二個get請求會被block,直到第一個get請求返回,服務端纔開始處理第二個get請求。這讓我一度以爲多線程沒有生效,用了半天時間查了不少資料,纔看到是瀏覽器把相同的第二個請求block了,具體連接參考這裏。
因爲tornado很方便地支持多進程模型,多進程的使用要簡單不少,在以上例子中,只須要對啓動部分稍做改動便可。具體代碼以下所示:
if __name__ == "__main__": tornado.options.parse_command_line() app = tornado.web.Application(handlers=[(r"/sleep", HasBlockTaskHandler)], autoreload=False, debug=False) http_server = tornado.httpserver.HTTPServer(app) http_server.bind(8888) print tornado.ioloop.IOLoop.initialized() http_server.start(5) tornado.ioloop.IOLoop.instance().start()
須要注意的地方有兩點:
app = tornado.web.Application(handlers=[(r"/sleep", HasBlockTaskHandler)], autoreload=False, debug=False)
,在生成Application對象時,要將autoreload和debug兩個參數至爲False。也就是須要保證在fork子進程以前IOLoop是未被初始化的。這個能夠經過tornado.ioloop.IOLoop.initialized()
函數來跟。http_server.start(5)
在啓動IOLoop以前經過start函數設置進程數量,若是設置爲0表示每一個CPU都啓動一個進程。
最後的效果是能夠看到n+1個進程在運行,且公用同一個端口。
大部分邏輯代碼是封裝好的,服務的代碼以下:
import os import sys import json import tornado.httpserver import tornado.ioloop import tornado.options import tornado.httpclient import tornado.web import tornado.gen from tornado.concurrent import run_on_executor from concurrent.futures import ThreadPoolExecutor from tornado.options import define, options import rela_baike_server from rela_baike_server import RelaBaikeRequest, RelaBaikeResult, RelaBaikeServer import logging from logging.handlers import TimedRotatingFileHandler logging.basicConfig() import pdb g_log_prefix = '../log/rela_baike_tornado.' def getLogger(strPrefixBase): strPrefix = "%s%d" % (strPrefixBase, os.getpid()) logger = logging.getLogger("RELA_BAIKE") logger.propagate = False handler = TimedRotatingFileHandler(strPrefix, 'H', 1) handler.suffix = "%Y%m%d_%H%M%S.log" formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') handler.setFormatter(formatter) logger.addHandler(handler) logger.setLevel(logging.INFO) return logger def makeResponseBody(retCode, errReason, dicSummary): dicRes = {} dicRes['retCode'] = retCode if retCode != 0: dicRes['error'] = errReason else: dicRes['data'] = dicSummary return json.dumps(dicRes) class RelaBaikeHandler(tornado.web.RequestHandler): executor = ThreadPoolExecutor(50) def initialize(self, relaServer, logger): self.__serverRelaBaike = relaServer self.__logger = logger @tornado.gen.coroutine def get(self): lstSummary = [] retCode = 0 errReason = "" try: utfQuery = self.get_argument('query').encode('utf8').strip() except: errorReason = 'Query encoding not utf-8.' strRes = makeResponseBody(-1, errorReason, lstSummary) self.write(strRes) return if utfQuery == "": strRes = makeResponseBody(0, '', lstSummary) self.write(strRes) return error, errReason, lstSummary = yield self.getRelaBaike(utfQuery) strRes = makeResponseBody(error, errReason, lstSummary) self.write(strRes) def __logResponse(self, utfQuery, relaResult): succ = relaResult.isSuccess() if succ: self.__logger.info("%s\tSucc\t%s" % (utfQuery, "|".join([str(item[0]) for item in relaResult]))) else: self.__logger.info("%s\tError:%d" % (utfQuery, relaResult.getError())) @run_on_executor def getRelaBaike(self, utfQuery): error = 0 lstSummary = [] relaBaikeRequest = RelaBaikeRequest(content=utfQuery) relaBaikeResult = self.__serverRelaBaike.getRelaBaike(relaBaikeRequest) self.__logResponse(utfQuery, relaBaikeResult) if relaBaikeResult.isSuccess(): for item in relaBaikeResult: baikeid = item[0] try: dicSummary = json.loads(item[1]) except: return -2, 'summary format error' ,lstSummary lstSummary.append(dicSummary) else: return relaBaikeResult.getError(), rela_baike_server.g_dic_error.get(relaBaikeResult.getError(), 'other error') ,lstSumm ary return 0, 'success',lstSummary def start(): port = int(sys.argv[1]) serverRelaBaike = rela_baike_server.getRelaBaikeServer() logger = getLogger(g_log_prefix) app = tornado.web.Application(handlers=[(r"/rela_baike", RelaBaikeHandler, dict(relaServer=serverRelaBaike, logger=logger))]) http_server = tornado.httpserver.HTTPServer(app) http_server.bind(port) http_server.start(2) tornado.ioloop.IOLoop.instance().start() if __name__ == "__main__": start()
代碼所涉及的特性基本上不超過前面的測試例子,除了下兩幾點:
- 在*Handler類裏增長了一個
def initialize(self, relaServer, logger)
函數。這是爲了把一些初始化好的對象傳到Handler類裏。app = tornado.web.Application(handlers=[(r"/rela_baike", RelaBaikeHandler, dict(relaServer=serverRelaBaike, logger=logger))])
。前面handler的initialize函數參數,對應於Application初始化時,每一個handler對應的dict。