使用原生 python 造輪子搭建博客

這篇用來 記錄一個 從零開始的 博客搭建,但願堅持下去,由於python 開發效率使人髮指,因此會原生從零寫 ORM ,Web 框架

前提是打好 異步 io 的基礎, 使用異步,有一點要謹記,一旦開始異步,層層異步,從 http 到 數據庫層都要用異步框架寫異步函數,所謂開弓沒有回頭箭

# -*- coding: utf-8 -*-
import asyncio
import time
from functools import wraps
__author__ = 'Frank Li'


def time_count(func):
    @wraps(func)
    def inner_func(*args,**kw):
        start = time.time()
        result = func(*args,**kw)
        end = time.time()
        print('{} cost {:.1f} s...'.format(func.__name__,end-start))
        return result
    return inner_func


async def do_some_work(n):
    await asyncio.sleep(n)

@asyncio.coroutine
def task_io_01():
    print('{} start to run ...'.format(task_io_01.__name__))
    n = 3
    yield from do_some_work(n)
    print('{} continue to work {} seconds later...'.format(task_io_01.__name__,n))
    return task_io_01.__name__

@asyncio.coroutine
def task_io_02():
    print('{} start to run ...'.format(task_io_02.__name__))
    n = 5
    yield from do_some_work(n)
    print('{} continue to do the work in {} seconds'.format(task_io_02.__name__,n))
    return task_io_02.__name__

@time_count
def main():
    tasks = [task_io_02(),task_io_01()]
    loop = asyncio.get_event_loop()
    done, pending = loop.run_until_complete(asyncio.wait(tasks))
    for d in done:
        print('協程無序返回值:{}'.format(d.result()))
    loop.close()

if __name__ == '__main__':
    main()

環境準備 flask 都不要,驚掉下巴

python 3.7

pip install aiohttp

pip install jinja2

pip install aiomysql

pycharm( python 開發 IDE)  + git bash(練習命令) + github(遠程倉庫)

在 pycharm 裏 新建 project
並 cd 進入 project 目錄html

  1. 執行 git init 初始化本地倉庫
  2. 登陸 github 建立本身的遠程倉庫,由於沒給錢是public 的倉庫,放心,這點兒代碼沒人偷,並且python開發者首先信奉開源
  3. 執行命令 git remote add origin git@github.com:FrankLi99/awesome-python3-webapp.git 關聯本地與遠程倉庫
  4. 建立以下圖所示的 工程目錄
  5. 添加 .gitignore 參考
  6. 執行 git add . , git commit -m "init commit" 添加到暫存區,並提交到本地倉庫的 master 分支,由於就我一我的開發,其實 master + dev 兩個分支就已足夠python

  7. 執行 git push -u origin master 推送本地倉庫 master 分支 到遠程 master 分支, -u 參數 還能夠將 兩個 master 分支 關聯起來,下次拉取推送 就直接 git pull/push origin mastermysql

上面這麼麻煩,徹底能夠使用 git clone repo-addr 來解決,只是 爲了練習下另外一種方法

day2 接下來 ,添加第一個 webserver --》 app.py

# -*- coding: utf-8 -*-
__author__ = 'Frank Li'

import logging; logging.basicConfig(level=logging.INFO)

import asyncio, os, json, time
from datetime import datetime

from aiohttp import web

def index(request):
    return web.Response(body=b'<h1>Awesome</h1>',content_type='text/html')

@asyncio.coroutine
def init(loop):
    app = web.Application(loop=loop)
    app.router.add_route('GET', '/', index)
    srv = yield from loop.create_server(app.make_handler(), '127.0.0.1', 9000)
    logging.info('server started at http://127.0.0.1:9000...')
    return srv

loop = asyncio.get_event_loop()
loop.run_until_complete(init(loop))
loop.run_forever()

自此,咱們 Web 骨架搭好了,能夠進行深刻開發了。。。

  1. 添加並提交到本地倉庫 git add www/app.py
  2. 推送到遠程倉庫 git push origin master

day03 編寫 orm 框架 ==> 使用 元類 + mysql 異步庫 aiomysql 編寫 orm 框架,看起來複雜,不過總要爲之。

# 第一次寫這麼複雜的代碼,我建議先把主要的 結構搭建出來,而後在熟讀源碼的基礎上,再來給每一個pass部分作替換(這部分真的很綜合,我認爲是進階)

import asyncio
import logging;logging.basicConfig(level=logging.info)
import aiomysql

def log(sql,args):
    logging.info('SQL: {sql} , other args: {args}'.format(sql=sql,args=args))

@asyncio.coroutine
def create_pool(loop,**db_info):
    logging.info('start to create aiomysql database connection pool...')
    global __pool
    __pool = yield from aiomysql.create_pool(host=db_info.get('host','localhost'),
                                             port=db_info.get('port',3306),
                                             db=db_info.get('db'),
                                             user=db_info.get('user'),
                                             password=db_info.get('password'),
                                             charset=db_info.get('charset','utf-8'),
                                             autocommit=db_info.get('autocommit',True),
                                             minsize=db_info.get('minsize',1),
                                             maxsize=db_info.get('maxsize',10),
                                             loop=loop)

@asyncio.coroutine
def select(sql,args,size=None):
    log(sql,args)
    global __pool
    with (yield from __pool) as conn:
        csr = yield from conn.cursor(aiomysql.DictCursor)
        yield from csr.execute(sql.replace('?','%s'),args or ())
        rs = csr.fetchmany(size) if size else csr.fetchall()
        return rs

@asyncio.coroutine
def execute(sql,args,autocommit=True):
    log(sql,args)
    global __pool
    with (yield from __pool) as conn:
        if not autocommit:
            yield from conn.begin()
        try:
            csr = yield from conn.cursor()
            yield from csr.execute(sql.replace('?','%s'),args or ())
            affectedRow = csr.rowcount
            if not autocommit:
                yield from conn.commit()
        except BaseException as e:
            raise
            if not autocommit:
                yield from conn.rollback()
        finally:
            yield from csr.close()
        return affectedRow
    
# 根據長度構造 佔位符
def create_args_string(num):
    return ','.join('?'*num)

# 定義 Field 字段類
class Field(object):
    pass
class IntegerField(Field):
    pass
class StringField(Field):
    pass
class FloatField(Field):
    pass
class BooleanField(Field):
    pass
class TextField(Field):
    pass

# 定義實體類的元類,先有類來後有天,元類更在類以前
class ModelMetaclass(type):
    pass

class Model(dict,metaclass=ModelMetaclass):
    pass

完善版 orm

# -*- coding: utf-8 -*-
__author__ = 'Frank Li'

import asyncio
import logging;logging.basicConfig(level=logging.DEBUG)
import aiomysql

def log(sql,args):
    logging.info('SQL:{sql} , other ARGS:{args}'.format(sql=sql,args=args))

# 建立數據庫鏈接池 -- 全局
@asyncio.coroutine
def create_pool(loop,**db_info):
    logging.info('start to create global database connection pool...')
    global __pool
    __pool = yield from aiomysql.create_pool(host=db_info.get('host','localhost'),
                                            port=db_info.get('port',3306),
                                            db=db_info.get('db'),
                                            user=db_info.get('user'),
                                            password=db_info.get('password'),
                                            charset=db_info.get('charset','utf8'),
                                            autocommit=db_info.get('autocommit',True),
                                            minsize=db_info.get('minsize',1),
                                            maxsize=db_info.get('maxsize',10),
                                            loop=loop)

@asyncio.coroutine
def select(sql,args,size=None):
    log(sql,args)
    global __pool
    with (yield from __pool) as conn:
        csr = yield from conn.cursor(aiomysql.DictCursor)
        yield from csr.execute(sql.replace('?','%s'),args or ())
        rs = csr.fetchmany(size) if size else csr.fetchall()
        yield from csr.close()
        # logging.info('result rowcount: {}'.format(len(rs)))
        return rs

@asyncio.coroutine
def execute(sql,args,autocommit=True):
    log(sql,args)
    global __pool
    with (yield from __pool) as conn:
        if not autocommit:
            yield from conn.begin()
        try:
            csr = yield from conn.cursor()
            yield from csr.execute(sql.replace('?','%s'),args or ())
            affctedRow = csr.rowcount
            logging.info('execute sql affcted row: {}'.format(affctedRow))
            if not autocommit:
                yield from conn.commit()

        except BaseException:
            if not autocommit:
                yield from conn.rollback()
            raise
        finally:
            try:
                yield from csr.close()
            except BaseException as e:
                logging.info(e)
        return affctedRow

# 開始定義 字段類 Field
class Field(object):
    def __init__(self,name,column_type,is_pk,default):
        self.name = name
        self.column_type = column_type
        self.is_pk = is_pk
        self.default = default
    def __repr__(self):
        return '<{},{}:{}>'.format(self.__class__.__name__,self.column_type,self.name)
    __str__ = __repr__

class IntegerField(Field):
    def __init__(self,name=None,column_type='bigint',is_pk=False,default=0):
        super(IntegerField,self).__init__(name,column_type,is_pk,default)

class StringField(Field):
    def __init__(self,name=None,column_type='varchar(256)',is_pk=False,default=None):
        super(StringField,self).__init__(name,column_type,is_pk,default)

class FloatField(Field):
    def __init__(self,name=None,column_type='real',is_pk=False,default=0.0):
        super(FloatField,self).__init__(name,column_type,is_pk,default)

class BooleanField(Field):
    def __init__(self,name=None,column_type='boolean',default=False):
        super(BooleanField,self).__init__(name,column_type,False,default)

class TextField(Field):
    def __init__(self,name=None,column_type='text',default=None):
        super(TextField,self).__init__(name,column_type,False,default)

# 根據參數個數構造 ? 佔位符
def create_args_string(num):
    return ','.join('?'*num)

# 開始寫 元類 ModelMetaclass  主要做用就是 對 全部 Model 構造 select ,insert,delete,update 語句
# 先有類來後有天,元類更在類以前
class ModelMetaclass(type):
    def __new__(cls,name,bases,attrs):
        # 排除掉 Model 這個 父類自己
        if name=='Model':
            return type.__new__(cls,name,bases,attrs)

        tb_name = attrs.get('__table__',str.lower(name))

        # 若是是其子類
        mappings = {}
        fields = []
        primaryKey = None

        for k,v in attrs.items():

            if isinstance(v,Field):
                logging.info('found field mapping {} <==> {}'.format(k, v))
                mappings[k] = v
                if v.is_pk:
                    if primaryKey:
                        raise RuntimeError('Duplicated primary key for field: {}'.format(k))
                    primaryKey = k
                else:
                    fields.append(k)

        if not primaryKey:
            raise RuntimeError('Primary key not found...')

        # 去除 類中 與 實例中 變量同名的 類變量
        for k in mappings.keys():
            attrs.pop(k)

        escape_fields = list(map(lambda f: '`{}`'.format(f),fields))

        # 將獲取到的值 放入 類屬性中
        attrs['__mappings__'] = mappings
        attrs['__table__'] = tb_name
        attrs['__fields__'] = fields
        attrs['__primary_key__'] = primaryKey

        # 構造 select , update ,delete ,insert 語句
        attrs['__select__'] = 'select `{primaryKey}`,{escape_fields} from `{tb_name}` '.format(primaryKey=primaryKey,escape_fields=','.join(escape_fields),tb_name=tb_name)
        attrs['__insert__'] = 'insert into `{tb_name}`({escape_fields},`{primaryKey}`) values({args})'.format(tb_name=tb_name,escape_fields=','.join(escape_fields),primaryKey=primaryKey,args=create_args_string(len(fields)+1))
        attrs['__delete__'] = 'delete from `{tb_name}` where `{primaryKey}`=?'.format(tb_name=tb_name,primaryKey=primaryKey)
        attrs['__update__'] = 'update `tb_name` set {set_cols} where `{primaryKey}`=?'.format(tb_name=tb_name,set_cols=','.join(list(map(lambda f:'`{}`=?'.format(mappings.get(f).name or f),fields))),primaryKey=primaryKey)

        # 返回 改造好的 類模板
        return type.__new__(cls,name,bases,attrs)

# 造實體類的 模板
class Model(dict,metaclass=ModelMetaclass):

    def __init__(self,*args,**kw):
        super(Model,self).__init__(*args,**kw)

    # 方便 dict 對象 如同 屬性通常調用 dict.name , dict.id 等等
    def __getattr__(self,key):
        try:
            return self[key]
        except KeyError:
            raise AttributeError(r"'Model' object has no attribute {}".format(key))
    def __setattr__(self,key,value):
        self[key] = value

    def getValue(self,key):
        return getattr(self,key,None)

    def getValueOrDefault(self,key):
        value = self.getValue(key)
        if value is None:
            field = self.__mappings__[key]
            if field is not None:
                value = field.default()  if callable(field.default) else field.default
                logging.info('using default for {} : {}'.format(key,str(value)))
                setattr(self,key,value)
        return value

    @classmethod
    @asyncio.coroutine
    def findAll(cls,where=None,args=None,**kw):
        'find object by where clause'
        select_sql = [cls.__select__]
        if where:
            select_sql.append('where')
            select_sql.append(where)

        if args is None:
            args=[]

        orderBy = kw.get('orderbBy',None)
        if orderBy:
            select_sql.append('order by ')
            select_sql.append(orderBy)

        limit = kw.get('limit',None)
        if limit:
            select_sql.append('limit')
            if isinstance(limit,int):
                select_sql.append('?')
                args.append(limit)
            elif isinstance(limit,tuple):

                select_sql.append(create_args_string(2)) #  ?,?
                args.extend(limit)
            else:
                raise ValueError('Invalid limit value: {}'.format(str(limit)))

        rs = yield from select(' '.join(select_sql),args)
        return [cls(**r) for r in rs]

    @classmethod
    @asyncio.coroutine
    def findNumber(cls,selectField,where=None,args=None):
        ''' find number by select and where'''
        select_sql = ['select {selectField} _num_ from `tb_name`'.format(selectField=selectField,tb_name=cls.__table__)]
        if where:
            select_sql.append('where')
            select_sql.append(where)
        rs = yield from select(' '.join(select_sql),args,1)
        if len(rs) ==0:
            return None
        return rs[0]['_num_']

    @classmethod
    @asyncio.coroutine
    def find(cls,pk):
        '''find object by primary key'''
        rs = yield from select('{select_sql} where `{pk_field}`=?'.format(select_sql=cls.__select__,pk_field=cls.__primary_key__),[pk],size=1)
        return None if len(rs)==0 else cls(**rs[0])


    # save
    @asyncio.coroutine
    def save(self):
        args = list(map(self.getValueOrDefault,self.__fields__))
        args.append(self.getValueOrDefault(self.__primary_key__))
        rows = yield from execute(self.__insert__,args,True)
        if rows != 1:
            logging.warn('failed to insert record: affcted rows: {rowCount}'.format(rowCount=rows))
        else:
            logging.warn('affcted rows: {rowCount}'.format(rowCount=rows))
    # update
    @asyncio.coroutine
    def update(self):
        args = list(map(self.getValueOrDefault,self.__fields__))
        args.append(self.getValueOrDefault(self.__primary_key__))
        rows = yield from execute(self.__update__,args)
        if rows !=1:
            logging.warn('failed to update by primary key: affected {rowCount}'.format(rowCount=rows))

    # delete
    @asyncio.coroutine
    def remove(self):
        args = list(map(self.getValueOrDefault,self.__fields__))
        args.append(self.getValueOrDefault(self.__primary_key__))
        rows = yield from execute(self.__delete__,args)
        if rows != 1:
            logging.info('failed to delete by primary key affected rows: {rowCount}'.format(rowCount=rows))

model 模塊

# -*- coding: utf-8 -*-
__author__ = 'Frank Li'
from www.orm2 import Model,StringField,IntegerField,BooleanField,FloatField,create_pool
import logging;logging.basicConfig(level=logging.DEBUG)
import time
from www.models import User
import asyncio
import uuid

def next_id():
    return '%015d%s000' % (int(time.time() * 1000), uuid.uuid4().hex)

class User(Model):
    __table__ = 'users'

    id = StringField(is_pk=True, default=next_id, column_type='varchar(50)')
    email = StringField(column_type='varchar(50)')
    passwd = StringField(column_type='varchar(50)')
    admin = BooleanField()
    name = StringField(column_type='varchar(50)')
    image = StringField(column_type='varchar(500)')
    created_at = FloatField(default=time.time)


@asyncio.coroutine
def test(loop):
    yield from  create_pool(loop=loop,user='www-data', password='www-data', db='awesome')
    u = User(name='Frank', email='Frank@example.com', passwd='1234567890', image='about:blank')
    yield from u.save()
    logging.info(u.__select__)
    logging.info(u.__insert__)
    logging.info(u.__delete__)
    logging.info(u.__update__)

    users = yield from User.findAll()
    for user in users:
        logging.info('user info:\n  '+str(user))
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(test(loop))
    loop.run_forever()

編寫 異步 web 框架, 這裏真的好難。。。比 ORM 還難不少,初次看來是這樣,可能須要 細細鑽研源碼。。。

遍尋網絡終於發現註釋版的代碼
這篇博客不錯,終仍是本身見識太淺薄git

# 首先來個最簡單的吧, 構造 兩個 裝飾器用來對應 前臺頁面傳來的 兩種請求 ,get / post
from functools import wraps,partial

def handler_decorator(path,*,method='GET'):
    def decorator(func):
        @wraps(func) # 更正函數簽名
        def wrapper(*args,**kw):
            result = func(*args,**kw)
            return result
        wrapper.__method__ = method
        wrapper.__urlpath__ = path
        return wrapper
    return decorator

get=partial(handler_decorator,method='GET')
post=partial(handler_decorator,method='POST')

# 使用時候 就方便了
@get('/home/index')
def home_index(request):
    pass

### 定義 RequestHandler 用來解析 request 中的 信息

import inspect,asyncio
from web_app.APIError import APIError
from aiohttp import web
from urllib import parse

#運用inspect模塊,建立幾個函數用以獲取URL處理函數與request參數之間的關係
def get_required_kw_args(fn): #收集沒有默認值的命名關鍵字參數
    args = []
    params = inspect.signature(fn).parameters #inspect模塊是用來分析模塊,函數
    for name, param in params.items():
        if str(param.kind) == 'KEYWORD_ONLY' and param.default == inspect.Parameter.empty:
            args.append(name)
    return tuple(args)

def get_named_kw_args(fn):  #獲取命名關鍵字參數
    args = []
    params = inspect.signature(fn).parameters
    for name,param in params.items():
        if str(param.kind) == 'KEYWORD_ONLY':
            args.append(name)
    return tuple(args)

def has_named_kw_arg(fn): #判斷有沒有命名關鍵字參數
    params = inspect.signature(fn).parameters
    for name,param in params.items():
        if str(param.kind) == 'KEYWORD_ONLY':
            return True

def has_var_kw_arg(fn): #判斷有沒有關鍵字參數
    params = inspect.signature(fn).parameters
    for name,param in params.items():
        if str(param.kind) == 'VAR_KEYWORD':
            return True

def has_request_arg(fn): #判斷是否含有名叫'request'參數,且該參數是否爲最後一個參數
    params = inspect.signature(fn).parameters
    sig = inspect.signature(fn)
    found = False
    for name,param in params.items():
        if name == 'request':
            found = True
            continue #跳出當前循環,進入下一個循環
        if found and (str(param.kind) != 'VAR_POSITIONAL' and str(param.kind) != 'KEYWORD_ONLY' and str(param.kind != 'VAR_KEYWORD')):
            raise ValueError('request parameter must be the last named parameter in function: %s%s'%(fn.__name__,str(sig)))
    return found

#定義RequestHandler,正式向request參數獲取URL處理函數所需的參數,發現 請求信息有問題的 raise APIError ,沒有問題 則放行 ,注意全部這些 fn 都是指的實際 handler.py 中的函數

class RequestHandler(object):

    def __init__(self,app,fn):#接受app參數
        self._app = app
        self._fn = fn
        self._required_kw_args = get_required_kw_args(fn)
        self._named_kw_args = get_named_kw_args(fn)
        self._has_named_kw_arg = has_named_kw_arg(fn)
        self._has_var_kw_arg = has_var_kw_arg(fn)
        self._has_request_arg = has_request_arg(fn)

    async def __call__(self,request): #__call__這裏要構造協程
        kw = None
        if self._has_named_kw_arg or self._has_var_kw_arg:
            if request.method == 'POST': #判斷客戶端發來的方法是否爲POST
                if not request.content_type: #查詢有沒提交數據的格式(EncType)
                    return web.HTTPBadRequest(text='Missing Content_Type.')#這裏被廖大坑了,要有text
                ct = request.content_type.lower() #小寫
                if ct.startswith('application/json'): #startswith
                    params = await request.json() #Read request body decoded as json.
                    if not isinstance(params,dict):
                        return web.HTTPBadRequest(text='JSON body must be object.')
                    kw = params
                elif ct.startswith('application/x-www-form-urlencoded') or ct.startswith('multipart/form-data'):
                    params = await request.post() # reads POST parameters from request body.If method is not POST, PUT, PATCH, TRACE or DELETE or content_type is not empty or application/x-www-form-urlencoded or multipart/form-data returns empty multidict.
                    kw = dict(**params)
                else:
                    return web.HTTPBadRequest(text='Unsupported Content_Tpye: %s'%(request.content_type))
            if request.method == 'GET': 
                qs = request.query_string #The query string in the URL
                if qs:
                    kw = dict()
                    for k,v in parse.parse_qs(qs,True).items(): #Parse a query string given as a string argument.Data are returned as a dictionary. The dictionary keys are the unique query variable names and the values are lists of values for each name.
                        kw[k] = v[0]
        if kw is None:
            kw = dict(**request.match_info)
        else:
            if not self._has_var_kw_arg and self._named_kw_args: #當函數參數沒有關鍵字參數時,移去request除命名關鍵字參數全部的參數信息
                copy = dict()
                for name in self._named_kw_args:
                    if name in kw:
                        copy[name] = kw[name]
                kw = copy
            for k,v in request.match_info.items(): #檢查命名關鍵參數
                if k in kw:
                    logging.warning('Duplicate arg name in named arg and kw args: %s' % k)
                kw[k] = v
        if self._has_request_arg:
            kw['request'] = request
        if self._required_kw_args: #假如命名關鍵字參數(沒有附加默認值),request沒有提供相應的數值,報錯
            for name in self._required_kw_args:
                if name not in kw:
                    return web.HTTPBadRequest(text='Missing argument: %s'%(name))
        logging.info('call with args: %s' % str(kw))

        try:
            r = await self._fn(**kw)
            return r
        except APIError as e: #APIError另外建立
            return dict(error=e.error, data=e.data, message=e.message)
# 註冊 url  對應處理函數 至關於 spring中的  @RequestMapping  註解

import inspect,asyncio

#編寫一個add_route函數,用來註冊一個URL處理函數
def add_route(app,fn):
    method = getattr(fn,'__method__',None)
    path = getattr(fn,'__route__',None)
    if method is None or path is None:
        return ValueError('@get or @post not defined in %s.'%str(fn))
    if not asyncio.iscoroutinefunction(fn) and not inspect.isgeneratorfunction(fn): #判斷是否爲協程且生成器,不是使用isinstance
        fn = asyncio.coroutine(fn)
    logging.info('add route %s %s => %s(%s)'%(method,path,fn.__name__,','.join(inspect.signature(fn).parameters.keys())))
    app.router.add_route(method,path,RequestHandler(app,fn))#別忘了RequestHandler的參數有兩個

# 上面註冊若是 一個一個 弄 豈不是會煩死,下面來一個 模塊導入,批量註冊
#直接導入文件,批量註冊一個URL處理函數
def add_routes(app,module_name):
    n = module_name.rfind('.')
    if n == -1:
        mod = __import__(module_name,globals(),locals())
    else:
        name = module_name[n+1:]
        mod = getattr(__import__(module_name[:n],globals(),locals(),[name],0),name)#第一個參數爲文件路徑參數,不能摻夾函數名,類名
    for attr in dir(mod):
        if attr.startswith('_'):
            continue
        fn = getattr(mod,attr)
        if callable(fn): 
            method = getattr(fn,'__method__',None) 
            path = getattr(fn,'__route__',None)
            if path and method: #這裏要查詢path以及method是否存在而不是等待add_route函數查詢,由於那裏錯誤就要報錯了
### 說到底 仍是要 順着 aiohttp 提供的 接口來,給他 構造出他要的參數 
import os,logging

#添加靜態文件夾的路徑
def add_static(add):
    path = os.path.join(os.path.dirname(os.path.abspath(__file__)),'static')#輸出當前文件夾中'static'的路徑
    app.router.add_static('/static/',path)#prefix (str) – URL path prefix for handled static files
    logging.info('add static %s => %s'%('/static/',path))
from jinja2 import Environment, FileSystemLoader
from datetime import datetime
import json, time
import logging


#初始化jinja2,以便其餘函數使用jinja2模板
def init_jinja2(app, **kw):
    logging.info('init jinja2...')
    options = dict(
        autoescape = kw.get('autoescape', True),
        block_start_string = kw.get('block_start_string', '{%'),
        block_end_string = kw.get('block_end_string', '%}'),
        variable_start_string = kw.get('variable_start_string', '{{'),
        variable_end_string = kw.get('variable_end_string', '}}'),
        auto_reload = kw.get('auto_reload', True)
    )
    path = kw.get('path', None)
    if path is None:
        path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'templates')
    logging.info('set jinja2 template path: %s' % path)
    env = Environment(loader=FileSystemLoader(path), **options)
    filters = kw.get('filters', None)
    if filters is not None:
        for name, f in filters.items():
            env.filters[name] = f
    app['__templating__'] = env

def datetime_filter(t):
    delta = int(time.time() - t)
    if delta < 60:
        return u'1分鐘前'
    if delta < 3600:
        return u'%s分鐘前' % (delta // 60)
    if delta < 86400:
        return u'%s小時前' % (delta // 3600)
    if delta < 604800:
        return u'%s天前' % (delta // 86400)
    dt = datetime.fromtimestamp(t)
    return u'%s年%s月%s日' % (dt.year, dt.month, dt.day)
相關文章
相關標籤/搜索