須要統計出當前數據庫的全部數據庫名,以及每一個用戶的受權信息。python
在mysql裏面,使用命令:mysql
show databases
就能夠獲取全部數據庫了sql
執行命令:數據庫
select User from mysql.user
注意:須要排除到默認的用戶,好比:json
"root", "mysql.sys", "mysql.session"
語法:session
show grants for 用戶名;
好比:app
show grants for test;
執行輸出:ide
GRANT USAGE ON *.* TO 'test'@'%' GRANT SELECT ON `DB01`.* TO 'test'@'%' GRANT ALL PRIVILEGES ON `DB02`.* TO 'test'@'%' WITH GRANT OPTION
注意:這段信息表示,test用戶,對DB01數據庫下的全部表,是隻讀權限。fetch
對對DB02數據庫下的全部表,擁有讀寫權限。spa
因爲時間關係,這裏不一一解釋了,代碼裏面都有註釋。
完整代碼以下:
#!/usr/bin/env python # coding: utf-8 import json import pymysql class Mysql(object): # mysql 端口號,注意:必須是int類型 def __init__(self, host, user, passwd, port, db_name): self.host = host self.user = user self.passwd = passwd self.port = port self.db_name = db_name def select(self, sql): """ 執行sql命令 :param sql: sql語句 :return: 元祖 """ try: conn = pymysql.connect( host=self.host, user=self.user, passwd=self.passwd, port=self.port, database=self.db_name, charset='utf8', cursorclass=pymysql.cursors.DictCursor ) cur = conn.cursor() # 建立遊標 # conn.cursor() cur.execute(sql) # 執行sql命令 res = cur.fetchall() # 獲取執行的返回結果 cur.close() conn.close() return res except Exception as e: print(e) return False def get_all_db(self): """ 獲取全部數據庫名 :return: list """ # 排除自帶的數據庫 exclude_list = ["sys", "information_schema", "mysql", "performance_schema"] sql = "show databases" # 顯示全部數據庫 res = self.select(sql) # print(res) if not res: # 判斷結果非空 return False db_list = [] # 數據庫列表 for i in res: db_name = i['Database'] # 判斷不在排除列表時 if db_name not in exclude_list: db_list.append(db_name) # print(db_name) if not db_list: return False return db_list def get_user_list(self): """ 獲取用戶列表 :return: list """ # 排除自帶的用戶 exclude_list = ["root", "mysql.sys", "mysql.session"] sql = "select User from mysql.user" res = self.select(sql) # print(res) if not res: # 判斷結果非空 return False user_list = [] for i in res: db_name = i['User'] # 判斷不在排除列表時 if db_name not in exclude_list: user_list.append(db_name) if not user_list: return False return user_list def get_user_power(self): """ 獲取用戶權限 :return: {} { "test":{ # 用戶名 "read":["db1","db2"], # 只擁有讀取權限的數據庫 "all":["db1","db2"], # 擁有讀寫權限的數據庫 }, ... } """ info_dict = {} # 最終結果字典 # 獲取用戶列表 user_list = self.get_user_list() if not user_list: return False # 查詢每個用戶的權限 for user in user_list: # print("user",user) sql = "show grants for {}".format(user) res = self.select(sql) if not res: return False for i in res: key = 'Grants for {}@%'.format(user) # print("key",key) # 判斷key值存在時 if i.get(key): # print(i[key]) # 包含ALL或者SELECT時 if "ALL" in i[key] or "SELECT" in i[key]: # print(i[key]) if not info_dict.get(user): info_dict[user] = {"read": [], "all": []} cut_str = i[key].split() # 空格切割 # print(cut_str,len(cut_str)) power = cut_str[1] # 權限,好比ALL,SELECT if len(cut_str) == 6: # 判斷切割長度 # 去除左邊的` tmp_str = cut_str[3].lstrip("`") else: tmp_str = cut_str[4].lstrip("`") # 替換字符串 tmp_str = tmp_str.replace('`.*', '') value = tmp_str.replace('\_', '-') # 判斷權限爲select 時 if power.lower() == "select": if value not in info_dict[user].get("read"): # 只讀列表 info_dict[user]["read"].append(value) else: if value not in info_dict[user].get("all"): # 全部權限列表 info_dict[user]["all"].append(value) # print(info_dict) return info_dict if __name__ == '__main__': host = "192.168.10.10" user = "root" passwd = "123456" port = 3306 db_name = "mysql" obj = Mysql(host, user, passwd, port, db_name) all_db_list = obj.get_all_db() user_power = obj.get_user_power() print("all_db_list",all_db_list) print("user_power",user_power)