# _*_ coding:utf-8 _*_"""模擬數據庫:表內容:1,Alex,22,13651054608,IT2,Egon,23,13304320533,Tearcher3,ding,23,18311028091,IT表結構:KEYS = ["id", "name", "age", "phone", "job"]只有一個信息表,表名:table增,語法:add from table ding,23,18311028091,IT刪,語法:del from table where id=3改,語法:update table set age = 23 where name=ding查,語法:select from table where age=22 select * from table where age=22 select * from table"""import osdb_file = "table"KEYS = ["id", "name", "age", "phone", "job"]def print_log(msg, log_type="info"): if log_type == "info": print(msg) elif log_type == "error": print("\033[31;1m%s\033[0m" % msg)def load_db(db_file): """ 加載員工信息表,加載成字典格式 :param db_file: :return: """ info = {} for i in KEYS: info[i] = [] with open(db_file, encoding="utf-8") as f: for line in f: cust_id, name, age, phone, job = line.split(",") info["id"].append(cust_id) info["name"].append(name) info["age"].append(age) info["phone"].append(phone) info["job"].append(job) return infoDATA = load_db(db_file)def method_gt(column, condtion_val): """ ">"方法 :param column:listing :param val:值 :return: """ user_info = [] for index,val in enumerate(DATA[column]): if float(val) > float(condtion_val): record = [] for col in KEYS: record.append(DATA[col][index]) user_info.append(record) return user_infodef method_lt(column, condtion_val): """ "<"方法 :param column: :param condtion_val: :return: """ user_info = [] for index,val in enumerate(DATA[column]): if float(val) < float(condtion_val): record = [] for col in KEYS: record.append(DATA[col][index]) user_info.append(record) return user_infodef method_eq(column, condtion_val): """ "="方法 :param column: :param condtion_val: :return: """ user_info = [] for index,val in enumerate(DATA[column]): if val == condtion_val: record = [] for col in KEYS: record.append(DATA[col][index]) user_info.append(record) return user_infodef method_like(column, condtion_val): """ "like"方法 :param column: :param condtion_val: :return: """ user_info = [] for index,val in enumerate(DATA[column]): if condtion_val in val: record = [] for col in KEYS: record.append(DATA[col][index]) user_info.append(record) return user_infodef way_where(clause): """ 解析where條件,並過濾數據 :param clause: :return: """ method = { ">": method_gt, "<": method_lt, "=": method_eq, "like": method_like } for method_key, method_func in method.items(): if method_key in clause: column, val = clause.split(method_key) user_data = method_func(column.strip(), val.strip()) return user_data else: print_log("語法錯誤:where條件只支持[>,<,=,like]", "error")def sql_parser(choise): """ 解析sql語句 :param choise: :return: """ way_list = { "select":way_select, "add":way_add, "update":way_update } if choise.split()[0] in ("add", "del", "update", "select"): if "where" in choise: query_clause, where_clause = choise.split("where") way_where(where_clause) user_data = way_where(where_clause) else: user_data = [] for index,cust_id in enumerate(DATA["id"]): data = [] for col in KEYS: data.append(DATA[col][index]) user_data.append(data) query_clause = choise choise_action = choise.split()[0] if choise_action in way_list: way_list[choise_action](user_data,query_clause) elif choise.split()[0] == "del": way_delete(user_data,choise) else: print_log( "語法錯誤:[add\\del\\update\\select] from [table] [where] [listing] [>,<,=,like] [val]\n", "error")def way_add(date_set,query_clause): """ 對員工信息表添加數據 語法:add from table ding,23,18311028091,IT :param date_set: :param query_clause: :return: """ choise_way = query_clause.split("from")[0].strip() choise_val = query_clause.split(db_file)[1].strip() if choise_way == "add" and len(choise_val) > 1: with open(db_file,encoding="utf-8",mode="a") as f1: choise_id = id + 1 choise_id_1 = str(choise_id) f1.write(choise_id_1) f1.write(",") f1.write(choise_val) f1.close() print("添加%s成功!"% choise_val)def way_select(data_set,query_clause): """ 解析查詢語句並從data_set打印指定的列 語法:select from table where age=22 select * from table where age=22 select * from table :param date_set: :param query_clause: :return: """ columns = query_clause.split("from")[0][6:].split(",") cols = [i.strip() for i in columns] if '*' in cols[0]: print(data_set) else: format_data_set = [] for listing in data_set: col_vals = [] #把要打印的字段放在這個列表裏 for col in cols: col_index = KEYS.index(col) #拿到每條記錄的索引和對應的值 col_vals.append( listing[col_index] ) format_data_set.append(col_vals) for i in format_data_set: print(i)def read(): with open(db_file, encoding="utf-8", mode="r") as f: lines = f.readlines() last_line = lines[-1] last_id = last_line[0] last_id_1 = int(last_id) f.close() return last_id_1id = read()def way_delete(data_set,choise): """ 根據"id"號刪除整行數據 del from table where id=3 :param date_set: :param query_clause: :return: """ columns = choise.split("where")[1].split("=")[1] columns_id = int(columns) if columns in DATA["id"]: with open(db_file,encoding="utf-8",mode="r") as f: user_info = [] for i in f: r = i.strip().split("\n") user_info.append(r) user_info.pop(columns_id-1) f.close() with open("%s.new" % db_file, encoding="utf-8", mode="w") as f1: for w in user_info: str = ",".join(w) f1.write(str+"\n") f1.close() os.remove(db_file) os.rename("%s.new" % db_file, db_file) print("刪除數據成功!") else: print_log("輸入有誤!","error") #with open(db_file,encoding="utf-8",mode="w") as f:def way_update(data_set,query_clause): """ 語法:update table set age = 23 where name=ding :param date_set: :param query_clause: :return: """ update_info = query_clause.split("set") if len(update_info) > 1: col_name,new_val = update_info[1].strip().split("=") for matched_row in data_set: cust_id = matched_row[0] cust_id_index = DATA["id"].index(cust_id) DATA[col_name][cust_id_index] = new_val save_db() print("update" " " +col_name+ "=" +new_val+ " ""數據成功!") else: print_log("語法錯誤:未檢測到set關鍵字!","error")def save_db(): """ 把修改 在內存中的數據存到硬盤 :return: """ with open("%s.new"%db_file,encoding="utf-8",mode="w") as f1: for index,cust_id in enumerate(DATA["id"]): row = [] for col in KEYS: row.append( DATA[col][index]) f1.write(",".join(row)) f1.close() os.remove(db_file) os.rename("%s.new"% db_file,db_file)def main(): """ 次函數是用戶輸入sql語句,調用sql語句方法。 :return: """ while True: choise = input("SQL:").strip() if not choise: print("輸入不能爲空,請從新輸入。") continue sql_parser(choise.strip())main()