python tornado 實現類禪道系統

   最近樓主一直在加班,很久沒有更新個人博客了,哎,一言難盡,廢話我就不說了,來開始上精華。

背景:目前市面上有不少bug管理工具,各有各的特色,其中最著名、最流行的就是禪道。一個偶然的機會接觸到了python,學到tornado後,就想着怎麼把它用到實處,後來發現自己公司除了禪道,就沒有什麼可以用來記錄bug的工具了。

語言:python3;第三方庫:tornado,qiniu(用於雲存儲文件);數據庫用sqlite

Why use tornado?很多人其實會這麼問我。我覺得tornado可以實現異步,雖然現在代碼還沒有用到異步,但它還是很不錯的框架,現在很多公司都在用,我個人覺得這是一個值得我們大家去學習的框架。

來看看我的需求文檔

大體是這樣的輪廓,那麼拿到這個的時候,我會先進行需求分析,

須要什麼樣的數據庫, 數據模型之間的關係,雖然如今仍是有不少地方是寫死的尚未進行搜索功能的設置,可是我相信,有了如今這個demo數據庫

那麼我先來設計一下主要的目錄結構

handlers 存放類似flask的views

models 存放數據庫相關的代碼

static存放靜態

template存放模板

untils存放公共庫

setting 配置文件

urls url地址映射

run 運行文件。

 那麼我來開始設計我的數據,其實我的數據模型也是一波三折的。

選擇用了sqlalchemy,以前用過flask的sqlalchemy感受不錯

from models.dataconfig import db_session,Base,create_all
from sqlalchemy import  Column,Integer,DateTime,Boolean,String,ForeignKey,desc,asc,Text
from sqlalchemy.orm import  relationship,backref
from untils.common import encrypt
import datetime
class User(Base):
    """ORM model for an account, stored in the ``users`` table.

    Besides its own columns, the class declares one-to-many relationships
    to most other models; each relationship adds a ``users`` backref on
    the related class.
    """

    __tablename__ = 'users'

    id = Column(Integer(), primary_key=True)
    username = Column(String(64), unique=True, index=True)
    email = Column(String(64))
    # Holds the value returned by untils.common.encrypt (see add_new).
    password = Column(String(64))
    last_logtime = Column(DateTime())
    status = Column(Integer())
    leves = Column(Integer())  # presumably a permission level -- TODO confirm
    iphone = Column(Integer())  # presumably a phone number -- TODO confirm

    Projects = relationship('Project', backref='users')
    shebei = relationship('Shebei', backref='users')
    file = relationship('FilePan', backref='users')
    banben = relationship('BanbenWrite', backref='users')
    testresult = relationship('TestResult', backref='users')
    testcase = relationship('TestCase', backref='users')
    buglog = relationship('BugLog', backref='users')

    def __repr__(self):
        return self.username

    @classmethod
    def get_by_id(cls, id):
        """Return the user with primary key *id*, or None if there is none."""
        return db_session.query(User).filter(User.id == id).first()

    @classmethod
    def get_by_username(cls, username):
        """Return the user named *username*, or None if there is none."""
        return db_session.query(User).filter(User.username == username).first()

    @classmethod
    def get_count(cls):
        """Return the number of rows in the ``users`` table."""
        # The original counted Shebei rows here (copy-paste slip).
        return db_session.query(User).count()

    @classmethod
    def add_new(cls, username, password, iphone, email, leves):
        """Create and commit a new user with ``status`` set to 0.

        The password is passed through ``encrypt`` before being stored.

        Returns:
            The new ``User`` on success. If the commit fails (for example
            because the username is already taken) the session is rolled
            back and None is returned instead of raising.
        """
        new = User(username=username, iphone=iphone, email=email, leves=leves)
        new.password = encrypt(password)
        new.status = 0
        db_session.add(new)
        try:
            db_session.commit()
        except Exception:
            # Keep the original best-effort contract (no exception reaches
            # the caller) but stop swallowing KeyboardInterrupt/SystemExit.
            db_session.rollback()
            return None
        return new
class Shebei(Base):
    """ORM model for a device ("shebei") record in the ``shebeis`` table."""

    __tablename__ = 'shebeis'

    id = Column(Integer(), primary_key=True)
    shebei_id = Column(String(32), unique=True)
    shebei_name = Column(String(64))
    shebei_xitong = Column(String(64))
    shebei_xinghao = Column(String(255))
    shebei_jiage = Column(Integer())
    shebei_fapiaobianhao = Column(String(64))
    shebei_quanxian = Column(Boolean())
    shebei_jie = Column(Boolean())
    shebei_shuyu = Column(String())
    shebei_date = Column(DateTime())
    shebei_user = Column(String())
    gou_date = Column(DateTime())
    shebei_status = Column(String(16))
    she_sta = Column(Integer(), default=0)
    # References users.id.
    ruku_user = Column(Integer(), ForeignKey('users.id'))

    def __repr__(self):
        return self.shebei_name

    @classmethod
    def get_by_name(cls, name):
        """Return the first device whose ``shebei_name`` is *name*, or None."""
        query = db_session.query(Shebei)
        return query.filter(Shebei.shebei_name == name).first()

    @classmethod
    def get_by_id(cls, id):
        """Return the device with primary key *id*, or None."""
        query = db_session.query(Shebei)
        return query.filter(Shebei.id == id).first()

    @classmethod
    def get_count(cls):
        """Return the number of rows in the ``shebeis`` table."""
        return db_session.query(Shebei).count()
class TestResult(Base):
    """ORM model for a test-result record in the ``testresults`` table.

    The project is referenced through ``porject_id`` (spelling kept from
    the existing schema). This model has no project-name column, so
    name-based access goes through the ``Project`` model.
    """

    __tablename__ = 'testresults'

    id = Column(Integer(), primary_key=True)
    porject_id = Column(Integer(), ForeignKey('projects.id'))
    creat_time = Column(DateTime())
    bug_first = Column(Integer())
    ceshirenyuan = Column(String(255))
    is_send = Column(Boolean(), default=True)
    filepath = Column(String(64))
    status = Column(Integer(), default=0)
    user_id = Column(Integer(), ForeignKey('users.id'))

    def __repr__(self):
        # The original returned self.porject_name, which does not exist and
        # raised AttributeError. ``projects`` is the backref declared by
        # Project.testresult and yields the owning Project (or None).
        project = getattr(self, 'projects', None)
        if project is not None and project.name is not None:
            return project.name
        return '<TestResult id=%s>' % self.id

    @classmethod
    def get_by_name(cls, name):
        """Return the first result whose project is named *name*, or None.

        The original filtered on a non-existent ``porject_name`` column;
        the lookup now joins ``Project`` and matches on ``Project.name``.
        """
        return (
            db_session.query(TestResult)
            .join(Project, TestResult.porject_id == Project.id)
            .filter(Project.name == name)
            .first()
        )

    @classmethod
    def get_by_id(cls, id):
        """Return the result with primary key *id*, or None."""
        return db_session.query(TestResult).filter(TestResult.id == id).first()

    @classmethod
    def get_by_user_id(cls, user_id):
        """Return the first result whose ``user_id`` is *user_id*, or None."""
        return (
            db_session.query(TestResult)
            .filter(TestResult.user_id == user_id)
            .first()
        )

    @classmethod
    def get_count(cls):
        """Return the number of rows in the ``testresults`` table."""
        return db_session.query(TestResult).count()
class BanbenWrite(Base):
    """ORM model for a version ("banben") record in the ``banbens`` table."""

    __tablename__ = 'banbens'

    id = Column(Integer(), primary_key=True)
    porject_id = Column(Integer(), ForeignKey('projects.id'))
    # Pass the callable, not its result: ``datetime.datetime.now()`` was
    # evaluated once at import, stamping every row with the start-up time.
    creat_time = Column(DateTime(), default=datetime.datetime.now)
    banbenhao = Column(String(32))
    is_xian = Column(Boolean(), default=False)
    is_test = Column(Boolean(), default=False)
    status = Column(Integer())
    user_id = Column(Integer(), ForeignKey('users.id'))
    bugadmin = relationship('BugAdmin', backref='banbens')

    def __repr__(self):
        return self.banbenhao

    @classmethod
    def get_by_name(cls, name):
        """Return the first version whose project is named *name*, or None.

        The original filtered on a non-existent ``porject_name`` column and
        raised AttributeError; the lookup now joins ``Project`` and matches
        on ``Project.name``.
        """
        return (
            db_session.query(BanbenWrite)
            .join(Project, BanbenWrite.porject_id == Project.id)
            .filter(Project.name == name)
            .first()
        )

    @classmethod
    def get_by_id(cls, id):
        """Return the version with primary key *id*, or None."""
        return db_session.query(BanbenWrite).filter(BanbenWrite.id == id).first()

    @classmethod
    def get_by_user_id(cls, user_id):
        """Return the first version whose ``user_id`` is *user_id*, or None."""
        return (
            db_session.query(BanbenWrite)
            .filter(BanbenWrite.user_id == user_id)
            .first()
        )

    @classmethod
    def get_count(cls):
        """Return the number of rows in the ``banbens`` table."""
        return db_session.query(BanbenWrite).count()
class FilePan(Base):
    """ORM model for an uploaded-file record in the ``files`` table."""

    __tablename__ = 'files'

    id = Column(Integer(), primary_key=True)
    file_fenlei = Column(String(64))
    file_name = Column(String(64))
    down_count = Column(Integer(), default=0)
    # Pass the callable, not its result: ``datetime.datetime.now()`` was
    # evaluated once at import, stamping every row with the start-up time.
    creat_time = Column(DateTime(), default=datetime.datetime.now)
    status = Column(Integer(), default=0)
    down_url = Column(String(64))
    is_tui = Column(Boolean(), default=False)
    user_id = Column(Integer(), ForeignKey('users.id'))

    def __repr__(self):
        return self.file_name

    @classmethod
    def get_by_file_name(cls, name):
        """Return the first file whose ``file_name`` is *name*, or None."""
        return db_session.query(FilePan).filter(FilePan.file_name == name).first()

    @classmethod
    def get_by_id(cls, id):
        """Return the file with primary key *id*, or None."""
        return db_session.query(FilePan).filter(FilePan.id == id).first()

    @classmethod
    def get_by_user_id(cls, user_id):
        """Return the first file whose ``user_id`` is *user_id*, or None."""
        return db_session.query(FilePan).filter(FilePan.user_id == user_id).first()

    @classmethod
    def get_count(cls):
        """Return the number of rows in the ``files`` table."""
        return db_session.query(FilePan).count()
class BugAdmin(Base):
    """ORM model for a bug report, stored in the ``bugadmins`` table."""

    __tablename__ = 'bugadmins'

    id = Column(Integer(), primary_key=True)
    porject_id = Column(Integer(), ForeignKey('projects.id'))
    bugname = Column(String(64))
    bugdengji = Column(String(64))
    # Pass the callable, not its result: ``datetime.datetime.now()`` was
    # evaluated once at import, stamping every bug with the start-up time.
    bugtime = Column(DateTime(), default=datetime.datetime.now)
    bug_miaoshu = Column(String(255))
    ban_id = Column(Integer(), ForeignKey('banbens.id'))
    fujian = Column(String(64))
    is_que = Column(Boolean())
    bug_status = Column(String(64))
    bug_jiejuefangan = Column(String(64))
    bug_send = Column(String(64))
    status = Column(Integer(), default=0)
    bug_log = relationship('BugLog', backref='bugadmins')
    user_id = Column(Integer(), ForeignKey('users.id'))

    def __repr__(self):
        return self.bugname

    @classmethod
    def get_by_bugname(cls, bugname):
        """Return the first bug whose ``bugname`` is *bugname*, or None."""
        return db_session.query(BugAdmin).filter(BugAdmin.bugname == bugname).first()

    @classmethod
    def get_by_id(cls, id):
        """Return the bug with primary key *id*, or None."""
        return db_session.query(BugAdmin).filter(BugAdmin.id == id).first()

    @classmethod
    def get_by_porject_name(cls, porject_name):
        """Return the first bug whose project is named *porject_name*, or None.

        The original filtered on a non-existent ``porject_name`` column and
        raised AttributeError; the lookup now joins ``Project`` and matches
        on ``Project.name``.
        """
        return (
            db_session.query(BugAdmin)
            .join(Project, BugAdmin.porject_id == Project.id)
            .filter(Project.name == porject_name)
            .first()
        )

    @classmethod
    def get_count(cls):
        """Return the number of rows in the ``bugadmins`` table."""
        return db_session.query(BugAdmin).count()
class TestCase(Base):
    """ORM model for a test case, stored in the ``testcases`` table."""

    __tablename__ = 'testcases'

    id = Column(Integer(), primary_key=True)
    porject_id = Column(Integer(), ForeignKey('projects.id'))
    casename = Column(String(64))
    case_qianzhi = Column(String())
    case_buzhou = Column(String())
    case_yuqi = Column(String())
    status = Column(Integer(), default=0)
    # Pass the callable, not its result: ``datetime.datetime.now()`` was
    # evaluated once at import, stamping every case with the start-up time.
    case_crea_time = Column(DateTime(), default=datetime.datetime.now)
    user_id = Column(Integer(), ForeignKey('users.id'))

    def __repr__(self):
        return self.casename

    @classmethod
    def get_by_project_name(cls, project_name):
        """Return the first case whose project is named *project_name*, or None.

        The original filtered on a non-existent ``project_name`` column and
        raised AttributeError; the lookup now joins ``Project`` and matches
        on ``Project.name``.
        """
        return (
            db_session.query(TestCase)
            .join(Project, TestCase.porject_id == Project.id)
            .filter(Project.name == project_name)
            .first()
        )

    @classmethod
    def get_by_casename(cls, casename):
        """Return the first case whose ``casename`` is *casename*, or None."""
        return db_session.query(TestCase).filter(TestCase.casename == casename).first()

    @classmethod
    def get_by_id(cls, id):
        """Return the case with primary key *id*, or None."""
        return db_session.query(TestCase).filter(TestCase.id == id).first()

    @classmethod
    def get_count(cls):
        """Return the number of rows in the ``testcases`` table."""
        return db_session.query(TestCase).count()
class BugLog(Base):
    """ORM model for one logged operation on a bug (``buglogs`` table)."""

    __tablename__ = 'buglogs'

    id = Column(Integer(), primary_key=True)
    bug_id = Column(Integer(), ForeignKey('bugadmins.id'))
    caozuo = Column(String())
    caozuo_time = Column(DateTime())
    user_id = Column(Integer(), ForeignKey('users.id'))

    def __repr__(self):
        return self.caozuo

    @classmethod
    def get_by_id(cls, id):
        """Return the log entry with primary key *id*, or None."""
        query = db_session.query(BugLog)
        return query.filter(BugLog.id == id).first()

    @classmethod
    def get_by_user_id(cls, user_id):
        """Return the first log entry whose ``user_id`` is *user_id*, or None."""
        query = db_session.query(BugLog)
        return query.filter(BugLog.user_id == user_id).first()

    @classmethod
    def get_by_bug_id(cls, bug_id):
        """Return the first log entry whose ``bug_id`` is *bug_id*, or None."""
        query = db_session.query(BugLog)
        return query.filter(BugLog.bug_id == bug_id).first()
class Project(Base):
    """ORM model for a project, stored in the ``projects`` table.

    Each relationship below adds a ``projects`` backref on the related
    model.
    """

    __tablename__ = 'projects'

    id = Column(Integer(), primary_key=True)
    name = Column(String(64))
    user_id = Column(Integer(), ForeignKey('users.id'))

    # Despite its name, ``bug_log`` relates to BugAdmin rows.
    bug_log = relationship('BugAdmin', backref='projects')
    banben = relationship('BanbenWrite', backref='projects')
    testresult = relationship('TestResult', backref='projects')
    testcase = relationship('TestCase', backref='projects')

    def __repr__(self):
        return self.name

    @classmethod
    def get_by_id(cls, id):
        """Return the project with primary key *id*, or None."""
        query = db_session.query(Project)
        return query.filter(Project.id == id).first()

    @classmethod
    def get_by_name(cls, name):
        """Return the first project whose ``name`` is *name*, or None."""
        query = db_session.query(Project)
        return query.filter(Project.name == name).first()

 這是數據庫相關的,

數據庫配置相關的
 3 from sqlalchemy import  create_engine
 4 from  sqlalchemy.orm import  scoped_session,sessionmaker
 5 from sqlalchemy.ext.declarative import  declarative_base
 6 engine=create_engine('sqlite:///shebei.db',convert_unicode=True)
 7 Base=declarative_base()
 8 db_session=scoped_session(sessionmaker(bind=engine))
 9 def create_all():
10     Base.metadata.create_all(engine)
11 def drop_all():
12     Base.metadata.drop_all(engine)

 

其實在開發的過程當中,也遇到了不少阻力,比如下載一直實現不好,比如分頁,也是參照別人的實現做的,

分頁公共模塊

class Pagination:
    """Compute slice bounds and prev/next/last HTML links for a paged list.

    Args:
        current_page: Requested page number. Anything that cannot be
            converted to an int, or is below 1, falls back to page 1.
        all_item: Total number of items being paged.
        per_page: Items per page (defaults to 10, the previously
            hard-coded value).

    Raises:
        ValueError: If ``per_page`` is smaller than 1.
    """

    def __init__(self, current_page, all_item, per_page=10):
        if per_page < 1:
            raise ValueError('per_page must be a positive integer')
        try:
            page = int(current_page)
        except (TypeError, ValueError, OverflowError):
            # None, non-numeric text, inf/nan: show the first page.
            page = 1
        if page < 1:
            page = 1
        all_pager, remainder = divmod(all_item, per_page)
        if int(remainder) > 0:
            # A partially filled last page still counts as a page.
            all_pager += 1
        self.current_page = page
        self.all_pager = all_pager
        self.per_page = per_page

    @property
    def start(self):
        """Index of the first item on the current page (inclusive)."""
        return (self.current_page - 1) * self.per_page

    @property
    def end(self):
        """Index just past the last item on the current page (exclusive)."""
        return self.current_page * self.per_page

    def string_pager(self, base_url="/index/"):
        """Return the "previous / next / last" ``<li>`` links as one string.

        Page numbers are appended directly to *base_url*. The previous link
        is inert on page 1; the next link is inert on the last page and
        beyond (including when there are no items at all).
        """
        inert = '<li><a href="javascript:void(0);">%s</a></li>'
        link = '<li><a href="%s%s">%s</a></li>'

        if self.current_page == 1:
            prev = inert % '上一頁'
        else:
            prev = link % (base_url, self.current_page - 1, '上一頁')

        # ``>=`` rather than ``==``: with zero items (all_pager == 0) or an
        # out-of-range page the original still linked to a page that does
        # not exist.
        if self.current_page >= self.all_pager:
            nex = inert % '下一頁'
        else:
            nex = link % (base_url, self.current_page + 1, '下一頁')

        # Never link to page 0 when the list is empty.
        last = link % (base_url, max(self.all_pager, 1), '尾頁')
        return "".join((prev, nex, last))

 

在上傳文件的時候,原來是存放在本地,結果下載一直處理不好,於是選擇了七牛,需要到七牛的官網去註冊自己的帳號。

from qiniu import Auth,put_file,etag,urlsafe_base64_encode
import qiniu.config
# NOTE(review): these credentials are hard-coded in source. They should be
# rotated and loaded from configuration or the environment instead; they are
# left in place here so existing behaviour does not change silently.
access_key='uVxowDUcYx641ivtUb111WBEI4112L3D117JHNM_AOtskRh4'
secret_key='PdXU9XrXTLtp1N21bhU1Frm1FDZqE1qhjkEaE9d1xVLZ5C'
def sendfile(key, file, bucket_name='leilei22'):
    """Upload a local file to Qiniu and verify it by comparing hashes.

    Args:
        key: Object name to store the file under in the bucket.
        file: Path of the local file to upload.
        bucket_name: Target bucket; defaults to the previously hard-coded
            'leilei22'.

    Returns:
        True if the upload response carries a hash equal to the local
        file's etag, False otherwise (including when the upload fails).
    """
    q = Auth(access_key, secret_key)
    token = q.upload_token(bucket_name, key)
    ret, info = put_file(token, key, file)
    # ``ret`` is None when the upload fails; the original then crashed
    # with TypeError on ret['hash'] instead of reporting failure.
    if not ret or 'hash' not in ret:
        return False
    return ret['hash'] == etag(file)

 解析Excel,主要用於上傳測試用例

import xlrd,xlwt
from xlutils.copy import copy
def datacel(filepath):
    """Read test-case columns from the first sheet of an Excel workbook.

    Row 0 is skipped (treated as a header). Five parallel lists are
    returned, in this order and from these sheet columns:
    project ids (column 0), case names (column 2), preconditions
    (column 3), steps (column 4) and expected results (column 1).
    """
    sheet = xlrd.open_workbook(filepath).sheets()[0]
    data_rows = range(1, sheet.nrows)

    def column_values(col):
        # Collect one column's cell values across all data rows.
        return [sheet.cell(row, col).value for row in data_rows]

    # Output order is not sheet order: column 1 is returned last.
    return (
        column_values(0),
        column_values(2),
        column_values(3),
        column_values(4),
        column_values(1),
    )

其實到這裏公共模塊就完畢了,現在可以着手開始寫我們的主要代碼,還有靜態界面了。因為前後端都是我自己負責,我的前端其實還是從網上找來的模板。

學習的道路是痛苦的,可是我相信我是能夠成功的,

from tornado.web import RequestHandler
from models.model_py import User
class BaseHandler(RequestHandler):
    """Common base for request handlers: DB shortcut and current-user lookup."""

    @property
    def db(self):
        """The ``db`` attribute of the owning application object."""
        app = self.application
        return app.db

    def get_current_user(self):
        """Return the User named by the ``user_id`` secure cookie, or None.

        None is returned when the cookie is missing or empty.
        """
        cookie_value = self.get_secure_cookie('user_id')
        if cookie_value:
            return User.get_by_id(int(cookie_value))
        return None

 基礎的類繼承了RequestHandler類,進行了一些簡單的自定義,後續的handler都可以基於這個類來寫,

經過兩週的開發,已經形成了比較成熟的版本。

相關文章
相關標籤/搜索