wsgi自帶,用語構建簡單服務器
from wsgiref.simple_server import make_server
def index(env, res):
res('200 ok', [('Content-Type', 'text/html')])
print(env['PATH_INFO'][1:]) # method = env['REQUEST_METHOD']
body = '<h1>hello %s </h1>' % (env['PATH_INFO'][1:] or 'index')
return [body.encode('utf8')]
# 建立服務器
server = make_server('127.0.0.1', 3000, index)
print('Server start at 3000...')
# 監聽請求
server.serve_forever()
aiohttp基於asyncio實現的HTTP框架
import asyncio
from aiohttp import web
@asyncio.coroutine
def index(request):
yield from asyncio.sleep(0.5)
text = '<h1>hello, %s!</h1>' % (request.match_info['name'] or 'index')
return web.Response(body=text.encode('utf8'))
@asyncio.coroutine
def init(loop):
app = web.Application(loop=loop)
app.router.add_route('GET', '/{name}', index)
# 建立服務器
srv = yield from loop.create_server(app.make_handler(), '127.0.0.1', 3000)
print('Server started at http://127.0.0.1:3000...')
return srv
# 獲取EventLoop:
loop = asyncio.get_event_loop()
# 執行coroutine
loop.run_until_complete(init(loop))
# 監聽請求
loop.run_forever()
from jinja2 import Environment, FileSystemLoader
def init_jinja2():
path = os.path.abspath('.')
env = Environment(loader=FileSystemLoader(path))
return env
__template__ = init_jinja2()
....
body = __template__.get_template('example.html').render(**kw).encode('utf-8')
....
aiomysql 爲asyncio提供異步mmysql IO的驅動
import asyncio
import aiomysql
loop = None
@asyncio.coroutine
def connect():
global loop
# 創建鏈接
conn = yield from aiomysql.connect(
host= '127.0.0.1',
port= 3306,
user= 'root',
password= 'password',
db= 'awesome',
loop= loop)
# 打開遊標
cur = yield from conn.cursor()
# 執行SQL語句
yield from cur.execute('SELECT * FROM users')
print(cur.description)
# 獲取結果集
r = yield from cur.fetchall()
print(r)
yield from cur.close()
conn.close()
loop = asyncio.get_event_loop()
loop.run_until_complete(connect())