python初學:第二步——員工信息增刪改查程序

程序要求

  1. 文件存儲時能夠這樣表示python

    id,name,age,phone,dept,enroll_date
    1,Alex Li,22,13651054608,IT,2013-04-01
    2,Jack Wang,28,13451024608,HR,2015-01-07
    3,Rain Wang,21,13451054608,IT,2017-04-01
    4,Mack Qiao,44,15653354208,Sales,2016-02-01
    5,Rachel Chen,23,13351024606,IT,2013-03-16
    6,Eric Liu,19,18531054602,Marketing,2012-12-01
    7,Chao Zhang,21,13235324334,Administration,2011-08-08
    8,Kevin Chen,22,13151054603,Sales,2013-04-01
    9,Shit Wen,20,13351024602,IT,2017-07-03
    10,Shanshan Du,26,13698424612,Operation,2017-07-02
  2. 可進行模糊查詢,語法至少支持下面3種查詢語法:mysql

    select name,age from staff_table where age > 22
    select * from staff_table where dept = "IT"
    select * from staff_table where enroll_date like "2013"
  3. 可建立新員工紀錄,以phone作惟一鍵(即不容許表裏有手機號重複的狀況),staff_id需自增語法:git

    add to staff_table values Alex Li,25,134435344,IT,2015-10-29
  4. 可刪除指定員工信息紀錄,輸入員工id,便可刪除語法:github

    del from staff_table where id = 3
  5. 可修改員工信息,語法以下:正則表達式

    update staff_table set dept = Market where dept = IT #把全部dept=IT的紀錄的dept改爲Market
    update staff_table set age = 25 where name = Alex Li #把name=Alex Li的紀錄的年齡改爲25
  6. 以上每條語名執行完畢後,要顯示這條語句影響了多少條紀錄。好比查詢語句就顯示查詢出了多少條、修改語句就顯示修改了多少條等。

注意:以上需求,要充分使用函數,請盡你的最大限度來減小重複代碼!sql

編寫思路

圖片描述

程序目錄結構

homework_project
├── action
│   ├── database.py # 對數據庫中的表文件進行操做
│   ├── __init__.py
├── config
│   ├── __init__.py
│   └── syntax.py # 配置文件。
├── core
│   ├── actions.py # 對不一樣的sql類型進行對應的操做
│   ├── help.py # 提供幫助
│   ├── __init__.py
│   ├── main.py # 主函數,提供用戶輸入界面。並執行語法解析與sql操做
│   ├── parsers.py # 語法解析函數。對用戶輸入的語法正確性鏡像解析,並最終解析成字典格式
├── database
│   └── staff_table # 表
├── __init__.py
__init__.py
mysql_run.py # 執行程序數據庫

github連接app

程序運行命令

python mysql_run.py

程序正文

mysql_run.py:執行腳本函數

from homework_project.core.main import main


if __name__ == '__main__':
    main()

main.py:主入口程序學習

# -*- coding: utf-8 -*-
from . import parsers as p
from .actions import actions
import os


def main():
    """ 主函數
        獲取用戶輸入,並對用戶進行解析。若是獲取解析值,並執行相應的sql操做。
    """
    database_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + '/database/'  # 獲取數據庫文件的路徑
    while True:
        sql_str = input('請輸入sql語句>').strip()
        if sql_str:
            sql_type = sql_str.split()[0].lower()  # 獲取輸入的sql語句的類型。
            if p.parses(sql_type):  # 檢查sql的類型是否符合規則
                dict_sql = p.parses(sql_type)(sql_str, sql_type, database_dir)  # 調用parsers模塊的parses函數進行語法解析
                if dict_sql:  # 若是字典格式的sql語句返回
                    actions(sql_type)(dict_sql)  # 則執行後面的sql操做
            else:
                print('sql語法錯誤,程序支持select,del,add,update語句。')
        else:
            continue

parsers.py:sql語法解析模塊

# -*- coding: utf-8 -*-
import re
from .help import help


def parses(sql_type):
    """ 語法解析函數

    :param sql_type: 從main()函數導入的sql語句類型。
    :return:
        parsers_dict[sql_type]
        相應的語法解析函數
    """
    parsers_dict = {'select': select_parser,
                    'add': add_parser,
                    'del': del_parser,
                    'update': update_parser}
    if sql_type in parsers_dict:
        return parsers_dict[sql_type]
    else:
        return False


def select_parser(sql_str, sql_type, base_dir):
    """ 搜索語句解析函數

    :param sql_str: 用戶輸入的sql語句
    :param sql_type: 用戶輸入的sql語句類型
    :param base_dir: 主函數導入的數據庫所在路徑
    :return:
    """
    dict_sql = {}  # 建立空字典
    command_parse = re.search(r'select\s(.*?)\sfrom\s(.*?)\swhere\s(.*)', sql_str, re.I)  # 使用正則表達式解析add語法,而且re.I忽略大小寫
    if command_parse:
        dict_sql['select'] = command_parse.group(1)
        dict_sql['from'] = base_dir + command_parse.group(2)  # sql字典'from’鍵添加數據庫表文件路徑的值
        dict_sql['where'] = command_parse.group(3).split(',')  # sql字典‘where’鍵添加插入的值
        if logic_cal(dict_sql['where']):  # 使用logic_cal函數將where語句語法再次進行解析
            dict_sql['where'] = logic_cal(dict_sql['where'])  # 如解析有返回值,將返回值從新做爲dict_sql['where']的值
            return dict_sql
        else:
            print(help(sql_type))  # 當語法解析不正常答應幫助
    else:
        print(help(sql_type))  # 當語法解析不正常答應幫助


def add_parser(sql_str, sql_type, base_dir):
    """ 添加語句解析函數

    :param sql_str: 用戶輸入的sql語句
    :param sql_type: 用戶輸入的sql語句類型
    :param base_dir: 主函數導入的數據庫所在路徑
    :return:
        dict_sql
        解析後的字典格式sql語句
    """
    dict_sql = {}
    command_parse = re.search(r'add\sto\s(.*?)\svalues\s(.*)', sql_str, re.I)  # 使用正則表達式解析add語法,而且re.I忽略大小寫
    if command_parse:
        dict_sql['to'] = base_dir + command_parse.group(1)  # sql字典'to’鍵添加數據庫表文件路徑的值
        dict_sql['values'] = command_parse.group(2).split(',')  # sql字典‘values’鍵添加插入的值
        return dict_sql
    else:
        print(help(sql_type))  # 當語法解析不正常答應幫助


def del_parser(sql_str, sql_type, base_dir):
    """ 刪除語句解析函數

    :param sql_str: 用戶輸入的sql語句
    :param sql_type: 用戶輸入的sql語句類型
    :param base_dir: 主函數導入的數據庫所在路徑
    :return:
        dict_sql
        解析後的字典格式sql語句
    """
    dict_sql = {}
    command_parse = re.search(r'del\sfrom\s(.*?)\swhere\s(.*)', sql_str, re.I)
    if command_parse:
        dict_sql['from'] = base_dir + command_parse.group(1)  # sql字典'to’鍵添加數據庫表文件路徑的值
        dict_sql['where'] = command_parse.group(2).split(',')  # sql字典‘where’鍵添加插入的值
        if logic_cal(dict_sql['where']):  # 使用logic_cal函數將where語句語法再次進行解析
            dict_sql['where'] = logic_cal(dict_sql['where'])  # 如解析有返回值,將返回值從新做爲dict_sql['where']的值
            return dict_sql
        else:
            print(help(sql_type))  # 當語法解析不正常答應幫助
    else:
        print(help(sql_type))  # 當語法解析不正常答應幫助


def update_parser(sql_str, sql_type, base_dir):
    """ 更新語句解析函數

    :param sql_str: 用戶輸入的sql語句
    :param sql_type: 用戶輸入的sql語句類型
    :param base_dir: 主函數導入的數據庫所在路徑
    :return:
        dict_sql
        解析後的字典格式sql語句
    """
    dict_sql = {}
    command_parse = re.search(r'update\s(.*?)\sset\s(.*?)=(.*?)\swhere\s(.*)', sql_str, re.I)
    if command_parse:
        dict_sql['update'] = base_dir + command_parse.group(1)  # sql字典'to’鍵添加數據庫表文件路徑的值
        dict_sql['set'] = [command_parse.group(2), '=', command_parse.group(3)]  # sql字典‘where’鍵添加插入的值
        dict_sql['where'] = command_parse.group(4).split(',')
        if logic_cal(dict_sql['where']) and logic_cal(dict_sql['set']):  # 若是where語句、set語句都符合logic_cal中定義的規範
            dict_sql['where'] = logic_cal(dict_sql['where'])  # 如解析有返回值,將返回值從新做爲dict_sql['where']的值
            dict_sql['set'] = logic_cal(dict_sql['set'])  # 如解析有返回值,將返回值從新做爲dict_sql['set']的值
            return dict_sql
        else:
            print(help(sql_type))  # 當語法解析不正常答應幫助
    else:
        print(help(sql_type))  # 當語法解析不正常答應幫助


def logic_cal(logic_exp):
    """ 邏輯函數

    :param logic_exp: sql語句中和邏輯判斷相關的語句,列表格式。如[‘age','>=',20] 或 [‘dept','like','HR']
    :return:
        logic_exp
        通過語法解析後的邏輯判斷語句。列表格式。如[‘age','==',20] 或 [‘dept','like','HR']
    """
    # 表達式列表優化成三個元素,形如[‘age','>=',20] 或 [‘dept','like','HR']
    logic_exp = re.search('(.+?)\s([=<>]{1,2}|like)\s(.+)', ''.join(logic_exp))
    if logic_exp:
        logic_exp = list(logic_exp. group(1, 2, 3))  # 取得re匹配的全部值,並做爲一個列表
        if logic_exp[1] == '=':
            logic_exp[1] = '=='
        # 判斷邏輯運算的比較符號後的值是否字母,而且用戶是否輸入了雙引號。如沒有輸入手工添加上雙引號。
        if not logic_exp[2].isdigit() and not re.search('"(.*?)"', logic_exp[2]):
            logic_exp[2] = '"' + logic_exp[2] + '"'
        return logic_exp
    else:
        return False

actions.py:sql操做模塊

# -*- coding: utf-8 -*-
from homework_project.action.database import read_db, write_db, print_info
from homework_project.config.syntax import get_title
import re


def actions(sql_type):
    """ sql操做主函數

    :param sql_type: sql語句的類型
    :return:
        actions_dict[sql_type]
        相應操做的函數
    """
    actions_dict = {'select': select_action,
                    'add': add_action,
                    'del': del_action,
                    'update': update_action}
    if sql_type in actions_dict:  # 判斷導入的sql類型是否在actions_dict字典中定義。
        return actions_dict[sql_type]


def select_action(dict_sql):
    info = dict_sql['select']
    data = read_db(dict_sql['from'])  # 獲取原始數據庫文件中的全部數據,data爲列表格式
    key = dict_sql['where'][0]  # 獲取sql語句中where語句的key值。如id = 1,獲取id
    count = 0
    for values in data:  # 讀取data列表中的每個元素,values是字典格式
        if type(values[key]) is int:
            value = str(values[key])
        else:
            value = '"' + str(values[key]) + '"'
        dict_sql['where'][0] = value  # 將values[key]的值取出並從新賦值爲sql語句的key值。
        if where_action(dict_sql['where']):  # 將新的where語句,發送給where_action語句進行bool判斷。
            count += 1
            print_info(info, **values)
    print('已查找%s條記錄' % count)


def add_action(dict_sql):
    """ 插入動做
        獲取用戶輸入的values,並在表中插入

    :param dict_sql: parsers函數處理後的字典格式的sql語句
    """
    data = read_db(dict_sql['to'])  # 獲取原始數據庫文件中的全部數據
    value = dict_sql['values']  # 從dict_sql中獲取values的列表
    t_id = str(int(data[-1]['id']) + 1)  # 獲取原始數據庫文件中id列最後一行的id數值,並每次自動+1。而後轉換爲字符串格式
    value.insert(0, t_id)  # 將添加的id插入到value變量中
    if len(value) != len(get_title()):  # 判斷輸入值得長度是否等於數據庫文件中定義的列的長度
        print('列數不正確')
    else:
        data.append(dict(zip(get_title(), value)))  # 在獲取的原始數據中插入行的數據
        print('已添加記錄')
        write_db(dict_sql['to'], data)  # 寫入文件


def del_action(dict_sql):
    """ 刪除動做函數

    :param dict_sql: parsers函數處理後的字典格式的sql語句
    """
    temp_list = []
    data = read_db(dict_sql['from'])  # 獲取原始數據庫文件中的全部數據,data爲列表格式
    key = dict_sql['where'][0]  # 獲取sql語句中where語句的key值。如id = 1,獲取id
    for values in data:  # 讀取data列表中的每個元素,values是字典格式
        if type(values[key]) is int:
            value = str(values[key])
        else:
            value = '"' + str(values[key]) + '"'
        dict_sql['where'][0] = value  # 將values[key]的值取出並從新賦值爲sql語句的key值。
        if where_action(dict_sql['where']):  # 將新的where語句,發送給where_action語句進行bool判斷。
            temp_list.append(values)  # 若是符合條件,就從data中移除對應的values
    print('已刪除%s條記錄' % len(temp_list))
    for i in temp_list:
        data.remove(i)
    write_db(dict_sql['from'], data)  # 將新生成的data從新寫入文件


def update_action(dict_sql):
    """ 更新動做函數

    :param dict_sql: parsers函數處理後的字典格式的sql語句
    """
    data = read_db(dict_sql['update'])  # 獲取原始數據庫文件中的全部數據,data爲列表格式
    key = dict_sql['where'][0]  # 獲取sql語句中where語句的key值。如id = 1,獲取id
    set_key = dict_sql['set'][0]  # 獲取set語句中用戶輸入的key
    set_value = dict_sql['set'][3].strip("'").strip('"')  # 獲取set語句中用戶輸入的value
    count = 0
    for values in data:  # 讀取data列表中的每個元素,values是字典格式
        if type(values[key]) is int:
            value = str(values[key])
        else:
            value = '"' + str(values[key]) + '"'
        dict_sql['where'][0] = value  # 將values[key]的值取出並從新賦值爲sql語句的key值。
        if where_action(dict_sql['where']):  # 將新的where語句,發送給where_action語句進行bool判斷。
            count += 1
            values[set_key] = set_value  # 若是符合條件,使用將set_key的值修改成set_value
    print('已更新%s條記錄' % count)
    write_db(dict_sql['update'], data)  # 將新生成的data從新寫入文件


def where_action(condition):
    """ where語句操做函數

    :param condition: 判斷語句。就是字典中where的值
    :return:
    """
    if 'like' in condition:  # 若是like在語句中
        # 將where語句中的第二個參數和,第一個參數進行正則比較。若是執行正常就返回True
        return re.search(condition[2].strip("'").strip('"'), condition[0]) and True

    else:
        return eval(' '.join(condition))  # 除此使用eval進行python的邏輯判斷

help.py:幫助模塊

# -*- coding: utf-8 -*-
def help(sql_type):
    dict = {'select': select_help,
            'add': add_help,
            'del': del_help,
            'update': update_help,
            }
    if sql_type in dict:
        return dict[sql_type]()


def select_help():
    strings = '''select語法錯誤。請查看案例:
    select name,age from staff_table where age > 22
    select * from staff_table where dept = "IT"
    select * from staff_table where enroll_date like "2013"
    '''
    return strings


def add_help():
    strings = '''add語法錯誤。請查看案例:
    add to staff_table values Alex Li,25,134435344,IT,2015-10-29
    '''
    return strings


def del_help():
    strings = '''del語法錯誤。請查看案例:
    del from staff_table where id = 3
    '''
    return strings


def update_help():
    strings = '''update語法錯誤。請查看案例:
    UPDATE staff_table SET dept="Market" WHERE  dept = "IT"
    UPDATE staff_table SET age=25 WHERE  name = "Alex Li"
    '''
    return strings

database.py:文件讀寫模塊

# -*- coding: utf-8 -*-
from homework_project.config.syntax import get_title

def read_db(table):
    """ 讀取表文件函數。

    :param table: 表文件參數
    :return: 返回一個包含表文件內容的字典
    """
    title = get_title()
    try:
        main_list = []
        with open(table, 'r', encoding='utf-8') as rf:
            for line in rf:
                temp_list = []
                if line.rstrip('\n').split(',') == title:
                    continue
                else:
                    for values in line.strip('\n').split(','):
                        if values.isdigit():
                            temp_list.append(int(values))
                        else:
                            temp_list.append(values)
                    main_list.append(dict(zip(title, temp_list)))
        return main_list
    except FileNotFoundError as e:
        print(e)
        exit(1)


def write_db(table, data):
    """ 寫入表文件函數。

    :param table: 表文件參數
    :param data: 導入的數據。爲字典格式
    """
    value2 = ','.join(get_title()) + '\n'
    for values in data:
        temp_list = []
        for value in values.values():
            temp_list.append(str(value))
        value2 += ','.join(temp_list) + '\n'
    with open(file=table, mode='w', encoding='utf-8') as wf:
        wf.write(value2)


def print_info(info, **kwargs):
    """ 打印函數。
        用於select語句打印顯示

    :param info: select語句中須要顯示的類
    :param kwargs: 字典,用於進行操做的原始數據
    :return:
    """
    temp_list = []
    if info == '*':
        for key in kwargs:
            temp_list.append(str(kwargs[key]))
        print(','.join(temp_list))
    else:
        info_list = info.split(',')
        for i in info_list:
            temp_list.append(str(kwargs[i]))
        print(','.join(temp_list))

sytanx.py:配置文件模塊

def get_title():
    title_dict = ['id','name','age','phone','dept','enroll_date']
    return title_dict

README

README.md

學習筆記

文件處理筆記
函數基礎筆記
函數高階筆記
模塊筆記(待修改)

相關文章
相關標籤/搜索