使用python作web開發面臨的一個最大的問題就是性能,在解決C10K問題上顯的有點吃力。有些異步框架Tornado、Twisted、Gevent 等就是爲了解決性能問題。這些框架在性能上有些提高,可是也出現了各類古怪的問題難以解決。html
在python3.6中,官方的異步協程庫asyncio正式成爲標準。在保留便捷性的同時對性能有了很大的提高,已經出現許多的異步框架使用asyncio。node
使用較早的異步框架是aiohttp,它提供了server端和client端,對asyncio作了很好的封裝。可是開發方式和最流行的微框架flask不一樣,flask開發簡單,輕量,高效。python
微服務是最近最火開發模式,它解決了複雜性問題,提升開發效率,便於部署等優勢。git
正是結合這些優勢, 以Sanic爲基礎,集成多個流行的庫來搭建微服務。 Sanic框架是和Flask類似的異步協程框架,簡單輕量,而且性能很高。github
本項目就是以Sanic爲基礎搭建的微服務框架。web
項目地址: sanic-mssql
Example數據庫
使用sanic異步框架,有較高的性能,可是使用不當會形成blocking, 對於有IO請求的都要選用異步庫。添加庫要慎重。 sanic使用uvloop異步驅動,uvloop基於libuv使用Cython編寫,性能比nodejs還要高。express
功能說明:json
@app.listener('before_server_start')
async def before_srver_start(app, loop):
queue = asyncio.Queue()
app.queue = queue
loop.create_task(consume(queue, app.config.ZIPKIN_SERVER))
reporter = AioReporter(queue=queue)
tracer = BasicTracer(recorder=reporter)
tracer.register_required_propagators()
opentracing.tracer = tracer
app.db = await ConnectionPool(loop=loop).init(DB_CONFIG)
複製代碼
@app.middleware('request')
async def cros(request):
if request.method == 'POST' or request.method == 'PUT':
request['data'] = request.json
span = before_request(request)
request['span'] = span
@app.middleware('response')
async def cors_res(request, response):
span = request['span'] if 'span' in request else None
if response is None:
return response
result = {'code': 0}
if not isinstance(response, HTTPResponse):
if isinstance(response, tuple) and len(response) == 2:
result.update({
'data': response[0],
'pagination': response[1]
})
else:
result.update({'data': response})
response = json(result)
if span:
span.set_tag('http.status_code', "200")
if span:
span.set_tag('component', request.app.name)
span.finish()
return response
複製代碼
對拋出的異常進行處理,返回統一格式
建立task消費queue中對span,用於日誌追蹤
因爲使用的是異步框架,能夠將一些IO請求並行處理
Example:
async def async_request(datas):
# async handler request
results = await asyncio.gather(*[data[2] for data in datas])
for index, obj in enumerate(results):
data = datas[index]
data[0][data[1]] = results[index]
@user_bp.get('/<id:int>')
@doc.summary("get user info")
@doc.description("get user info by id")
@doc.produces(Users)
async def get_users_list(request, id):
async with request.app.db.acquire(request) as cur:
record = await cur.fetch(
""" SELECT * FROM users WHERE id = $1 """, id)
datas = [
[record, 'city_id', get_city_by_id(request, record['city_id'])]
[record, 'role_id', get_role_by_id(request, record['role_id'])]
]
await async_request(datas)
return record
複製代碼
get_city_by_id, get_role_by_id是並行處理。
Peewee is a simple and small ORM. It has few (but expressive) concepts, making it easy to learn and intuitive to use。
ORM使用peewee, 只是用來作模型設計和migration, 數據庫操做使用asyncpg。
Example:
# models.py
class Users(Model):
id = PrimaryKeyField()
create_time = DateTimeField(verbose_name='create time',
default=datetime.datetime.utcnow)
name = CharField(max_length=128, verbose_name="user's name")
age = IntegerField(null=False, verbose_name="user's age")
sex = CharField(max_length=32, verbose_name="user's sex")
city_id = IntegerField(verbose_name='city for user', help_text=CityApi)
role_id = IntegerField(verbose_name='role for user', help_text=RoleApi)
class Meta:
db_table = 'users'
# migrations.py
from sanic_ms.migrations import MigrationModel, info, db
class UserMigration(MigrationModel):
_model = Users
# @info(version="v1")
# def migrate_v1(self):
# migrate(self.add_column('sex'))
def migrations():
try:
um = UserMigration()
with db.transaction():
um.auto_migrate()
print("Success Migration")
except Exception as e:
raise e
if __name__ == '__main__':
migrations()
複製代碼
asyncpg is the fastest driver among common Python, NodeJS and Go implementations
使用asyncpg爲數據庫驅動, 對數據庫鏈接進行封裝, 執行數據庫操做。
不使用ORM作數據庫操做,一個緣由是性能,ORM會有性能的損耗,而且沒法使用asyncpg高性能庫。另外一個是單個微服務是很簡單的,表結構不會很複雜,簡單的SQL語句就能夠處理來,不必引入ORM。使用peewee只是作模型設計
Example:
sql = "SELECT * FROM users WHERE name=$1"
name = "test"
async with request.app.db.acquire(request) as cur:
data = await cur.fetchrow(sql, name)
async with request.app.db.transaction(request) as cur:
data = await cur.fetchrow(sql, name)
複製代碼
使用aiohttp中的client,對客戶端進行了簡單的封裝,用於微服務之間訪問。
Don’t create a session per request. Most likely you need a session per application which performs all requests altogether. A session contains a connection pool inside, connection reusage and keep-alives (both are on by default) may speed up total performance.
Example:
@app.listener('before_server_start')
async def before_srver_start(app, loop):
app.client = Client(loop, url='http://host:port')
async def get_role_by_id(request, id):
cli = request.app.client.cli(request)
async with cli.get('/cities/{}'.format(id)) as res:
return await res.json()
@app.listener('before_server_stop')
async def before_server_stop(app, loop):
app.client.close()
複製代碼
對於訪問不一樣的微服務能夠建立多個不一樣的client,這樣每一個client都會keep-alives
使用官方logging, 配置文件爲logging.yml, sanic版本要0.6.0及以上。JsonFormatter將日誌轉成json格式,用於輸入到ES
Enter OpenTracing: by offering consistent, expressive, vendor-neutral APIs for popular platforms, OpenTracing makes it easy for developers to add (or switch) tracing implementations with an O(1) configuration change. OpenTracing also offers a lingua franca for OSS instrumentation and platform-specific tracing helper libraries. Please refer to the Semantic Specification.
@logger(type='method', category='test', detail='detail', description="des", tracing=True, level=logging.INFO)
async def get_city_by_id(request, id):
cli = request.app.client.cli(request)
複製代碼
api文檔使用swagger標準。
Example:
from sanic_ms import doc
@user_bp.post('/')
@doc.summary('create user')
@doc.description('create user info')
@doc.consumes(Users)
@doc.produces({'id': int})
async def create_user(request):
data = request['data']
async with request.app.db.transaction(request) as cur:
record = await cur.fetchrow(
""" INSERT INTO users(name, age, city_id, role_id) VALUES($1, $2, $3, $4, $5) RETURNING id """, data['name'], data['age'], data['city_id'], data['role_id']
)
return {'id': record['id']}
複製代碼
在返回時,不要返回sanic的response,直接返回原始數據,會在Middleware中對返回的數據進行處理,返回統一的格式,具體的格式能夠[查看]
單元測試使用unittest。 mock是本身建立了MockClient,由於unittest尚未asyncio的mock,而且sanic的測試接口也是發送request請求,因此比較麻煩. 後期可使用pytest。
Example:
from sanic_ms.tests import APITestCase
from server import app
class TestCase(APITestCase):
_app = app
_blueprint = 'visit'
def setUp(self):
super(TestCase, self).setUp()
self._mock.get('/cities/1',
payload={'id': 1, 'name': 'shanghai'})
self._mock.get('/roles/1',
payload={'id': 1, 'name': 'shanghai'})
def test_create_user(self):
data = {
'name': 'test',
'age': 2,
'city_id': 1,
'role_id': 1,
}
res = self.client.create_user(data=data)
body = ujson.loads(res.text)
self.assertEqual(res.status, 200)
複製代碼
coverage erase
coverage run --source . -m sanic_ms tests
coverage xml -o reports/coverage.xml
coverage2clover -i reports/coverage.xml -o reports/clover.xml
coverage html -d reports
複製代碼
使用 app.error_handler = CustomHander() 對拋出的異常進行處理
Example:
from sanic_ms.exception import ServerError
@visit_bp.delete('/users/<id:int>')
async def del_user(request, id):
raise ServerError(error='內部錯誤',code=10500, message="msg")
複製代碼