作業需求:
一、額度 15000 或自定義 二、實現購物商城,買東西加入購物車,調用信用卡接口結帳 三、能夠提現,手續費 5% 四、支持多帳戶登錄 五、支持帳戶間轉帳 六、記錄每個月日常消費流水 七、提供還款接口 八、ATM 記錄操作日誌 九、提供管理接口,包括添加帳戶、用戶額度、凍結帳戶等 十、用戶認證用裝飾器
注意:以上需求,要充分使用函數,請盡你的最大限度來減少重複代碼!
程序流程圖(待補全)
![]()git
bin
├── atm.py            # atm入口
├── __init__.py
└── manage.py         # 管理入口
conf
├── __init__.py
└── settings.py       # 配置文件
core
├── accounts.py       # 帳號添加、修改額度、禁用、啟用接口
├── actions.py        # sql動作接口
├── auth.py           # 用戶認證接口
├── database.py       # 數據庫操作接口
├── db_handler.py     # 未使用
├── __init__.py
├── logger.py         # 日誌接口
├── main.py           # 主接口
├── parsers.py        # sql語法解析接口
└── transaction.py    # 交易接口
db
├── accounts_table    # 用戶帳號表
└── managers_table    # 管理員帳號表
log
├── access.log        # 訪問日誌
└── transactions.log  # 交易日誌
GitHub 連結
atm.py
#!_*_coding:utf-8_*_
"""Command-line entry point that starts the program in 'atm' mode."""
import os
import sys

# The project root is the parent of the directory holding this script; it is
# appended to sys.path so that ``from core import main`` below can resolve.
script_path = os.path.abspath(__file__)
base_dir = os.path.dirname(os.path.dirname(script_path))
print(base_dir)
sys.path.append(base_dir)

from core import main

if __name__ == '__main__':
    main.run('atm')
manage.py
#!_*_coding:utf-8_*_
"""Command-line entry point that starts the program in 'manage' mode."""
import os
import sys

# The project root is the parent of the directory holding this script; it is
# appended to sys.path so that ``from core import main`` below can resolve.
script_path = os.path.abspath(__file__)
base_dir = os.path.dirname(os.path.dirname(script_path))
print(base_dir)
sys.path.append(base_dir)

from core import main

if __name__ == '__main__':
    main.run('manage')
settings.py
#!_*_coding:utf-8_*_
#__author__:"Alex Li"
"""Static configuration: table layout, storage locations, logging and fees."""
import os
import sys
import logging

# Directory two levels above this file.
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

# Column names of a table row, in file order.
TITLE = [
    'id', 'name', 'age', 'phone', 'dept', 'enroll_date', 'expire_date',
    'account', 'password', 'credit', 'balance', 'status', 'pay_day',
]

# Both tables live in the same directory below BASE_DIR.
_DB_PATH = "%s/db/" % BASE_DIR

# Card-holder account table.
DATABASE = dict(
    engine='file_storage',  # support mysql, postgresql in the future
    name='accounts_table',
    path=_DB_PATH,
)

# Manager account table.
MANAGE_DATABASE = dict(
    engine='file_storage',  # support mysql, postgresql in the future
    name='managers_table',
    path=_DB_PATH,
)

# Logging: level shared by all handlers, and the file name for each log type.
LOG_LEVEL = logging.INFO
LOG_TYPES = dict(
    transaction='transactions.log',
    access='access.log',
)

# For every transaction type: whether the amount is added to ('plus') or taken
# from ('minus') the balance, and the fee rate applied to the amount.
TRANSACTION_TYPE = dict(
    repay=dict(action='plus', interest=0),
    withdraw=dict(action='minus', interest=0.05),
    transfer=dict(action='minus', interest=0.05),
    consume=dict(action='minus', interest=0),
)

# Value used when no credit is entered for a new account.
ACCOUNT_DEFAULT = dict(credit=15000.0)
accounts.py
#!_*_coding:utf-8_*_
"""Account management helpers built on the project's mini SQL layer."""
import json
import time
from core import database
from conf import settings
from core import parsers
from core import actions


def _run_sql(sql_str):
    """Parse *sql_str* and execute it against the account table directory.

    :param sql_str: statement in the project's SQL dialect
    :return: whatever ``actions.actions`` returns for the statement type
    """
    sql_type = sql_str.split()[0]
    dict_sql = parsers.parsers(sql_str, sql_type, settings.DATABASE['path'])
    return actions.actions(sql_type, dict_sql)


def load_accounts(account):
    """
    Check whether an account exists in the account table.

    :param account: account number to look up
    :return: True when the select yields at least one row, otherwise False
    """
    return bool(_run_sql('select * from accounts_table where account = %s' % account))


def change_account(account, set_str):
    """
    Change one column of an account.

    :param account: account number of the row to update
    :param set_str: assignment in the form ``column = value``
    :return: None
    """
    _run_sql('update accounts_table set %s where account = %s' % (set_str, account))


def add_account(*args):
    """
    Add a new account.

    :param args: column values in table order, without the id column;
                 non-string values are converted with ``str``
    :return: None
    """
    # NOTE(review): values are joined with ',' and split on ',' again by the
    # parser, so a value containing a comma would shift the columns.
    _run_sql('add to accounts_table values %s' % ','.join(str(value) for value in args))
actions.py
# -*- coding: utf-8 -*-
"""Executors for the parsed statements of the project's mini SQL dialect."""
import ast
import operator
import re

from core import database
from conf import settings

# Comparison operators accepted in a where clause.
_COMPARATORS = {
    '==': operator.eq,
    '=': operator.eq,
    '!=': operator.ne,
    '<': operator.lt,
    '<=': operator.le,
    '>': operator.gt,
    '>=': operator.ge,
}


def actions(sql_type, dict_sql):
    """
    Dispatch a parsed statement to the matching action function.

    :param sql_type: statement type: 'select', 'add', 'del' or 'update'
    :param dict_sql: parsed statement (dict) produced by the parsers module
    :return: result of the action function, or False for an unknown type
    """
    actions_dict = {'select': select_action,
                    'add': add_action,
                    'del': del_action,
                    'update': update_action}
    handler = actions_dict.get(sql_type)
    if handler is None:
        return False
    return handler(dict_sql)


def _row_matches(row, where):
    """Return True when *row* satisfies the ``[column, operator, value]`` clause.

    The caller's clause is not modified; a fresh condition list is built with
    the row's value in place of the column name.
    """
    cell = row[where[0]]
    # Integers are compared as numbers, every other value as quoted text.
    left = str(cell) if isinstance(cell, int) else '"' + str(cell) + '"'
    return where_action([left] + list(where[1:]))


def select_action(dict_sql):
    """
    Return the rows of the table that satisfy the where clause.

    :param dict_sql: parsed statement with the keys 'from' and 'where'
    :return: list of matching rows (dicts)
    """
    data = database.read_db(dict_sql['from'])
    where = dict_sql['where']
    return [row for row in data if _row_matches(row, where)]


def add_action(dict_sql):
    """
    Append a row to the table and write the table back.

    The id is the last row's id plus one (1 for an empty table). The caller's
    value list is left untouched.

    :param dict_sql: parsed statement with the keys 'to' and 'values'
    """
    data = database.read_db(dict_sql['to'])
    next_id = str(int(data[-1]['id']) + 1) if data else '1'
    row = [next_id] + list(dict_sql['values'])
    if len(row) != len(settings.TITLE):
        print('列數不正確')
        return
    data.append(dict(zip(settings.TITLE, row)))
    database.write_db(dict_sql['to'], data)


def del_action(dict_sql):
    """
    Collect the rows that satisfy the where clause.

    NOTE: nothing is removed from the table file; the matching rows are only
    returned.

    :param dict_sql: parsed statement with the keys 'from' and 'where'
    :return: list of matching rows (dicts)
    """
    data = database.read_db(dict_sql['from'])
    where = dict_sql['where']
    return [row for row in data if _row_matches(row, where)]


def update_action(dict_sql):
    """
    Set one column on every row that satisfies the where clause.

    :param dict_sql: parsed statement with the keys 'update', 'set', 'where'
    """
    data = database.read_db(dict_sql['update'])
    where = dict_sql['where']
    set_key = dict_sql['set'][0]
    # The new value is stored as text, without surrounding quotes.
    set_value = dict_sql['set'][2].strip("'").strip('"')
    count = 0
    for row in data:
        if _row_matches(row, where):
            count += 1
            row[set_key] = set_value
    # The table content itself is deliberately not printed: rows contain
    # account passwords.
    print('已更新%s條記錄' % count)
    database.write_db(dict_sql['update'], data)


def _parse_operand(token):
    """Turn one side of a comparison into a Python value without executing it.

    Quoted tokens become strings, digit-only tokens become ints (leading
    zeros allowed), other numeric tokens become floats, and anything else is
    kept as plain text.
    """
    token = token.strip()
    if len(token) >= 2 and token[0] == token[-1] and token[0] in ('"', "'"):
        try:
            literal = ast.literal_eval(token)
        except (ValueError, SyntaxError):
            literal = None
        # Fall back to dropping the outer quotes when the token is not a
        # well-formed string literal (e.g. it contains an unescaped quote).
        return literal if isinstance(literal, str) else token[1:-1]
    if token.isdigit():
        return int(token)
    try:
        return float(token)
    except ValueError:
        return token


def where_action(condition):
    """
    Evaluate one where condition.

    SECURITY: this used to call ``eval`` on the joined condition, which
    contains text typed by the user. The operands are now parsed as literals
    and compared with a fixed set of operators, so no user text is executed.

    :param condition: list ``[left, operator, right]``
    :return: True or False
    :raises ValueError: if the operator is not supported
    """
    if 'like' in condition:
        # The right-hand side is used as a regular expression.
        pattern = condition[2].strip("'").strip('"')
        return bool(re.search(pattern, condition[0]))
    try:
        compare = _COMPARATORS[condition[1]]
    except KeyError:
        raise ValueError('unsupported comparison operator: %r' % (condition[1],)) from None
    return compare(_parse_operand(condition[0]), _parse_operand(condition[2]))
auth.py
#!_*_coding:utf-8_*_
"""Login, logout and the login_required decorator."""
import functools
import os
from core import parsers
from core import actions
from core import db_handler
from conf import settings
from core import logger
import json
import time


def login_required(func):
    """Decorator: run *func* only if its first argument is an authenticated session.

    The first positional argument must be a dict with an 'is_authenticated'
    key. When it is missing or falsy the program exits.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        if args and args[0].get('is_authenticated'):
            return func(*args, **kwargs)
        exit("User is not authenticated.")
    return wrapper


def acc_auth(account, password, type):
    '''
    Check an account/password pair against the table selected by *type*.

    :param account: account number as typed by the user
    :param password: password as typed by the user
    :param type: 'atm' for the account table, 'manage' for the manager table
    :return: the account row (dict) when authentication passes, otherwise None
    :raises ValueError: if *type* is neither 'atm' nor 'manage'
    '''
    if type == 'atm':
        db_conf = settings.DATABASE
    elif type == 'manage':
        db_conf = settings.MANAGE_DATABASE
    else:
        raise ValueError("unknown login type: %r" % (type,))

    res = None
    if account:  # an empty account cannot be parsed into a where clause
        sql_str = 'select * from %s where account = %s' % (db_conf['name'], account)
        sql_type = sql_str.split()[0]
        dict_sql = parsers.parsers(sql_str, sql_type, db_conf['path'])
        if dict_sql:
            res = actions.actions(sql_type, dict_sql)

    # The stored password may have been read back as an int when it consists
    # only of digits, so both sides are compared as text.
    if not res or str(res[0]['password']) != password:
        print("\033[31;1mAccount ID and password is incorrect!\033[0m")
        return None

    record = res[0]
    exp_time_stamp = time.mktime(time.strptime(record['expire_date'], "%Y-%m-%d"))
    if time.time() > exp_time_stamp:
        print("\033[31;1mAccount [%s] has expired,please contact the back to get a new card!\033[0m" % account)
        return None
    if str(record['status']) == '1':
        # status 1 is what the manage interface writes when disabling an account
        print("\033[31;1mAccount [%s] has been disabled,please contact the bank!\033[0m" % account)
        return None
    return record


def acc_login(user_data, log_obj, type):
    '''
    Prompt for account and password, allowing three attempts.

    :param user_data: in-memory session dict; 'is_authenticated' and
                      'account_id' are set on success
    :param log_obj: logger used for the access log
    :param type: 'atm' or 'manage', passed on to acc_auth
    :return: the account row on success; exits the program after three failures
    '''
    if user_data['is_authenticated'] is True:
        # Already logged in: nothing to ask.
        return user_data.get('account_data')

    account = None
    for _ in range(3):
        account = input("\033[32;1maccount:\033[0m").strip()
        password = input("\033[32;1mpassword:\033[0m").strip()
        auth = acc_auth(account, password, type)
        if auth:  # not None means passed the authentication
            log_obj.info("account [%s] login system" % account)
            user_data['is_authenticated'] = True
            user_data['account_id'] = account
            return auth

    log_obj.error("account [%s] too many login attempts" % account)
    exit()


def acc_logout(user_data, log_obj):
    """Mark the session as logged out, write the access log and exit.

    :param user_data: in-memory session dict holding 'account_data'
    :param log_obj: logger used for the access log
    """
    account = user_data['account_data']['name']
    user_data['is_authenticated'] = False
    log_obj.info("account [%s] logout system" % account)
    exit("account [%s] logout system" % account)
database.py
# -*- coding: utf-8 -*-
"""Reading and writing of the comma-separated table files."""
from conf import settings

# Columns kept as text even when the value is all digits, so that leading
# zeros survive a read/write cycle and comparisons with typed input work.
_TEXT_COLUMNS = frozenset(['password'])


def read_db(table):
    """
    Read a table file.

    Header and blank lines are skipped. Digit-only values are converted to
    int, except in the columns listed in ``_TEXT_COLUMNS``.

    :param table: path of the table file
    :return: list of rows, each a dict keyed by ``settings.TITLE``
    """
    title = settings.TITLE
    rows = []
    try:
        with open(table, 'r', encoding='utf-8') as rf:
            for line in rf:
                fields = line.rstrip('\n').split(',')
                if fields == title or not line.strip():
                    continue
                record = {}
                for column, raw in zip(title, fields):
                    if raw.isdigit() and column not in _TEXT_COLUMNS:
                        record[column] = int(raw)
                    else:
                        record[column] = raw
                rows.append(record)
    except FileNotFoundError as e:
        print(e)
        exit(1)
    return rows


def write_db(table, data):
    """
    Write a whole table file, header line first.

    Values are written in ``settings.TITLE`` order regardless of the key
    order inside each row; a missing column is written as an empty field.

    :param table: path of the table file
    :param data: list of rows (dicts)
    """
    lines = [','.join(settings.TITLE)]
    for row in data:
        lines.append(','.join(str(row.get(column, '')) for column in settings.TITLE))
    with open(file=table, mode='w', encoding='utf-8') as wf:
        wf.write('\n'.join(lines) + '\n')


def print_info(info, **kwargs):
    """
    Print selected columns of one row as a comma-separated line.

    :param info: '*' for every column, or a comma-separated list of column names
    :param kwargs: the row, passed as keyword arguments
    :return: None
    """
    if info == '*':
        selected = [str(value) for value in kwargs.values()]
    else:
        selected = [str(kwargs[name]) for name in info.split(',')]
    print(','.join(selected))
logger.py
#!_*_coding:utf-8_*_
"""
handle all the logging works
"""
import logging
from conf import settings
import time
import re

# Layout of one transaction log line as written by the transaction module:
# "<asctime> ... account:<id> action:<type> amount:<n> interest:<n>"
_TRANSACTION_LINE = re.compile(
    r'(\d{4}-\d{1,2}-\d{1,2}\s\d{1,2}:\d{1,2}:\d{1,2}).*account:(.*?)\s.*action:(.*?)\s.*amount:(.*?)\s.*interest:(.*)'
)


def logger(log_type):
    """Return the logger for *log_type*, writing to the console and to its log file.

    :param log_type: key of ``settings.LOG_TYPES`` ('transaction' or 'access')
    :return: configured ``logging.Logger``
    """
    my_logger = logging.getLogger(log_type)
    if my_logger.handlers:
        # Already configured by an earlier call: attaching the handlers again
        # would duplicate every record.
        return my_logger
    my_logger.setLevel(settings.LOG_LEVEL)

    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    # console handler
    ch = logging.StreamHandler()
    ch.setLevel(settings.LOG_LEVEL)
    ch.setFormatter(formatter)

    # file handler
    log_file = "%s/log/%s" % (settings.BASE_DIR, settings.LOG_TYPES[log_type])
    fh = logging.FileHandler(log_file)
    fh.setLevel(settings.LOG_LEVEL)
    fh.setFormatter(formatter)

    my_logger.addHandler(ch)
    my_logger.addHandler(fh)
    return my_logger


def get_log_info(account):
    """
    Collect the transaction log entries of one account.

    Lines that do not have the transaction layout are skipped.

    :param account: account number as text
    :return: list of ``[time.struct_time, action, amount, interest]``
    """
    entries = []
    log_file = "%s/log/%s" % (settings.BASE_DIR, settings.LOG_TYPES['transaction'])
    with open(log_file, 'r') as f:
        for line in f:
            log_mat = _TRANSACTION_LINE.search(line)
            if log_mat is None:
                continue
            stamp, account_id, action, amount, interest = log_mat.groups()
            if account_id == account:
                when = time.strptime(stamp, '%Y-%m-%d %H:%M:%S')
                entries.append([when, action, amount, interest])
    return entries
main.py
#!_*_coding:utf-8_*_
"""
main program handle module , handle all the user interaction stuff
"""
from core import auth
from core import logger
from core import parsers
from core import transaction
from core.auth import login_required
from core import actions
from conf import settings
from core import accounts
import random
import datetime
import re
import time

# transaction logger
trans_logger = logger.logger('transaction')
# access logger
access_logger = logger.logger('access')

# temp account data ,only saves the data in memory
user_data = {
    'account_id': None,
    'is_authenticated': False,
    'account_data': None
}


def _wait_for_back():
    """Block until the user types 'b'."""
    while input("\033[33;1mInput 'b' return to menu:\033[0m").strip() != 'b':
        pass


def _show_balance(account_data):
    """Print the credit and balance of an account row."""
    print('''
    --------- BALANCE INFO --------
    Credit : %s
    Balance: %s''' % (account_data['credit'], account_data['balance']))


def _amount_loop(account_data, tran_type, prompt):
    """Ask for amounts and run *tran_type* transactions until the user types 'b'."""
    while True:
        raw = input(prompt).strip()
        if raw == 'b':
            return
        if raw.isdigit():
            updated = transaction.make_transaction(trans_logger, account_data, tran_type, raw)
            if updated:
                print('''\033[42;1mNew Balance:%s\033[0m''' % (updated['balance']))
        else:
            print('\033[31;1m[%s] is not a valid amount, only accept integer!\033[0m' % raw)


def _find_account(account):
    """Return the rows of the account table whose account number is *account*."""
    sql_str = 'select * from accounts_table where account = %s' % account
    sql_type = sql_str.split()[0]
    dict_sql = parsers.parsers(sql_str, sql_type, settings.DATABASE['path'])
    if not dict_sql:
        return []
    return actions.actions(sql_type, dict_sql)


def _menu_loop(menu, menu_dic, acc_data):
    """Show *menu* forever and call the handler chosen by the user."""
    while True:
        print(menu)
        user_option = input(">>:").strip()
        if user_option in menu_dic:
            menu_dic[user_option](acc_data)
        else:
            print("\033[31;1mOption does not exist!\033[0m")


def _ask(prompt, is_valid, default=None):
    """Prompt until *is_valid* accepts the answer.

    :param prompt: text shown to the user
    :param is_valid: callable taking the stripped answer, returning truthy when acceptable
    :param default: returned for an empty answer when not None
    :return: the accepted answer (or *default*)
    """
    while True:
        answer = input(prompt).strip()
        if not answer and default is not None:
            return default
        if is_valid(answer):
            return answer


def _is_field_text(text):
    """Non-empty text that cannot break the comma-separated table row."""
    return bool(text) and ',' not in text


def _for_each_entered_account(handler):
    """Ask for account numbers until 'b'; call *handler* for each existing one."""
    while True:
        dis_account = input("\033[33;1mInput account:\033[0m").strip()
        if dis_account == 'b':
            return
        if not dis_account:
            continue
        if accounts.load_accounts(dis_account):
            handler(dis_account)
        else:
            print("\033[31;1mThe account you entered is not a bank user!\033[0m")


@login_required
def account_info(acc_data):
    """
    print account Information
    :param acc_data: session dict; the row under 'account_data' is shown
    :return:
    """
    account_data = acc_data['account_data']
    info_list = ['name', 'age', 'phone', 'enroll_date', 'expire_date', 'account', 'credit', 'balance']
    info = '''
    --------- BALANCE INFO --------
    username : {0}
    age: {1}
    phone: {2}
    enroll date: {3}
    expire date: {4}
    card number: {5}
    credit: {6}
    balance: {7}
    '''.format(*[str(account_data[key]) for key in info_list])
    print(info)
    _wait_for_back()


@login_required
def repay(acc_data):
    """
    print current balance and let user repay the bill
    :param acc_data: session dict
    :return:
    """
    account_data = acc_data['account_data']
    # Fix: the values were passed with str.format() to a template using %s
    # placeholders, so the balance was never shown.
    _show_balance(account_data)
    _amount_loop(account_data, 'repay', "\033[33;1mInput repay amount:\033[0m")


@login_required
def withdraw(acc_data):
    """
    print current balance and let user do the withdraw action
    :param acc_data: session dict
    :return:
    """
    account_data = acc_data['account_data']
    _show_balance(account_data)
    _amount_loop(account_data, 'withdraw', "\033[33;1mInput withdraw amount:\033[0m")


@login_required
def transfer(acc_data):
    """
    transfer money to another account
    :param acc_data: session dict
    :return:
    """
    account_data = acc_data['account_data']
    _show_balance(account_data)
    while True:
        payee_account = input("\033[33;1mInput payee account:\033[0m").strip()
        if payee_account == 'b':
            return
        if not payee_account:
            continue
        res = _find_account(payee_account)
        if not res:
            print("\033[31;1mThe payee you entered is not a bank user!\033[0m")
            continue
        payee_account_data = res[0]
        if payee_account_data['account'] == account_data['account']:
            # The payee row is a separate, stale copy of the same account;
            # crediting it would overwrite the debit that was just stored.
            print("\033[31;1mYou cannot transfer money to your own account!\033[0m")
            continue
        trans_amount = input("\033[33;1mInput transfer amount:\033[0m").strip()
        if trans_amount == 'b':
            return
        if not trans_amount.isdigit():
            print('\033[31;1m[%s] is not a valid amount, only accept integer!\033[0m' % trans_amount)
            continue
        new_balance = transaction.make_transaction(trans_logger, account_data, 'transfer', trans_amount)
        if not new_balance:
            # The debit was refused: the payee must not be credited.
            continue
        print('''\033[42;1mNew Balance:%s\033[0m''' % (new_balance['balance']))
        payee_balance = transaction.make_transaction(trans_logger, payee_account_data, 'repay', trans_amount)
        if payee_balance:
            print('''\033[42;1mThe money has come to the payee [%s]\033[0m''' % payee_account)


@login_required
def pay_check(acc_data):
    """
    Show this month's transaction log entries of the logged-in account
    :param acc_data: session dict; 'account_id' selects the log entries
    :return:
    """
    local_month = time.localtime().tm_mon
    for result in logger.get_log_info(acc_data['account_id']):
        if result[0].tm_mon == local_month:
            pay_check_info = '''
    --------- Datatime %s --------
    Action: %s
    Amount: %s
    Interest: %s''' % (time.strftime('%Y-%m-%d %H:%M:%S', result[0]), result[1], result[2], result[3])
            print(pay_check_info)
    _wait_for_back()


@login_required
def logout(acc_data):
    """Log the session out; the program exits inside auth.acc_logout."""
    auth.acc_logout(acc_data, access_logger)


def interactive(acc_data):
    """
    Atm interactive main interface
    :param acc_data: session dict
    :return:
    """
    menu = u'''
    ------- Oldboy Bank ---------
    \033[32;1m1. 帳戶信息(功能已實現)
    2. 還款(功能已實現)
    3. 取款(功能已實現)
    4. 轉帳(功能已實現)
    5. 帳單(功能已實現)
    6. 退出(功能已實現)
    \033[0m'''
    menu_dic = {
        '1': account_info,
        '2': repay,
        '3': withdraw,
        '4': transfer,
        '5': pay_check,
        '6': logout,
    }
    _menu_loop(menu, menu_dic, acc_data)


@login_required
def add_account(acc_data):
    """
    Ask for the data of a new account and store it after confirmation.

    Every field is asked again until it is valid instead of restarting the
    whole form.
    :param acc_data: session dict (only used by the login check)
    :return:
    """
    # id, name, age, phone, dept, enroll_date, expire_date, account, password, credit, balance, status, pay_day
    account_name = _ask("\033[33;1mInput user name:\033[0m", _is_field_text)
    account_age = _ask("\033[33;1mInput user age:\033[0m", str.isdigit)
    account_phone = _ask("\033[33;1mInput user phone number:\033[0m",
                         lambda text: text.isdigit() and len(text) == 11)
    account_dept = _ask("\033[33;1mInput user dept:\033[0m", _is_field_text)

    # Random 5-digit account number. The first digit is never 0 because the
    # table reader converts digit-only values to int, which would drop
    # leading zeros; numbers already in use are skipped.
    account = str(random.randint(10000, 99999))
    while accounts.load_accounts(account):
        account = str(random.randint(10000, 99999))

    password = _ask("\033[33;1mInput account password:\033[0m", _is_field_text, default='abcde')

    today = datetime.datetime.now()
    account_enroll_date = today.strftime("%Y-%m-%d")  # opened today
    account_expire_date = (today + datetime.timedelta(days=(3 * 365))).strftime("%Y-%m-%d")  # expires in 3 years

    account_credit = _ask("\033[33;1mInput account credit:\033[0m",
                          lambda text: re.fullmatch(r'\d+(\.\d+)?', text) is not None,
                          default=str(settings.ACCOUNT_DEFAULT['credit']))

    # The opening balance equals the credit; status '0' is the enabled state
    # written by enable_account; '22' fills the pay_day column.
    input_list = [account_name, account_age, account_phone, account_dept,
                  account_enroll_date, account_expire_date, account, password,
                  account_credit, account_credit, '0', '22']
    print(input_list)
    commit = input("\033[33;1mCommit account or exit:(Y/N)\033[0m").strip().upper()
    if commit == 'Y':
        accounts.add_account(*input_list)


@login_required
def change_credit(acc_data):
    """
    Change account credit amount
    :param acc_data: session dict (only used by the login check)
    :return:
    """
    def set_credit(account):
        new_credits = input("\033[33;1mInput new credits amount:\033[0m").strip()
        if new_credits.isdigit():
            accounts.change_account(account, 'credit = %s' % new_credits)
        else:
            print('\033[31;1m[%s] is not a valid amount, only accept integer!\033[0m' % new_credits)

    _for_each_entered_account(set_credit)


@login_required
def disable_account(acc_data):
    """
    Disable account (status = 1)
    :param acc_data: session dict (only used by the login check)
    :return:
    """
    _for_each_entered_account(lambda account: accounts.change_account(account, 'status = 1'))


@login_required
def enable_account(acc_data):
    """
    Enable account (status = 0)
    :param acc_data: session dict (only used by the login check)
    :return:
    """
    _for_each_entered_account(lambda account: accounts.change_account(account, 'status = 0'))


def mg_interactive(acc_data):
    """
    Manage interactive main interface
    :param acc_data: session dict
    :return:
    """
    menu = u'''
    ------- Oldboy Bank ---------
    \033[32;1m1. 添加帳號(功能已實現)
    2. 修改額度(功能已實現)
    3. 禁用帳號(功能已實現)
    4. 啓用帳號(功能已實現)
    5. 退出(功能已實現)
    \033[0m'''
    menu_dic = {
        '1': add_account,
        '2': change_credit,
        '3': disable_account,
        '4': enable_account,
        '5': logout,
    }
    _menu_loop(menu, menu_dic, acc_data)


def run(type):
    """
    this function will be called right a way when the program started, here handles the user interaction stuff
    :param type: 'atm' or 'manage'; any other value does nothing
    :return:
    """
    entry_points = {'atm': interactive, 'manage': mg_interactive}
    if type not in entry_points:
        return
    acc_data = auth.acc_login(user_data, access_logger, type)
    if user_data['is_authenticated']:
        user_data['account_data'] = acc_data
        entry_points[type](user_data)
parsers.py
# -*- coding: utf-8 -*-
"""Parsers that turn statements of the mini SQL dialect into dicts."""
import re

# Usage lines printed when a statement cannot be parsed. The project-level
# ``help`` import was disabled, so calling ``help(sql_type)`` ended up in the
# builtin interactive help instead of printing syntax help.
_USAGE = {
    'select': 'select <columns> from <table> where <column> <operator> <value>',
    'add': 'add to <table> values <value>,<value>,...',
    'del': 'del from <table> where <column> <operator> <value>',
    'update': 'update <table> set <column> = <value> where <column> <operator> <value>',
}

_LOGIC_PATTERN = re.compile(r'(.+?)\s([=<>]{1,2}|like)\s(.+)')


def _print_usage(sql_type):
    """Print the expected syntax for *sql_type*."""
    print('Invalid %s statement. Usage: %s' % (sql_type, _USAGE.get(sql_type, '')))


def parsers(sql_str, sql_type, base_dir):
    """
    Dispatch a statement to the parser for its type.

    :param sql_str: the statement
    :param sql_type: first word of the statement
    :param base_dir: directory prefix prepended to the table name
    :return: parsed dict, None if the statement is malformed,
             False if the type is unknown
    """
    parsers_dict = {'select': select_parser,
                    'add': add_parser,
                    'del': del_parser,
                    'update': update_parser}
    parser = parsers_dict.get(sql_type)
    if parser is None:
        return False
    return parser(sql_str, sql_type, base_dir)


def select_parser(sql_str, sql_type, base_dir):
    """
    Parse ``select <cols> from <table> where <condition>``.

    :param sql_str: the statement
    :param sql_type: statement type, used for the usage message
    :param base_dir: directory prefix prepended to the table name
    :return: dict with 'select', 'from', 'where'; None (after printing usage)
             if the statement is malformed
    """
    command_parse = re.search(r'select\s(.*?)\sfrom\s(.*?)\swhere\s(.*)', sql_str, re.I)
    where = logic_cal(command_parse.group(3)) if command_parse else False
    if not where:
        _print_usage(sql_type)
        return None
    return {
        'select': command_parse.group(1),
        'from': base_dir + command_parse.group(2),
        'where': where,
    }


def add_parser(sql_str, sql_type, base_dir):
    """
    Parse ``add to <table> values <v1>,<v2>,...``.

    :param sql_str: the statement
    :param sql_type: statement type, used for the usage message
    :param base_dir: directory prefix prepended to the table name
    :return: dict with 'to' and 'values' (list of strings); None (after
             printing usage) if the statement is malformed
    """
    command_parse = re.search(r'add\sto\s(.*?)\svalues\s(.*)', sql_str, re.I)
    if not command_parse:
        _print_usage(sql_type)
        return None
    return {
        'to': base_dir + command_parse.group(1),
        'values': command_parse.group(2).split(','),
    }


def del_parser(sql_str, sql_type, base_dir):
    """
    Parse ``del from <table> where <condition>``.

    :param sql_str: the statement
    :param sql_type: statement type, used for the usage message
    :param base_dir: directory prefix prepended to the table name
    :return: dict with 'from' and 'where'; None (after printing usage) if the
             statement is malformed
    """
    command_parse = re.search(r'del\sfrom\s(.*?)\swhere\s(.*)', sql_str, re.I)
    where = logic_cal(command_parse.group(2)) if command_parse else False
    if not where:
        _print_usage(sql_type)
        return None
    return {
        'from': base_dir + command_parse.group(1),
        'where': where,
    }


def update_parser(sql_str, sql_type, base_dir):
    """
    Parse ``update <table> set <col> = <value> where <condition>``.

    :param sql_str: the statement
    :param sql_type: statement type, used for the usage message
    :param base_dir: directory prefix prepended to the table name
    :return: dict with 'update', 'set', 'where'; None (after printing usage)
             if the statement is malformed
    """
    command_parse = re.search(r'update\s(.*?)\sset\s(.*?)=(.*?)\swhere\s(.*)', sql_str, re.I)
    if command_parse:
        set_clause = logic_cal([command_parse.group(2), '=', command_parse.group(3)])
        where = logic_cal(command_parse.group(4))
        if set_clause and where:
            return {
                'update': base_dir + command_parse.group(1),
                'set': set_clause,
                'where': where,
            }
    _print_usage(sql_type)
    return None


def logic_cal(logic_exp):
    """
    Normalise a comparison into a three-element list.

    :param logic_exp: the comparison, either as text (``"age >= 20"``) or as a
                      list of parts (``['age', '=', '20']``); list parts are
                      stripped and joined with single spaces, so ``a=1`` and
                      ``a = 1`` are both accepted in a set clause
    :return: ``[left, operator, right]`` with ``=`` rewritten to ``==`` and a
             non-numeric right side wrapped in double quotes, or False if the
             text is not a comparison
    """
    if isinstance(logic_exp, str):
        text = logic_exp
    else:
        text = ' '.join(part.strip() for part in logic_exp)
    matched = _LOGIC_PATTERN.search(text)
    if not matched:
        return False
    left, op, right = matched.group(1, 2, 3)
    if op == '=':
        op = '=='
    # Quote the right side unless it is a number or already contains a
    # double-quoted part.
    # NOTE(review): a value that merely contains a quoted fragment is passed
    # through unchanged; consumers must not execute it.
    if not right.isdigit() and not re.search(r'"(.*?)"', right):
        right = '"' + right + '"'
    return [left, op, right]
transaction.py
#!_*_coding:utf-8_*_
"""Balance-changing transactions."""
from conf import settings
from core import accounts
from core import logger
from core import parsers
from core import actions


def make_transaction(log_obj, account_data, tran_type, amount, **others):
    '''
    Apply one transaction to an account, store the new balance and log it.

    :param log_obj: logger that receives the transaction record
    :param account_data: account row (dict); its 'balance' is updated in place
    :param tran_type: key of settings.TRANSACTION_TYPE
    :param amount: transaction amount (number or numeric text)
    :param others: accepted for compatibility, not used
    :return: account_data on success; None when the transaction type or
             action is unknown, the amount is not positive, or the balance
             would become negative
    :raises ValueError: if amount cannot be converted to float
    '''
    amount = float(amount)
    if tran_type not in settings.TRANSACTION_TYPE:
        print("\033[31;1mTransaction type [%s] is not exist!\033[0m" % tran_type)
        return None
    if not amount > 0:
        # A zero or negative amount would turn a debit into a credit.
        print("\033[31;1mTransaction amount [%s] must be greater than 0!\033[0m" % amount)
        return None

    rule = settings.TRANSACTION_TYPE[tran_type]
    interest = amount * rule['interest']
    old_balance = float(account_data['balance'])

    if rule['action'] == 'plus':
        new_balance = old_balance + amount + interest
    elif rule['action'] == 'minus':
        new_balance = old_balance - amount - interest
        # check credit
        if new_balance < 0:
            print('''\033[31;1mYour credit [%s] is not enough for this transaction [-%s], your current balance is [%s]\033[0m'''
                  % (account_data['credit'], (amount + interest), old_balance))
            return None
    else:
        # Previously fell through with new_balance unbound.
        print("\033[31;1mTransaction action [%s] is not supported!\033[0m" % rule['action'])
        return None

    account_data['balance'] = new_balance
    sql_str = 'update accounts_table set balance = %s where account = %s' % (new_balance, account_data['account'])
    sql_type = sql_str.split()[0]
    dict_sql = parsers.parsers(sql_str, sql_type, settings.DATABASE['path'])
    actions.actions(sql_type, dict_sql)
    # The log layout is read back by logger.get_log_info.
    log_obj.info("account:%s action:%s amount:%s interest:%s" %
                 (account_data['account'], tran_type, amount, interest))
    return account_data
啓動命令。
python atm.py
python manage.py
- 做者:henryyuan - 日期:2018/03/05 - 版本:Version 1.0 - 工具:PyCharm 2017.3.3 - 版本:Python 3.6.4 - MarkDown工具:pycharm - 流程圖工具:ProcessOn
無
2018-3-5 Version:1.0
1.) main.py 中的 add_account() 函數用於添加帳號,需要透過多個 input 依次輸入各個欄位。代碼如下:
account_name = input("\033[33;1mInput user name:\033[0m").strip() # 有點重複代碼的感受 if len(account_name) > 0: account_name = account_name else: continue account_age = input("\033[33;1mInput user age:\033[0m").strip() # 有點重複代碼的感受 if len(account_name) > 0 and account_age.isdigit(): account_age = account_age else: continue account_phone = input("\033[33;1mInput user phone number:\033[0m").strip() # 有點重複代碼的感受 if len(account_phone) > 0 and account_phone.isdigit() and len(account_phone) == 11: account_phone = account_phone else: continue
每一個 input 語句都需要一個 if 判斷;當有多個 input 輸入語句時,就會出現大量重複的 if 代碼。如何減少這些 if 語句的代碼量?(一種可行的做法:把「提示 + 驗證」封裝成一個函數,傳入提示文字和驗證函數,驗證失敗時重複詢問,每個欄位只需調用一次該函數。)