Python3自動生成MySQL數據字典的markdown文本

爲啥要寫這個腳本

五一前的準備下班的時候,看到同事爲了作數據庫的某個表的數據字典,在作一個複雜的人工操做,就是一個字段一個字段的純手擼,那速度可想而知是多麼的折磨和鍛鍊人的意志和耐心,反正就是很耗時又費力的活,關鍵是工做效率過低了,因而就網上查了一下,可否有在線工具可用,可是並無找到理想和如意的,因而吧,就乾脆本身擼一個,一勞永逸,說幹就幹的那種……python

先屢一下腳本思路

第一步:輸入或修改數據庫鏈接配置信息,以及輸入數據表名mysql

第二步:利用pymysql模塊鏈接數據庫,並判斷數據表是否存在git

第三步:獲取數據表的註釋github

第四步:存儲文件夾和文件處理,刪除已存在的文件避免重複寫入sql

第五步:先寫入Markdown的表頭部信息數據庫

第六步:從information_schema中查詢表結構和相關信息segmentfault

第七步:依次拼裝每一個字段的Markdown文本寫入,結束並關閉相關鏈接markdown

運行環境

Python運行環境:Windows + python3.6
用到的模塊:pymysql、os、time、pyinstaller
如未安裝的模塊,請使用pip instatll xxxxxx進行安裝,例如:pip install pyinstaller工具

獲取數據庫鏈接信息的兩種方式

既然是要作數據字典,那麼確定就須要先鏈接數據庫,而鏈接數據庫,天然就須要先知道數據庫的基本信息:IP地址、用戶名、登陸密碼、數據庫名等……測試

爲了方便,我這裏寫了兩種配置MySQL鏈接的方法:第一種是直接配置在代碼裏,直接修改代碼裏的鏈接信息就能夠了;另一種就是經過手動輸入連接信息,不用修改代碼,方便快速多用。具體的完整源碼,我都上傳到同性交友網站GitHub了,能夠點下面的連接查看……

生成可執行文件

爲了方便不一樣的人羣方便快速的使用,能夠不用安裝Python環境來執行py腳本文件,我把相關腳本打包成Windows可直接執行的exe文件,下載雙擊運行便可(可能有的系統須要管理員權限運行),打包的方式很簡單,就是利用pyinstaller模塊進行快速打包,省時省力,具體更多用法你們能夠網上查一下。

打包命令爲: pyinstaller -F -i favicon.ico data_dict_input.py

執行這個命令後,就會在當前目錄下生成一個dict和其餘的文件夾和相關文件,其中,打開dict,下面會生成一個文件名相同的exe文件data_dict_input.exe,雙擊這個文件就能夠打開了,拷貝到其餘地方同樣可使用。

下面我把兩種方式的腳本,都生成了exe可執行文件,你們能夠直接點擊下載試用,若是下載不了,請直接去GitHub倉庫下載或者本身生成

完整代碼

爲了方便部分人想偷懶,不直接去交友網站查看,我在這裏也貼一下其中的一個源碼出來吧(其實吧,我是以爲文章篇幅有點短,來湊字數的,你們明白就好,看透不說透)。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
自動生成MySQL數據表的數據字典支持多個
自動獲取數據庫鏈接信息,方便多用
author: gxcuizy
date: 2020-04-30
"""

import pymysql
import os
import time


class DataDict(object):
    def __init__(self, connect_info):
        # 數據庫鏈接配置
        self.host_name = connect_info[0]
        self.user_name = connect_info[1]
        self.pwd = connect_info[2]
        self.db_name = connect_info[3]
        self.folder_name = 'mysql_dict'

    def run(self, table_str):
        """腳本執行入口"""
        try:
            # 建立一個鏈接
            conn = pymysql.connect(self.host_name, self.user_name, self.pwd, self.db_name)
            # 用cursor()建立一個遊標對象
            cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
        except Exception:
            print('數據庫鏈接失敗,請檢查鏈接信息!')
            exit(1)
        table_list = table_str.split(',')
        for table_name in table_list:
            # 判斷表是否存在
            sql = "SHOW TABLES LIKE '%s'" % (table_name,)
            cursor.execute(sql)
            result_count = cursor.rowcount
            if result_count == 0:
                print('%s數據庫中%s表名不存在,沒法生成……' % (self.db_name, table_name))
                continue
            # 表註釋獲取
            print('開始生成表%s的數據字典' % (table_name,))
            sql = "show table status WHERE Name = '%s'" % (table_name,)
            cursor.execute(sql)
            result = cursor.fetchone()
            table_comment = result['Comment']
            # 文件夾和文件處理
            file_path = self.folder_name + os.sep + table_name + '.md'
            self.deal_file(file_path)
            # 打開文件,準備寫入
            dict_file = open(file_path, 'a', encoding='UTF-8')
            dict_file.write('#### %s %s' % (table_name, table_comment))
            dict_file.write('\n | 字段名稱 | 字段類型 | 默認值 | 字段註釋 |')
            dict_file.write('\n | --- | --- | --- | --- |')
            # 表結構查詢
            field_str = "COLUMN_NAME,COLUMN_TYPE,COLUMN_DEFAULT,COLUMN_COMMENT"
            sql = "select %s from information_schema.COLUMNS where table_schema='%s' and table_name='%s'" % (field_str, self.db_name, table_name)
            cursor.execute(sql)
            fields = cursor.fetchall()
            for field in fields:
                column_name = field['COLUMN_NAME']
                column_type = field['COLUMN_TYPE']
                column_default = str(field['COLUMN_DEFAULT'])
                column_comment = field['COLUMN_COMMENT']
                info = ' | ' + column_name + ' | ' + column_type + ' | ' + column_default + ' | ' + column_comment + ' | '
                dict_file.write('\n ' + info)
            # 關閉鏈接
            print('完成表%s的數據字典' % (table_name,))
            dict_file.close()
        cursor.close()
        conn.close()

    def deal_file(self, file_name):
        """處理存儲文件夾和文件"""
        # 不存在則建立文件夾
        if not os.path.exists(self.folder_name):
            os.mkdir(self.folder_name)
        # 刪除已存在的文件
        if os.path.isfile(file_name):
            os.unlink(file_name)

    def test_conn(self, conn_info):
        """測試數據庫鏈接"""
        try:
            # 建立一個鏈接
            pymysql.connect(conn_info[0], conn_info[1], conn_info[2], conn_info[3])
            return True
        except Exception:
            return False


# 程序執行入口
if __name__ == '__main__':
    # 數據數據鏈接信息
    conn_info = input('請輸入mysql數據庫鏈接信息(格式爲:主機IP,用戶名,登陸密碼,數據庫名),逗號分隔且輸入順序不能亂,例如:192.168.0.1,root,root,test_db:')
    conn_list = conn_info.split(',')
    while conn_info == '' or len(conn_list) != 4:
        conn_info = input('請正確輸入mysql數據庫鏈接信息(格式爲:主機IP,用戶名,登陸密碼,數據庫名),逗號分隔且輸入順序不能亂,例如:192.168.0.1,root,root,test_db:')
        conn_list = conn_info.split(',')
    # 測試數據庫鏈接問題
    dd_test = DataDict(conn_list)
    db_conn = dd_test.test_conn(conn_list)
    while db_conn == False:
        conn_info = input('請正確輸入mysql數據庫鏈接信息(格式爲:主機IP,用戶名,登陸密碼,數據庫名),逗號分隔且輸入順序不能亂,例如:192.168.0.1,root,root,test_db:')
        conn_list = conn_info.split(',')
        if len(conn_list) != 4:
            continue
        dd_test = DataDict(conn_list)
        db_conn = dd_test.test_conn(conn_list)
    # 輸入數據表名稱
    table_s = input('請輸入數據庫表名(例如:t_order),如需輸入多個表名請用英文逗號分隔(例如:t_order,t_goods),結束使用請輸入q:')
    dd = DataDict(conn_list)
    while table_s != 'q':
        dd.run(table_s)
        table_s = input('繼續使用請輸入數據庫表名(例如t_order),如需輸入多個表名請用英文逗號分隔(例如t_order,t_goods),結束使用請輸入q):')
    else:
        print('謝謝使用,再見……')
        time.sleep(1)

最後

老規矩,你們有任何問題,均可以留言或者各類渠道告訴我,雖然我可能也不會去修改。方法和思路萬千,若是你有其餘思路以及想法的,歡迎留言分享和交流……

相關文章
相關標籤/搜索