# -*- coding: utf-8 -*-
"""Demo: run two sleeping coroutines concurrently and time the whole run."""
import asyncio
import time
from functools import wraps

__author__ = 'Frank Li'


def time_count(func):
    """Decorate *func* so every call prints how long it took.

    The wrapped function's return value is passed through unchanged.
    """
    @wraps(func)
    def inner_func(*args, **kw):
        start = time.time()
        result = func(*args, **kw)
        end = time.time()
        print('{} cost {:.1f} s...'.format(func.__name__, end - start))
        return result
    return inner_func


async def do_some_work(n):
    """Wait *n* seconds without blocking the event loop."""
    await asyncio.sleep(n)


# Native ``async def`` replaces ``@asyncio.coroutine`` / ``yield from``:
# the generator-based decorator was removed in Python 3.11.
async def task_io_01():
    """Wait 3 seconds, printing before and after, and return this task's name."""
    print('{} start to run ...'.format(task_io_01.__name__))
    n = 3
    await do_some_work(n)
    print('{} continue to work {} seconds later...'.format(task_io_01.__name__, n))
    return task_io_01.__name__


async def task_io_02():
    """Wait 5 seconds, printing before and after, and return this task's name."""
    print('{} start to run ...'.format(task_io_02.__name__))
    n = 5
    await do_some_work(n)
    print('{} continue to do the work in {} seconds'.format(task_io_02.__name__, n))
    return task_io_02.__name__


async def _run_all():
    """Run both tasks concurrently and return the set of finished tasks."""
    # asyncio.wait() requires Task/Future objects; passing bare coroutines
    # is rejected by recent Python versions, so wrap them explicitly.
    tasks = [asyncio.ensure_future(task_io_02()),
             asyncio.ensure_future(task_io_01())]
    done, _pending = await asyncio.wait(tasks)
    return done


@time_count
def main():
    """Run the demo tasks and print each result (the set is unordered)."""
    done = asyncio.run(_run_all())
    for d in done:
        print('協程無序返回值:{}'.format(d.result()))


if __name__ == '__main__':
    main()
環境與工具:python 3.7;依賴安裝:pip install aiohttp、pip install jinja2、pip install aiomysql;pycharm(python 開發 IDE)+ git bash(練習命令)+ github(遠程倉庫)
在 pycharm 裏 新建 project
並 cd 進入 project 目錄
git init
初始化本地倉庫
git remote add origin git@github.com:FrankLi99/awesome-python3-webapp.git
關聯本地與遠程倉庫,執行 git add .
, git commit -m "init commit"
添加到暫存區,並提交到本地倉庫的 master 分支。由於就我一個人開發,其實 master + dev 兩個分支就已足夠
執行 git push -u origin master
推送本地倉庫 master 分支到遠程 master 分支,-u 參數還能夠將兩個 master 分支關聯起來,下次拉取推送就直接 git pull/push origin master 了
# -*- coding: utf-8 -*-
"""Minimal aiohttp application serving one page on 127.0.0.1:9000."""
__author__ = 'Frank Li'

import logging; logging.basicConfig(level=logging.INFO)
import asyncio
import os
import json
import time
from datetime import datetime

from aiohttp import web


async def index(request):
    """Handle ``GET /`` by returning a fixed HTML heading."""
    return web.Response(body=b'<h1>Awesome</h1>', content_type='text/html')


async def init(loop):
    """Create the application and start listening on 127.0.0.1:9000.

    :param loop: accepted for backward compatibility with existing callers;
        the server runs on whichever loop executes this coroutine.
    :return: the started ``web.AppRunner``.
    """
    # ``Application(loop=...)`` and ``app.make_handler()`` are deprecated in
    # aiohttp 3.x; AppRunner/TCPSite is the documented replacement.
    app = web.Application()
    app.router.add_route('GET', '/', index)
    runner = web.AppRunner(app)
    await runner.setup()
    site = web.TCPSite(runner, '127.0.0.1', 9000)
    await site.start()
    logging.info('server started at http://127.0.0.1:9000...')
    return runner


# Create the loop explicitly: asyncio.get_event_loop() with no current loop
# is deprecated in recent Python versions.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(init(loop))
loop.run_forever()
git add www/app.py
git commit -m "add www/app.py"
git push origin master
# Advice for a first attempt at code this involved: sketch the overall
# structure first, then, once the reference source is well understood,
# replace each placeholder class below with a real implementation.
import asyncio
import logging; logging.basicConfig(level=logging.INFO)  # was logging.info (a function, not a level)

import aiomysql


def log(sql, args):
    """Log an SQL statement together with its arguments at INFO level."""
    logging.info('SQL: {sql} , other args: {args}'.format(sql=sql, args=args))


async def create_pool(loop, **db_info):
    """Create the module-global aiomysql connection pool.

    Reads ``host``, ``port``, ``db``, ``user``, ``password``, ``charset``,
    ``autocommit``, ``minsize`` and ``maxsize`` from *db_info*; ``db``,
    ``user`` and ``password`` have no default.
    """
    logging.info('start to create aiomysql database connection pool...')
    global __pool
    __pool = await aiomysql.create_pool(
        host=db_info.get('host', 'localhost'),
        port=db_info.get('port', 3306),
        db=db_info.get('db'),
        user=db_info.get('user'),
        password=db_info.get('password'),
        # MySQL names the charset 'utf8'; 'utf-8' is not a valid charset name.
        charset=db_info.get('charset', 'utf8'),
        autocommit=db_info.get('autocommit', True),
        minsize=db_info.get('minsize', 1),
        maxsize=db_info.get('maxsize', 10),
        loop=loop,
    )


async def select(sql, args, size=None):
    """Run a SELECT and return the fetched rows.

    ``?`` placeholders in *sql* are rewritten to ``%s``. When *size* is
    truthy at most *size* rows are fetched, otherwise all rows.
    """
    log(sql, args)
    async with __pool.acquire() as conn:
        # The context manager closes the cursor even if execute() fails.
        async with conn.cursor(aiomysql.DictCursor) as csr:
            await csr.execute(sql.replace('?', '%s'), args or ())
            # fetchmany()/fetchall() are awaitables in aiomysql and must be
            # awaited to obtain the rows.
            if size:
                rs = await csr.fetchmany(size)
            else:
                rs = await csr.fetchall()
    return rs


async def execute(sql, args, autocommit=True):
    """Run an INSERT/UPDATE/DELETE and return the affected row count.

    With ``autocommit=False`` the statement runs inside an explicit
    transaction that is rolled back if anything raises.
    """
    log(sql, args)
    async with __pool.acquire() as conn:
        if not autocommit:
            await conn.begin()
        try:
            async with conn.cursor() as csr:
                await csr.execute(sql.replace('?', '%s'), args or ())
                affectedRow = csr.rowcount
            if not autocommit:
                await conn.commit()
        except BaseException:
            # Roll back before re-raising; the rollback used to sit after
            # ``raise`` and was never reached.
            if not autocommit:
                await conn.rollback()
            raise
    return affectedRow


# Build a placeholder list of the requested length.
def create_args_string(num):
    """Return *num* ``?`` placeholders joined by commas."""
    return ','.join('?' * num)


# Field classes (placeholders to be filled in later).
class Field(object):
    """Base class for column descriptions."""


class IntegerField(Field):
    """Integer column (placeholder)."""


class StringField(Field):
    """String column (placeholder)."""


class FloatField(Field):
    """Float column (placeholder)."""


class BooleanField(Field):
    """Boolean column (placeholder)."""


class TextField(Field):
    """Text column (placeholder)."""


# Metaclass for entity classes; it runs when a model class is created.
class ModelMetaclass(type):
    """Metaclass for Model subclasses (placeholder)."""


class Model(dict, metaclass=ModelMetaclass):
    """Base class for entity classes (placeholder)."""
# -*- coding: utf-8 -*-
"""A small asyncio ORM layered on an aiomysql connection pool."""
__author__ = 'Frank Li'

import asyncio
import logging; logging.basicConfig(level=logging.DEBUG)

import aiomysql


def log(sql, args):
    """Log an SQL statement together with its arguments at INFO level."""
    logging.info('SQL:{sql} , other ARGS:{args}'.format(sql=sql, args=args))


# Create the database connection pool -- module global.
async def create_pool(loop, **db_info):
    """Create the module-global aiomysql connection pool.

    Reads ``host``, ``port``, ``db``, ``user``, ``password``, ``charset``,
    ``autocommit``, ``minsize`` and ``maxsize`` from *db_info*; ``db``,
    ``user`` and ``password`` have no default.
    """
    logging.info('start to create global database connection pool...')
    global __pool
    __pool = await aiomysql.create_pool(
        host=db_info.get('host', 'localhost'),
        port=db_info.get('port', 3306),
        db=db_info.get('db'),
        user=db_info.get('user'),
        password=db_info.get('password'),
        charset=db_info.get('charset', 'utf8'),
        autocommit=db_info.get('autocommit', True),
        minsize=db_info.get('minsize', 1),
        maxsize=db_info.get('maxsize', 10),
        loop=loop,
    )


async def select(sql, args, size=None):
    """Run a SELECT and return the fetched rows.

    ``?`` placeholders in *sql* are rewritten to ``%s``. When *size* is
    truthy at most *size* rows are fetched, otherwise all rows.
    """
    log(sql, args)
    async with __pool.acquire() as conn:
        # The context manager closes the cursor even if execute() fails.
        async with conn.cursor(aiomysql.DictCursor) as csr:
            await csr.execute(sql.replace('?', '%s'), args or ())
            # fetchmany()/fetchall() are awaitables in aiomysql and must be
            # awaited to obtain the rows.
            if size:
                rs = await csr.fetchmany(size)
            else:
                rs = await csr.fetchall()
    return rs


async def execute(sql, args, autocommit=True):
    """Run an INSERT/UPDATE/DELETE and return the affected row count.

    With ``autocommit=False`` the statement runs inside an explicit
    transaction that is rolled back if anything raises. Errors raised while
    closing the cursor are logged and otherwise ignored.
    """
    log(sql, args)
    async with __pool.acquire() as conn:
        if not autocommit:
            await conn.begin()
        csr = None  # stays None if cursor creation itself fails
        try:
            csr = await conn.cursor()
            await csr.execute(sql.replace('?', '%s'), args or ())
            affctedRow = csr.rowcount
            logging.info('execute sql affcted row: {}'.format(affctedRow))
            if not autocommit:
                await conn.commit()
        except BaseException:
            if not autocommit:
                await conn.rollback()
            raise
        finally:
            if csr is not None:
                try:
                    await csr.close()
                except Exception as e:
                    # Best effort: a failed close must not mask the result.
                    logging.info(e)
    return affctedRow


# Field classes.
class Field(object):
    """Describe one table column.

    :param name: column name, or ``None``.
    :param column_type: SQL type string, e.g. ``'varchar(256)'``.
    :param is_pk: whether this column is the primary key.
    :param default: default value, or a callable producing one.
    """

    def __init__(self, name, column_type, is_pk, default):
        self.name = name
        self.column_type = column_type
        self.is_pk = is_pk
        self.default = default

    def __repr__(self):
        return '<{},{}:{}>'.format(self.__class__.__name__, self.column_type, self.name)

    __str__ = __repr__


class IntegerField(Field):
    """``bigint`` column defaulting to 0."""

    def __init__(self, name=None, column_type='bigint', is_pk=False, default=0):
        super().__init__(name, column_type, is_pk, default)


class StringField(Field):
    """``varchar(256)`` column defaulting to ``None``."""

    def __init__(self, name=None, column_type='varchar(256)', is_pk=False, default=None):
        super().__init__(name, column_type, is_pk, default)


class FloatField(Field):
    """``real`` column defaulting to 0.0."""

    def __init__(self, name=None, column_type='real', is_pk=False, default=0.0):
        super().__init__(name, column_type, is_pk, default)


class BooleanField(Field):
    """``boolean`` column; never a primary key."""

    def __init__(self, name=None, column_type='boolean', default=False):
        super().__init__(name, column_type, False, default)


class TextField(Field):
    """``text`` column; never a primary key."""

    def __init__(self, name=None, column_type='text', default=None):
        super().__init__(name, column_type, False, default)


# Build ``?`` placeholders for the given number of arguments.
def create_args_string(num):
    """Return *num* ``?`` placeholders joined by commas."""
    return ','.join('?' * num)


# The metaclass builds the select / insert / delete / update statements for
# every Model subclass at class-creation time.
class ModelMetaclass(type):
    """Collect Field attributes of a model class and attach SQL templates."""

    def __new__(cls, name, bases, attrs):
        # Skip the Model base class itself.
        if name == 'Model':
            return type.__new__(cls, name, bases, attrs)
        tb_name = attrs.get('__table__', str.lower(name))
        mappings = {}
        fields = []
        primaryKey = None
        for k, v in attrs.items():
            if not isinstance(v, Field):
                continue
            logging.info('found field mapping {} <==> {}'.format(k, v))
            mappings[k] = v
            if v.is_pk:
                if primaryKey:
                    raise RuntimeError('Duplicated primary key for field: {}'.format(k))
                primaryKey = k
            else:
                fields.append(k)
        if not primaryKey:
            raise RuntimeError('Primary key not found...')
        # Remove the class attributes so they do not shadow the per-instance
        # values stored in the dict.
        for k in mappings:
            attrs.pop(k)
        escape_fields = ['`{}`'.format(f) for f in fields]
        attrs['__mappings__'] = mappings
        attrs['__table__'] = tb_name
        attrs['__fields__'] = fields
        attrs['__primary_key__'] = primaryKey
        attrs['__select__'] = 'select `{primaryKey}`,{escape_fields} from `{tb_name}` '.format(
            primaryKey=primaryKey, escape_fields=','.join(escape_fields), tb_name=tb_name)
        attrs['__insert__'] = 'insert into `{tb_name}`({escape_fields},`{primaryKey}`) values({args})'.format(
            tb_name=tb_name, escape_fields=','.join(escape_fields),
            primaryKey=primaryKey, args=create_args_string(len(fields) + 1))
        attrs['__delete__'] = 'delete from `{tb_name}` where `{primaryKey}`=?'.format(
            tb_name=tb_name, primaryKey=primaryKey)
        # The table name must be substituted; the template used to contain
        # the literal text `tb_name`.
        attrs['__update__'] = 'update `{tb_name}` set {set_cols} where `{primaryKey}`=?'.format(
            tb_name=tb_name,
            set_cols=','.join('`{}`=?'.format(mappings.get(f).name or f) for f in fields),
            primaryKey=primaryKey)
        return type.__new__(cls, name, bases, attrs)


# Template for entity classes.
class Model(dict, metaclass=ModelMetaclass):
    """Dict-backed record whose keys are also readable/writable as attributes."""

    def __init__(self, *args, **kw):
        super().__init__(*args, **kw)

    # Allow dict entries to be used like attributes: obj.name, obj.id, ...
    def __getattr__(self, key):
        try:
            return self[key]
        except KeyError:
            raise AttributeError(r"'Model' object has no attribute {}".format(key))

    def __setattr__(self, key, value):
        self[key] = value

    def getValue(self, key):
        """Return the value stored under *key*, or ``None`` if absent."""
        return getattr(self, key, None)

    def getValueOrDefault(self, key):
        """Return the value for *key*, filling in the field default if unset.

        When the stored value is ``None`` the field's default (called first
        if callable) is stored on the instance and returned.
        """
        value = self.getValue(key)
        if value is None:
            field = self.__mappings__[key]
            if field is not None:
                value = field.default() if callable(field.default) else field.default
                logging.info('using default for {} : {}'.format(key, str(value)))
                setattr(self, key, value)
        return value

    @classmethod
    async def findAll(cls, where=None, args=None, **kw):
        """Find objects by an optional where clause.

        :param where: SQL condition text without the ``where`` keyword.
        :param args: values for the ``?`` placeholders in *where*.
        :param kw: ``orderBy`` (SQL text) and ``limit`` (an int, or a
            2-tuple of ints).
        :raises ValueError: if ``limit`` has an unsupported form.
        :return: list of instances of *cls*.
        """
        select_sql = [cls.__select__]
        if where:
            select_sql.append('where')
            select_sql.append(where)
        # Copy so the caller's list is not modified when limit values are added.
        args = list(args) if args else []
        # 'orderbBy' is the misspelled key the code used to read; it is still
        # accepted so existing callers keep working.
        orderBy = kw.get('orderBy') or kw.get('orderbBy')
        if orderBy:
            select_sql.append('order by')
            select_sql.append(orderBy)
        limit = kw.get('limit')
        if limit is not None:
            select_sql.append('limit')
            if isinstance(limit, int):
                select_sql.append('?')
                args.append(limit)
            elif isinstance(limit, tuple) and len(limit) == 2:
                select_sql.append(create_args_string(2))  # ?,?
                args.extend(limit)
            else:
                raise ValueError('Invalid limit value: {}'.format(str(limit)))
        rs = await select(' '.join(select_sql), args)
        return [cls(**r) for r in rs]

    @classmethod
    async def findNumber(cls, selectField, where=None, args=None):
        """Return the single value selected by *selectField*.

        Returns ``None`` when the query yields no row.
        """
        # The table name must be substituted; the query used to contain the
        # literal text `tb_name`.
        select_sql = ['select {selectField} _num_ from `{tb_name}`'.format(
            selectField=selectField, tb_name=cls.__table__)]
        if where:
            select_sql.append('where')
            select_sql.append(where)
        rs = await select(' '.join(select_sql), args, 1)
        if not rs:
            return None
        return rs[0]['_num_']

    @classmethod
    async def find(cls, pk):
        """Return the object whose primary key equals *pk*, or ``None``."""
        rs = await select('{select_sql} where `{pk_field}`=?'.format(
            select_sql=cls.__select__, pk_field=cls.__primary_key__), [pk], size=1)
        return cls(**rs[0]) if rs else None

    async def save(self):
        """Insert this object, applying field defaults for unset values."""
        args = list(map(self.getValueOrDefault, self.__fields__))
        args.append(self.getValueOrDefault(self.__primary_key__))
        rows = await execute(self.__insert__, args, True)
        if rows != 1:
            logging.warning('failed to insert record: affcted rows: {rowCount}'.format(rowCount=rows))
        else:
            # A successful insert is informational, not a warning.
            logging.info('affcted rows: {rowCount}'.format(rowCount=rows))

    async def update(self):
        """Update the row identified by this object's primary key."""
        args = list(map(self.getValueOrDefault, self.__fields__))
        args.append(self.getValueOrDefault(self.__primary_key__))
        rows = await execute(self.__update__, args)
        if rows != 1:
            logging.warning('failed to update by primary key: affected {rowCount}'.format(rowCount=rows))

    async def remove(self):
        """Delete the row identified by this object's primary key."""
        # __delete__ has exactly one placeholder (the primary key); passing
        # every field value as well gives the wrong argument count.
        args = [self.getValue(self.__primary_key__)]
        rows = await execute(self.__delete__, args)
        if rows != 1:
            logging.info('failed to delete by primary key affected rows: {rowCount}'.format(rowCount=rows))
# -*- coding: utf-8 -*-
"""Manual smoke test: insert one User and list all users through the ORM."""
__author__ = 'Frank Li'

from www.orm2 import Model, StringField, IntegerField, BooleanField, FloatField, create_pool
import logging; logging.basicConfig(level=logging.DEBUG)
import time
# NOTE(review): this imported User is shadowed by the class defined below --
# confirm whether the import is still needed.
from www.models import User
import asyncio
import uuid


def next_id():
    """Return a string id: 15-digit millisecond timestamp + uuid4 hex + '000'."""
    return '%015d%s000' % (int(time.time() * 1000), uuid.uuid4().hex)


class User(Model):
    """Row of the ``users`` table."""
    __table__ = 'users'

    id = StringField(is_pk=True, default=next_id, column_type='varchar(50)')
    email = StringField(column_type='varchar(50)')
    passwd = StringField(column_type='varchar(50)')
    admin = BooleanField()
    name = StringField(column_type='varchar(50)')
    image = StringField(column_type='varchar(500)')
    created_at = FloatField(default=time.time)


# Native coroutine: ``@asyncio.coroutine`` was removed in Python 3.11.
async def test(loop):
    """Create the pool, save one user, log the generated SQL, list all users."""
    await create_pool(loop=loop, user='www-data', password='www-data', db='awesome')
    u = User(name='Frank', email='Frank@example.com', passwd='1234567890', image='about:blank')
    await u.save()
    logging.info(u.__select__)
    logging.info(u.__insert__)
    logging.info(u.__delete__)
    logging.info(u.__update__)
    users = await User.findAll()
    for user in users:
        logging.info('user info:\n    ' + str(user))


if __name__ == '__main__':
    # Create the loop explicitly: asyncio.get_event_loop() with no current
    # loop is deprecated in recent Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(test(loop))
    # Keeps the process alive after the test finishes; stop it with Ctrl-C.
    loop.run_forever()
遍尋網絡終於發現註釋版的代碼
這篇博客不錯,終究還是自己見識太淺薄
# Start with the simplest piece: two decorators matching the two kinds of
# request the front end sends, GET and POST.
from functools import wraps, partial


def handler_decorator(path, *, method='GET'):
    """Return a decorator that tags a handler with its URL path and method.

    The decorated function gains ``__method__`` and ``__route__``
    attributes; ``__urlpath__`` is kept as an alias of ``__route__``.
    """
    def decorator(func):
        @wraps(func)  # keep the original name/signature metadata
        def wrapper(*args, **kw):
            return func(*args, **kw)
        wrapper.__method__ = method
        # The registration code looks up ``__route__``; only ``__urlpath__``
        # used to be set, so decorated handlers were never recognised.
        wrapper.__route__ = path
        wrapper.__urlpath__ = path
        return wrapper
    return decorator


get = partial(handler_decorator, method='GET')
post = partial(handler_decorator, method='POST')


# Usage example.
@get('/home/index')
def home_index(request):
    pass
### RequestHandler: extract the arguments a URL handler needs from the request.
import asyncio
import inspect
import logging
from urllib import parse

from aiohttp import web

from web_app.APIError import APIError


# Helpers built on ``inspect`` that relate a handler's signature to the
# parameters available in the request.
def get_required_kw_args(fn):
    """Return the names of keyword-only parameters of *fn* without a default."""
    params = inspect.signature(fn).parameters
    return tuple(
        name for name, param in params.items()
        if param.kind == inspect.Parameter.KEYWORD_ONLY
        and param.default is inspect.Parameter.empty
    )


def get_named_kw_args(fn):
    """Return the names of all keyword-only parameters of *fn*."""
    params = inspect.signature(fn).parameters
    return tuple(
        name for name, param in params.items()
        if param.kind == inspect.Parameter.KEYWORD_ONLY
    )


def has_named_kw_arg(fn):
    """Return True if *fn* has at least one keyword-only parameter."""
    params = inspect.signature(fn).parameters
    return any(p.kind == inspect.Parameter.KEYWORD_ONLY for p in params.values())


def has_var_kw_arg(fn):
    """Return True if *fn* accepts ``**kwargs``."""
    params = inspect.signature(fn).parameters
    return any(p.kind == inspect.Parameter.VAR_KEYWORD for p in params.values())


def has_request_arg(fn):
    """Return True if *fn* has a parameter named ``request``.

    :raises ValueError: if ``request`` is followed by a parameter that is
        not ``*args``, keyword-only, or ``**kwargs``.
    """
    sig = inspect.signature(fn)
    allowed_after = (
        inspect.Parameter.VAR_POSITIONAL,
        inspect.Parameter.KEYWORD_ONLY,
        inspect.Parameter.VAR_KEYWORD,
    )
    found = False
    for name, param in sig.parameters.items():
        if name == 'request':
            found = True
            continue
        # The third comparison used to be ``str(param.kind != 'VAR_KEYWORD')``,
        # an always-truthy string, so ``**kw`` after ``request`` was rejected.
        if found and param.kind not in allowed_after:
            raise ValueError('request parameter must be the last named parameter in function: %s%s' % (fn.__name__, str(sig)))
    return found


# RequestHandler collects the handler's arguments from the request. Bad
# requests get an HTTPBadRequest response; otherwise the handler is called.
class RequestHandler(object):
    """Adapt a URL handler function to an aiohttp request handler."""

    def __init__(self, app, fn):
        self._app = app
        self._fn = fn
        self._required_kw_args = get_required_kw_args(fn)
        self._named_kw_args = get_named_kw_args(fn)
        self._has_named_kw_arg = has_named_kw_arg(fn)
        self._has_var_kw_arg = has_var_kw_arg(fn)
        self._has_request_arg = has_request_arg(fn)

    async def __call__(self, request):
        """Build keyword arguments from *request* and call the handler.

        Returns the handler's result, an ``HTTPBadRequest`` for malformed
        input, or an error dict when the handler raises ``APIError``.
        """
        kw = None
        if self._has_named_kw_arg or self._has_var_kw_arg:
            if request.method == 'POST':
                if not request.content_type:
                    return web.HTTPBadRequest(text='Missing Content-Type.')
                ct = request.content_type.lower()
                if ct.startswith('application/json'):
                    try:
                        params = await request.json()
                    except ValueError:
                        # Malformed JSON is a client error, not a server fault.
                        return web.HTTPBadRequest(text='Invalid JSON body.')
                    if not isinstance(params, dict):
                        return web.HTTPBadRequest(text='JSON body must be object.')
                    kw = params
                elif ct.startswith(('application/x-www-form-urlencoded', 'multipart/form-data')):
                    params = await request.post()
                    kw = dict(**params)
                else:
                    return web.HTTPBadRequest(text='Unsupported Content-Type: %s' % (request.content_type))
            if request.method == 'GET':
                qs = request.query_string
                if qs:
                    # parse_qs maps each name to a list; keep the first value.
                    kw = {k: v[0] for k, v in parse.parse_qs(qs, True).items()}
        if kw is None:
            kw = dict(**request.match_info)
        else:
            if not self._has_var_kw_arg and self._named_kw_args:
                # Without **kw the handler only accepts its keyword-only
                # parameters, so drop everything else.
                kw = {name: kw[name] for name in self._named_kw_args if name in kw}
            # Values from the URL path override same-named request values.
            for k, v in request.match_info.items():
                if k in kw:
                    logging.warning('Duplicate arg name in named arg and kw args: %s' % k)
                kw[k] = v
        if self._has_request_arg:
            kw['request'] = request
        for name in self._required_kw_args:
            if name not in kw:
                return web.HTTPBadRequest(text='Missing argument: %s' % (name))
        logging.info('call with args: %s' % str(kw))
        try:
            return await self._fn(**kw)
        except APIError as e:
            return dict(error=e.error, data=e.data, message=e.message)
# Register a URL handler; roughly what the @RequestMapping annotation does
# in Spring.
import asyncio
import functools
import inspect
import logging


def _as_coroutine(fn):
    """Wrap the plain callable *fn* in a coroutine function.

    If *fn* returns an awaitable, the wrapper awaits it before returning.
    """
    @functools.wraps(fn)
    async def wrapper(*args, **kw):
        result = fn(*args, **kw)
        if inspect.isawaitable(result):
            result = await result
        return result
    return wrapper


def add_route(app, fn):
    """Register *fn* on ``app.router`` using its ``__method__``/``__route__``.

    :raises ValueError: if *fn* lacks ``__method__`` or ``__route__``.
    """
    method = getattr(fn, '__method__', None)
    path = getattr(fn, '__route__', None)
    if method is None or path is None:
        # The error used to be *returned*, so the failure went unnoticed.
        raise ValueError('@get or @post not defined in %s.' % str(fn))
    if not asyncio.iscoroutinefunction(fn) and not inspect.isgeneratorfunction(fn):
        # ``asyncio.coroutine`` was removed in Python 3.11; wrap manually.
        fn = _as_coroutine(fn)
    logging.info('add route %s %s => %s(%s)' % (method, path, fn.__name__, ','.join(inspect.signature(fn).parameters.keys())))
    # RequestHandler takes two arguments: the app and the handler.
    app.router.add_route(method, path, RequestHandler(app, fn))
# Registering handlers one by one is tedious; import a module and register
# every routable function in it in one go.
def add_routes(app, module_name):
    """Import *module_name* and register each public routable callable in it.

    A callable is registered when it has truthy ``__method__`` and
    ``__route__`` attributes; names starting with ``_`` are skipped.
    """
    import importlib

    # module_name must be a module path only, with no function or class name.
    mod = importlib.import_module(module_name)
    for attr in dir(mod):
        if attr.startswith('_'):
            continue
        fn = getattr(mod, attr)
        if callable(fn):
            method = getattr(fn, '__method__', None)
            path = getattr(fn, '__route__', None)
            # Check here rather than relying on add_route, which treats a
            # missing method/path as an error.
            if path and method:
                add_route(app, fn)
### In the end we follow the interface aiohttp exposes and pass it the arguments it expects.
import os
import logging


# Register the static files directory.
def add_static(app):
    """Serve the ``static`` directory next to this file under ``/static/``.

    The parameter was previously misspelled ``add`` while the body used
    ``app``, so the call failed with a NameError.
    """
    path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'static')
    app.router.add_static('/static/', path)  # first argument is the URL prefix
    logging.info('add static %s => %s' % ('/static/', path))
import json
import logging
import os  # needed for the default template path below
import time
from datetime import datetime

from jinja2 import Environment, FileSystemLoader


# Initialise jinja2 so other functions can render templates.
def init_jinja2(app, **kw):
    """Create a jinja2 ``Environment`` and store it in ``app['__templating__']``.

    Recognised keys in *kw*: ``autoescape``, ``block_start_string``,
    ``block_end_string``, ``variable_start_string``, ``variable_end_string``,
    ``auto_reload``, ``path`` (template directory; defaults to ``templates``
    next to this file) and ``filters`` (mapping of name to filter function).
    """
    logging.info('init jinja2...')
    options = dict(
        autoescape=kw.get('autoescape', True),
        block_start_string=kw.get('block_start_string', '{%'),
        block_end_string=kw.get('block_end_string', '%}'),
        variable_start_string=kw.get('variable_start_string', '{{'),
        variable_end_string=kw.get('variable_end_string', '}}'),
        auto_reload=kw.get('auto_reload', True),
    )
    path = kw.get('path', None)
    if path is None:
        path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'templates')
    logging.info('set jinja2 template path: %s' % path)
    env = Environment(loader=FileSystemLoader(path), **options)
    filters = kw.get('filters', None)
    if filters is not None:
        env.filters.update(filters)
    app['__templating__'] = env


def datetime_filter(t):
    """Format the timestamp *t* (seconds) as a relative or absolute date.

    Under a minute old gives "1 minute ago"; under an hour, a day or a week
    gives minutes, hours or days ago; anything older gives year/month/day.
    """
    delta = int(time.time() - t)
    if delta < 60:
        return u'1分鐘前'
    if delta < 3600:
        return u'%s分鐘前' % (delta // 60)
    if delta < 86400:
        return u'%s小時前' % (delta // 3600)
    if delta < 604800:
        return u'%s天前' % (delta // 86400)
    dt = datetime.fromtimestamp(t)
    return u'%s年%s月%s日' % (dt.year, dt.month, dt.day)