python 獲取mysql數據庫列表以及用戶權限

1、需求分析

須要統計出當前數據庫的全部數據庫名,以及每一個用戶的受權信息。python

 

獲取全部數據庫

在mysql裏面,使用命令:mysql

show databases

 

就能夠獲取全部數據庫了sql

 

獲取全部用戶

執行命令:數據庫

select User from mysql.user

 

注意:須要排除到默認的用戶,好比:json

"root", "mysql.sys", "mysql.session"

 

獲取用戶權限

語法:session

show grants for 用戶名;

 

好比:app

show grants for test;

執行輸出:ide

GRANT USAGE ON *.* TO 'test'@'%'
GRANT SELECT ON `DB01`.* TO 'test'@'%'
GRANT ALL PRIVILEGES ON `DB02`.* TO 'test'@'%' WITH GRANT OPTION

 

注意:這段信息表示,test用戶,對DB01數據庫下的全部表,是隻讀權限。fetch

對對DB02數據庫下的全部表,擁有讀寫權限。spa

 

2、python實現

因爲時間關係,這裏不一一解釋了,代碼裏面都有註釋。

完整代碼以下:

#!/usr/bin/env python
# coding: utf-8

import json
import pymysql


class Mysql(object):
    # mysql 端口號,注意:必須是int類型
    def __init__(self, host, user, passwd, port, db_name):
        self.host = host
        self.user = user
        self.passwd = passwd
        self.port = port
        self.db_name = db_name

    def select(self, sql):
        """
        執行sql命令
        :param sql: sql語句
        :return: 元祖
        """
        try:
            conn = pymysql.connect(
                host=self.host,
                user=self.user,
                passwd=self.passwd,
                port=self.port,
                database=self.db_name,
                charset='utf8',
                cursorclass=pymysql.cursors.DictCursor
            )
            cur = conn.cursor()  # 建立遊標
            # conn.cursor()
            cur.execute(sql)  # 執行sql命令
            res = cur.fetchall()  # 獲取執行的返回結果
            cur.close()
            conn.close()
            return res
        except Exception as e:
            print(e)
            return False

    def get_all_db(self):
        """
        獲取全部數據庫名
        :return: list
        """
        # 排除自帶的數據庫
        exclude_list = ["sys", "information_schema", "mysql", "performance_schema"]
        sql = "show databases"  # 顯示全部數據庫
        res = self.select(sql)
        # print(res)
        if not res:  # 判斷結果非空
            return False

        db_list = []  # 數據庫列表
        for i in res:
            db_name = i['Database']
            # 判斷不在排除列表時
            if db_name not in exclude_list:
                db_list.append(db_name)
                # print(db_name)

        if not db_list:
            return False

        return db_list

    def get_user_list(self):
        """
        獲取用戶列表
        :return: list
        """
        # 排除自帶的用戶
        exclude_list = ["root", "mysql.sys", "mysql.session"]
        sql = "select User from mysql.user"
        res = self.select(sql)
        # print(res)
        if not res:  # 判斷結果非空
            return False

        user_list = []
        for i in res:
            db_name = i['User']
            # 判斷不在排除列表時
            if db_name not in exclude_list:
                user_list.append(db_name)

        if not user_list:
            return False

        return user_list

    def get_user_power(self):
        """
        獲取用戶權限
        :return: {}

        {
            "test":{  # 用戶名
                "read":["db1","db2"],  # 只擁有讀取權限的數據庫
                "all":["db1","db2"],  # 擁有讀寫權限的數據庫
            },
            ...
        }
        """
        info_dict = {}  # 最終結果字典
        # 獲取用戶列表
        user_list = self.get_user_list()
        if not user_list:
            return False

        # 查詢每個用戶的權限
        for user in user_list:
            # print("user",user)
            sql = "show grants for {}".format(user)
            res = self.select(sql)
            if not res:
                return False

            for i in res:
                key = 'Grants for {}@%'.format(user)
                # print("key",key)
                # 判斷key值存在時
                if i.get(key):
                    # print(i[key])
                    # 包含ALL或者SELECT時
                    if "ALL" in i[key] or "SELECT" in i[key]:
                        # print(i[key])
                        if not info_dict.get(user):
                            info_dict[user] = {"read": [], "all": []}

                        cut_str = i[key].split()  # 空格切割
                        # print(cut_str,len(cut_str))
                        power = cut_str[1]  # 權限,好比ALL,SELECT

                        if len(cut_str) == 6:  # 判斷切割長度
                            # 去除左邊的`
                            tmp_str = cut_str[3].lstrip("`")
                        else:
                            tmp_str = cut_str[4].lstrip("`")

                        # 替換字符串
                        tmp_str = tmp_str.replace('`.*', '')
                        value = tmp_str.replace('\_', '-')

                        # 判斷權限爲select 時
                        if power.lower() == "select":
                            if value not in info_dict[user].get("read"):
                                # 只讀列表
                                info_dict[user]["read"].append(value)
                        else:
                            if value not in info_dict[user].get("all"):
                                # 全部權限列表
                                info_dict[user]["all"].append(value)

        # print(info_dict)
        return info_dict


if __name__ == '__main__':
    host = "192.168.10.10"
    user = "root"
    passwd = "123456"
    port = 3306
    db_name = "mysql"

    obj = Mysql(host, user, passwd, port, db_name)
    all_db_list = obj.get_all_db()
    user_power = obj.get_user_power()

    print("all_db_list",all_db_list)
    print("user_power",user_power)
View Code
相關文章
相關標籤/搜索