/*
the waiting game:儘管人生如此艱難，不要放棄；不要妥協；不要失去希望
*/
隨著MySQL MGR版本的升級以及技術的成熟，在把MHA拉下神壇之後，MGR越來越成為MySQL高可用的首選方案。
MGR的搭建並不算很複雜，但是有一系列手工操作步驟。為了簡化MGR的搭建和故障診斷，這裡完成了一個自動化腳本，來實現MGR的自動化搭建、自動化故障診斷以及修復。
MGR自動化搭建
為了簡便起見，這裡以單機多實例的模式進行測試。
先裝好三個MySQL實例，端口號分別是7001、7002、7003，其中7001作為寫節點，其餘兩個節點作為讀節點；8000節點是筆者的另一個測試節點，請忽略。
在指明主從節點的情況下，如下為使用mgr_tool.py一鍵搭建MGR集群的測試demo。
MGR故障模擬1
MGR節點故障的自動監測和自愈實現。如下是搭建完成後的MGR集群，目前集群處於完全正常的狀態中。
人為造成主從節點間binlog的丟失。
在主節點上對從節點已丟失的數據進行操作，GTID無法找到對應的數據，組複製立即中斷。
非寫入節點出現錯誤。
查看error log。
如果是手動解決的話，還是用GTID跳過錯誤事務的套路，先查看master上的GTID信息。
嘗試跳過最新的一個事務ID，然後重新連接到組，可以正常連接到組，另外一個節點仍舊處於error狀態。
另外一個節點類似，依次解決。
MGR故障模擬2
從節點脫離Group
這種情況倒是比較簡單，重新開始組複製即可：start group_replication
MGR故障自動檢測和修復
對於如上的兩種情況：
1. 如果是從節點丟失主節點的事務，嘗試在從節點上跳過GTID，重新開始複製即可；
2. 如果從節點並非丟失主節點事務，嘗試在從節點上重新開始組複製即可。
實現代碼如下：
def parse_gtid_set(gtid_set):
    """Map each source UUID in a GTID set string to its highest transaction number.

    Accepts the formats MySQL prints in ``Executed_Gtid_Set``: single
    numbers (``uuid:7``), ranges with gaps (``uuid:1-5:7-10``) and several
    comma/newline separated UUIDs. Non-numeric parts (such as GTID tags)
    are ignored. ``None`` or an empty string yields ``{}``.

    >>> parse_gtid_set("3e11fa47-71ca-11e1-9e33-c80aa9429562:1-5:7-10")
    {'3e11fa47-71ca-11e1-9e33-c80aa9429562': 10}
    """
    highest = {}
    for entry in (gtid_set or "").split(","):
        uuid, _, intervals = entry.strip().partition(":")
        if not uuid:
            continue
        top = 0
        for interval in intervals.split(":"):
            # "a-b" -> "b", "a" -> "a"; the end of an interval is its maximum
            end = interval.strip().rpartition("-")[2]
            if end.isdecimal():
                top = max(top, int(end))
        highest[uuid] = max(highest.get(uuid, 0), top)
    return highest


def _get_member_state(conn_dict):
    """Return this member's MEMBER_STATE string, or None when no row matches."""
    rows = get_group_replication_status(conn_dict)
    return rows[0]["MEMBER_STATE"] if rows else None


def _executed_gtid_set(conn_dict):
    """Return Executed_Gtid_Set from SHOW MASTER STATUS ('' when no row is returned)."""
    rows = get_gtid(conn_dict)
    return rows[-1]["Executed_Gtid_Set"] if rows else ""


def _repair_member_once(conn_master_dict, conn_slave_dict, state):
    """Run one repair step on a member that is neither ONLINE nor RECOVERING."""
    master_gtids = parse_gtid_set(_executed_gtid_set(conn_master_dict))
    slave_gtids = parse_gtid_set(_executed_gtid_set(conn_slave_dict))
    missing = [uuid for uuid, top in master_gtids.items()
               if top > slave_gtids.get(uuid, 0)]
    if missing:
        # the slave lacks a transaction the master executed: commit an empty
        # transaction for the slave's next GTID of that source UUID
        skipped_gtid = "{0}:{1}".format(missing[0], slave_gtids.get(missing[0], 0) + 1)
        print("skip gtid and restart group replication,skiped gtid is " + skipped_gtid)
        # skip_gtid_on_slave issues "stop group_replication" before the skip
        skip_gtid_on_slave(conn_slave_dict, skipped_gtid)
    elif state == "ERROR":
        # a member in ERROR must be stopped before START is accepted again
        execute_noquery(conn_slave_dict, "stop group_replication;")
    start_group_replication(conn_slave_dict)


def auto_fix_mgr_error(conn_master_dict, conn_slave_dict, max_attempts=10, wait_seconds=10):
    """Try to bring an ERROR/OFFLINE group member back ONLINE, then print the members.

    Each round compares the master's and slave's executed GTID sets and
    either skips the missing GTID on the slave or simply restarts group
    replication. While the member is RECOVERING no repair is attempted, so
    transactions that recovery is still applying are never skipped.

    :param conn_master_dict: connection settings of the writable node.
    :param conn_slave_dict: connection settings of the member to repair.
    :param max_attempts: maximum number of wait/check rounds before giving up.
    :param wait_seconds: pause after each round before re-reading the state.
    """
    state = _get_member_state(conn_slave_dict)
    if state in ("ERROR", "OFFLINE"):
        print(conn_slave_dict["host"] + str(conn_slave_dict["port"]) + '------>' + state)
        print("auto fixing......")
        for _ in range(max_attempts):
            if state != "RECOVERING":
                _repair_member_once(conn_master_dict, conn_slave_dict, state)
            time.sleep(wait_seconds)
            state = _get_member_state(conn_slave_dict)
            if state == "ONLINE":
                print("mgr cluster fixed,back to normal")
                break
        else:
            print("mgr cluster auto fix failed after {0} attempts".format(max_attempts))
    elif state == 'ONLINE':
        print("mgr cluster is normal,nothing to do")
    check_replication_group_members(conn_slave_dict)
對於故障類型1：GTID事務不一致的自動化修復
對於故障類型2：從節點offline的自動化修復
完整的實現代碼
該過程要求MySQL實例必須滿足MGR的基本條件，如果環境本身無法滿足MGR，一切都無從談起，因此要非常清楚MGR環境的最基本要求。
完整的實現代碼如下，花了一個下午寫的，目前來說存在以下不足：
1. 創建複製用戶的時候，沒有指定具體的slave機器，目前直接指定的是%：create user repl@'%' identified by 'repl'
2. 對於slave的修復，目前無法整體修復，只能一台一台修復，其實就是少了一個循環遍歷slave機器的過程
3. 目前搭建之前都會reset master（無論主從，主要是清理可能殘留的GTID），因此只適合新環境的搭建
4. 目前只支持offline和GTID事務衝突的錯誤類型修復，無法支持其他MGR錯誤類型的修復
5. 開發環境是單機多實例模式測試，沒有在多機單實例模式下充分測試
以上都會逐步改善&增強。
# -*- coding: utf-8 -*-
"""Automated MySQL Group Replication (MGR) setup, diagnosis and repair.

Every helper opens its own short-lived pymysql connection from a
``conn_dict`` with the keys ``host``, ``port``, ``user``, ``password``,
``db`` and, optionally, ``charset``.
"""
import decimal
import logging
import sys
import time

import pymysql


def _connect(conn_dict):
    """Open a new pymysql connection described by *conn_dict*."""
    return pymysql.connect(
        host=conn_dict['host'],
        port=conn_dict['port'],
        user=conn_dict['user'],
        password=conn_dict['password'],
        database=conn_dict['db'],
        # honour the optional 'charset' key; '' lets pymysql use its default
        charset=conn_dict.get('charset', ''),
    )


def execute_query(conn_dict, sql, args=None):
    """Run a row-returning statement and return all rows as dicts.

    :param conn_dict: connection settings (see module docstring).
    :param sql: SQL text; use ``%s`` placeholders when *args* is given.
    :param args: optional parameters, escaped and interpolated by pymysql.
    :return: rows fetched through a ``DictCursor`` (keyed by column name).
    """
    conn = _connect(conn_dict)
    try:
        with conn.cursor(pymysql.cursors.DictCursor) as cursor:
            cursor.execute(sql, args)
            return cursor.fetchall()
    finally:
        # close even when execute() raises so failed queries do not leak sockets
        conn.close()


def execute_noquery(conn_dict, sql, args=None):
    """Run a statement that returns no rows, commit, and return the affected-row count."""
    conn = _connect(conn_dict)
    try:
        with conn.cursor() as cursor:
            affected = cursor.execute(sql, args)
        conn.commit()
        return affected
    finally:
        conn.close()


def _execute_in_session(conn_dict, statements):
    """Run ``(sql, args)`` pairs in order on ONE connection, then commit.

    Required for session-scoped settings such as ``gtid_next``: they are
    discarded as soon as the connection that set them is closed.
    """
    conn = _connect(conn_dict)
    try:
        with conn.cursor() as cursor:
            for sql, args in statements:
                cursor.execute(sql, args)
        conn.commit()
    finally:
        conn.close()


def get_gtid(conn_dict):
    """Return the rows of SHOW MASTER STATUS (they include ``Executed_Gtid_Set``)."""
    # NOTE(review): SHOW MASTER STATUS is replaced by SHOW BINARY LOG STATUS in
    # newer MySQL releases (8.4) -- confirm against the target server version.
    return execute_query(conn_dict, "show master status;")


def skip_gtid_on_slave(conn_dict, gtid):
    """Stop group replication and commit an empty transaction for *gtid*.

    All statements run in a single session because ``gtid_next`` is a
    session variable; spreading them over separate connections would make
    the skip a silent no-op.
    """
    _execute_in_session(conn_dict, [
        ('stop group_replication;', None),
        ('set gtid_next=%s;', (gtid,)),
        ('begin;', None),
        ('commit;', None),
        ("set gtid_next='automatic';", None),
    ])


def get_group_replication_status(conn_dict):
    """Return this member's MEMBER_STATE row(s), or None when nothing matches.

    The ``ifnull(...) = ''`` branches presumably match the placeholder row
    that exists before group replication has started -- verify on target.
    """
    sql = ("select MEMBER_STATE from performance_schema.replication_group_members "
           "where (MEMBER_HOST = %s or ifnull(MEMBER_HOST,'') = '') "
           "AND (MEMBER_PORT = %s or ifnull(MEMBER_PORT,'') = '');")
    result = execute_query(conn_dict, sql, (conn_dict["host"], conn_dict["port"]))
    return result if result else None


def _get_member_state(conn_dict):
    """Return this member's MEMBER_STATE string, or None when no row matches."""
    rows = get_group_replication_status(conn_dict)
    return rows[0]["MEMBER_STATE"] if rows else None


def check_replication_group_members(conn_dict):
    """Print performance_schema.replication_group_members as a simple table."""
    print('-------------------------------------------------------')
    result = execute_query(conn_dict, " select * from performance_schema.replication_group_members; ")
    if result:
        print(" ".join(str(column) for column in result[0].keys()))
        for row in result:
            print(" ".join(str(value) for value in row.values()))
    print('-------------------------------------------------------')


def parse_gtid_set(gtid_set):
    """Map each source UUID in a GTID set string to its highest transaction number.

    Accepts single numbers (``uuid:7``), ranges with gaps (``uuid:1-5:7-10``)
    and several comma/newline separated UUIDs; non-numeric parts (such as
    GTID tags) are ignored. ``None`` or an empty string yields ``{}``.

    >>> parse_gtid_set("3e11fa47-71ca-11e1-9e33-c80aa9429562:1-5:7-10")
    {'3e11fa47-71ca-11e1-9e33-c80aa9429562': 10}
    """
    highest = {}
    for entry in (gtid_set or "").split(","):
        uuid, _, intervals = entry.strip().partition(":")
        if not uuid:
            continue
        top = 0
        for interval in intervals.split(":"):
            # "a-b" -> "b", "a" -> "a"; the end of an interval is its maximum
            end = interval.strip().rpartition("-")[2]
            if end.isdecimal():
                top = max(top, int(end))
        highest[uuid] = max(highest.get(uuid, 0), top)
    return highest


def _executed_gtid_set(conn_dict):
    """Return Executed_Gtid_Set from SHOW MASTER STATUS ('' when no row is returned)."""
    rows = get_gtid(conn_dict)
    return rows[-1]["Executed_Gtid_Set"] if rows else ""


def _repair_member_once(conn_master_dict, conn_slave_dict, state):
    """Run one repair step on a member that is neither ONLINE nor RECOVERING."""
    master_gtids = parse_gtid_set(_executed_gtid_set(conn_master_dict))
    slave_gtids = parse_gtid_set(_executed_gtid_set(conn_slave_dict))
    missing = [uuid for uuid, top in master_gtids.items()
               if top > slave_gtids.get(uuid, 0)]
    if missing:
        # the slave lacks a transaction the master executed: commit an empty
        # transaction for the slave's next GTID of that source UUID
        skipped_gtid = "{0}:{1}".format(missing[0], slave_gtids.get(missing[0], 0) + 1)
        print("skip gtid and restart group replication,skiped gtid is " + skipped_gtid)
        skip_gtid_on_slave(conn_slave_dict, skipped_gtid)
    elif state == "ERROR":
        # a member in ERROR must be stopped before START is accepted again
        execute_noquery(conn_slave_dict, "stop group_replication;")
    start_group_replication(conn_slave_dict)


def auto_fix_mgr_error(conn_master_dict, conn_slave_dict, max_attempts=10, wait_seconds=10):
    """Try to bring an ERROR/OFFLINE group member back ONLINE, then print the members.

    Each round compares the master's and slave's executed GTID sets and
    either skips the missing GTID on the slave or simply restarts group
    replication. While the member is RECOVERING no repair is attempted, so
    transactions that recovery is still applying are never skipped.

    :param conn_master_dict: connection settings of the writable node.
    :param conn_slave_dict: connection settings of the member to repair.
    :param max_attempts: maximum number of wait/check rounds before giving up.
    :param wait_seconds: pause after each round before re-reading the state.
    """
    state = _get_member_state(conn_slave_dict)
    if state in ("ERROR", "OFFLINE"):
        print(conn_slave_dict["host"] + str(conn_slave_dict["port"]) + '------>' + state)
        print("auto fixing......")
        for _ in range(max_attempts):
            if state != "RECOVERING":
                _repair_member_once(conn_master_dict, conn_slave_dict, state)
            time.sleep(wait_seconds)
            state = _get_member_state(conn_slave_dict)
            if state == "ONLINE":
                print("mgr cluster fixed,back to normal")
                break
        else:
            print("mgr cluster auto fix failed after {0} attempts".format(max_attempts))
    elif state == 'ONLINE':
        print("mgr cluster is normal,nothing to do")
    check_replication_group_members(conn_slave_dict)


def reset_master(conn_dict):
    """Clear binary logs and the executed GTID set (RESET MASTER)."""
    # NOTE(review): newer MySQL releases (8.4) spell this
    # "RESET BINARY LOGS AND GTIDS" -- confirm against the target version.
    execute_noquery(conn_dict, "reset master;")


def install_group_replication_plugin(conn_dict):
    """Install the group_replication plugin unless the server already has it loaded."""
    # information_schema.PLUGINS also lists plugins loaded via plugin-load(-add),
    # which never appear in mysql.plugin and would make INSTALL PLUGIN fail
    get_plugin_sql = ("SELECT PLUGIN_NAME FROM information_schema.PLUGINS "
                      "WHERE PLUGIN_NAME = 'group_replication';")
    if not execute_query(conn_dict, get_plugin_sql):
        execute_noquery(conn_dict, "install plugin group_replication soname 'group_replication.so';")


def create_mgr_repl_user(conn_master_dict, user, password):
    """Reset the master's binlog/GTIDs, then create *user*@'%' with REPLICATION SLAVE if absent."""
    reset_master(conn_master_dict)
    user_list = execute_query(conn_master_dict,
                              "select user from mysql.user where user = %s;", (user,))
    if not user_list:
        # '%%' becomes a literal '%' host once pymysql interpolates the args
        execute_noquery(conn_master_dict,
                        "create user %s@'%%' identified by %s;", (user, password))
        execute_noquery(conn_master_dict,
                        "grant replication slave on *.* to %s@'%%';", (user,))
        execute_noquery(conn_master_dict, "flush privileges;")


def set_super_read_only_off(conn_dict):
    """Turn the global super_read_only flag off."""
    execute_noquery(conn_dict, '''set global super_read_only = 0;''')


def _get_global_variable(conn_dict, name):
    """Return a global variable's value from performance_schema, or None if absent."""
    # the alias pins the dict key regardless of how the server reports column case
    rows = execute_query(
        conn_dict,
        "select variable_value as variable_value from performance_schema.global_variables "
        "where variable_name = %s;",
        (name,))
    return rows[0]['variable_value'] if rows else None


def open_group_replication_bootstrap_group(conn_dict):
    """Set group_replication_bootstrap_group=ON if it is currently OFF."""
    if _get_global_variable(conn_dict, 'group_replication_bootstrap_group') == "OFF":
        execute_noquery(conn_dict, '''set @@global.group_replication_bootstrap_group=on;''')


def close_group_replication_bootstrap_group(conn_dict):
    """Set group_replication_bootstrap_group=OFF if it is currently ON."""
    if _get_global_variable(conn_dict, 'group_replication_bootstrap_group') == "ON":
        execute_noquery(conn_dict, '''set @@global.group_replication_bootstrap_group=off;''')


def start_group_replication(conn_dict):
    """Issue START GROUP_REPLICATION unless the member is already ONLINE or RECOVERING."""
    # START is rejected while group replication is already running; both
    # ONLINE and RECOVERING indicate that it is
    if _get_member_state(conn_dict) not in ('ONLINE', 'RECOVERING'):
        execute_noquery(conn_dict, '''start group_replication;''')


def connect_to_group(conn_dict, repl_user, repl_password):
    """Configure the group_replication_recovery channel credentials."""
    # NOTE(review): newer MySQL uses CHANGE REPLICATION SOURCE TO SOURCE_USER=...;
    # confirm the syntax against the target server version.
    execute_noquery(
        conn_dict,
        "change master to master_user=%s, master_password=%s "
        "for channel 'group_replication_recovery';",
        (repl_user, repl_password))


def start_mgr_on_master(conn_master_dict, repl_user, repl_password):
    """Bootstrap the group on the master node.

    :return: True when the master ends up ONLINE, otherwise None.
    :raises: any error from the setup steps, after printing a banner.
    """
    try:
        set_super_read_only_off(conn_master_dict)
        reset_master(conn_master_dict)
        create_mgr_repl_user(conn_master_dict, repl_user, repl_password)
        connect_to_group(conn_master_dict, repl_user, repl_password)
        open_group_replication_bootstrap_group(conn_master_dict)
        try:
            start_group_replication(conn_master_dict)
        finally:
            # never leave bootstrap_group ON, even if START failed: a later
            # restart would bootstrap a second, independent group
            close_group_replication_bootstrap_group(conn_master_dict)
        if _get_member_state(conn_master_dict) == 'ONLINE':
            print("master added in mgr and run successfully")
            return True
    except Exception:
        print("############start master mgr error################")
        raise


def start_mgr_on_slave(conn_slave_dict, repl_user, repl_password):
    """Join a slave node to the group; exits the process with status 1 on failure."""
    try:
        set_super_read_only_off(conn_slave_dict)
        reset_master(conn_slave_dict)
        connect_to_group(conn_slave_dict, repl_user, repl_password)
        start_group_replication(conn_slave_dict)
        # give distributed recovery some time before checking the state
        time.sleep(10)
        state = _get_member_state(conn_slave_dict)
        if state == 'ONLINE':
            print("slave added in mgr and run successfully")
        if state == 'RECOVERING':
            print("slave is recovering")
    except Exception:
        logging.exception("start slave mgr failed")
        print("############start slave mgr error################")
        sys.exit(1)


def auto_mgr(conn_master, conn_slave_1, conn_slave_2, repl_user, repl_password):
    """Build the MGR cluster: bootstrap the master, then join both slaves."""
    install_group_replication_plugin(conn_master)
    if _get_member_state(conn_master) != 'ONLINE':
        start_mgr_on_master(conn_master, repl_user, repl_password)
    for conn_slave in (conn_slave_1, conn_slave_2):
        # install first: the members table may have no row for a member without
        # the plugin, and the state lookup would then return None
        install_group_replication_plugin(conn_slave)
        if _get_member_state(conn_slave) != 'ONLINE':
            start_mgr_on_slave(conn_slave, repl_user, repl_password)
    check_replication_group_members(conn_master)


if __name__ == '__main__':
    conn_master = {'host': '127.0.0.1', 'port': 7001, 'user': 'root', 'password': 'root', 'db': 'mysql', 'charset': 'utf8mb4'}
    conn_slave_1 = {'host': '127.0.0.1', 'port': 7002, 'user': 'root', 'password': 'root', 'db': 'mysql', 'charset': 'utf8mb4'}
    conn_slave_2 = {'host': '127.0.0.1', 'port': 7003, 'user': 'root', 'password': 'root', 'db': 'mysql', 'charset': 'utf8mb4'}
    repl_user = "repl"
    repl_password = "repl"
    # Uncomment to build the cluster from scratch; the active call repairs slave 1.
    # auto_mgr(conn_master, conn_slave_1, conn_slave_2, repl_user, repl_password)
    auto_fix_mgr_error(conn_master, conn_slave_1)
    check_replication_group_members(conn_master)