MySQL集羣讀寫分離的自定義實現

基於MySQL Router能夠實現高可用,讀寫分離,負載均衡之類的,MySQL Router能夠說是很是輕量級的一箇中間件了。
看了一下MySQL Router的原理,其實並不複雜,原理也並不難理解,其實就是一個相似於VIP的代理功能,其中一個MySQL Router有兩個端口號,分別是對讀和寫的轉發。
至於選擇哪一個端口號,須要在申請鏈接的時候自定義選擇,換句話說就是在生成鏈接字符串的時候,要指明是讀操做仍是寫操做,而後由MySQL Router轉發到具體的服務器上。html

引用這裏的話說就是
通常來講,經過不一樣端口實現讀/寫分離,並不是好方法,最大的緣由是須要在應用程序代碼中指定這些鏈接端口。
可是,MySQL Router只能經過這種方式實現讀寫分離,因此MySQL Router拿來當玩具玩玩就好。其原理參考下圖,相關安裝配置等很是簡單mysql

 

其實暫不論「MySQL Router拿來當玩具玩玩就好」,相似須要本身指定端口(或者說指定讀寫)來實現讀寫分離這種方式,本身徹底能夠實現,又何須用一箇中間件呢?
對於MySQL Router來講,它本身自己又是單點的,還要考慮Router自身的高可用(解決了一個問題的同時又引入一個問題)。
很早以前就在想,可不能夠嘗試不借助中間件,也就無需關注中間件自身的高可用,本身實現讀寫分離呢?正則表達式


對於最簡單的master-salve複製的集羣方式的讀寫分離,
能夠集羣中的不一樣節點指定不一樣的優先級,把master服務器的優先級指定到最高,其他兩個指定成一個較低的優先級
對於應用程序發起的請求,須要指明是讀仍是寫,若是是寫操做,就指定到master上執行,若是是讀操做,就隨機地指向slave操做,徹底能夠在鏈接層就實現相似於MySQL Router的功能。
其實很是簡單,花不了多久就能夠實現相似這麼一個功能,在鏈接層實現讀寫分離,高可用,負載均衡,demo一個代碼實現。sql

 

 

以下簡單從數據庫鏈接層實現了讀寫分離以及負載均衡。
1,寫請求指向鏈接字符串中最高優先級的master,若是指定的最高優先級實例不可用,這裏假如是實現了故障轉移,依次尋找次優先級的實例
2,slave複製master的數據,讀請求隨機指向不一樣的slave,一旦某個slave不可用,繼續尋找其餘的slave
3,維護一個鏈接池,鏈接一概從鏈接池中獲取。數據庫

故障轉移能夠獨立實現,不須要在鏈接層作,鏈接層也不是作故障轉移的。這樣一旦發生故障,只要實現了故障轉移,應用程序端能夠不用作任何修改。服務器

# -*- coding: utf-8 -*-
import pymysql
import random
from DBUtils.PooledDB import PooledDB
import socket


class MySQLRouter:

    operation = None
    conn_list = []

    def __init__(self, *args, **kwargs):
        for k, v in kwargs.items():
            setattr(self, k, v)

    # 探測實例端口號
    @staticmethod
    def get_mysqlservice_status(host,port):
        mysql_stat = None
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        result = s.connect_ex((host, int(port)))
        # port os open
        if (result == 0):
            mysql_stat = 1
        return mysql_stat

    def get_connection(self):
        if not conn_list:
            raise("no config error")

        conn = None
        current_conn = None
        # 依據節點優先級排序
        self.conn_list.sort(key=lambda k: (k.get('priority', 0)))
        #寫或者未定義請求,一概指向高優先級的服務器,可讀寫
        if(self.operation.lower() == "write") or not self.operation:
            for conn in conn_list:
                # 若是最高優先級的主節點不可達,這裏假設成功實現了故障轉移,繼續找次優先級的實例。
                if self.get_mysqlservice_status(conn["host"], conn["port"]):
                    current_conn = conn
                    break
                else:
                    continue
        #讀請求隨機指向不一樣的slave
        elif(self.operation.lower() == "read"):
            #隨機獲取除了最該優先級節點以外的節點
            conn_read_list = conn_list[1:len(conn_list)]
            random.shuffle(conn_read_list)
            for conn in conn_read_list:
                #若是不可達,繼續尋找其餘除了主節點以外的節點
                if self.get_mysqlservice_status(conn["host"], conn["port"]):
                    current_conn = conn
                    break
                else:
                    continue
        try:
            #從鏈接池中獲取當前鏈接
            if (current_conn):
                pool = PooledDB(pymysql,20, host=current_conn["host"], port=current_conn["port"], user=current_conn["user"], password=current_conn["password"],db=current_conn["database"])
                conn = pool.connection()
        except:
            raise

        if not conn:
            raise("create connection error")

        return conn;


if __name__ == '__main__':

    #定義三個實例
    conn_1 = {'host': '127.0.0.1', 'port': 3306, 'user': 'root', 'password': 'root',"database":"db01","priority":100}
    conn_2 = {'host': '127.0.0.1', 'port': 3307, 'user': 'root', 'password': 'root',"database":"db01","priority":200}
    conn_3 = {'host': '127.0.0.1', 'port': 3308, 'user': 'root', 'password': 'root',"database":"db01","priority":300}
    
    conn_list = []
    conn_list.append(conn_1)
    conn_list.append(conn_2)
    conn_list.append(conn_3)

    print("####execute update on master####")
    myrouter = MySQLRouter(conn_list=conn_list, operation="write")
    conn = myrouter.get_connection()
    cursor = conn.cursor()
    cursor.execute("update t01 set update_date = now() where id = 1")
    conn.commit()
    cursor.close()
    conn.close()

    print("####loop execute read on slave,query result####")
    #循環讀,判斷讀指向哪一個節點。
    for loop in range(10):
        myrouter = MySQLRouter(conn_list = conn_list,operation = "read")
        conn = myrouter.get_connection()
        cursor = conn.cursor()
        cursor.execute("SELECT id,cast(update_date as char), CONCAT('instance port is: ', CAST( @@PORT AS CHAR)) AS port FROM t01;")
        result = cursor.fetchone()
        print(result)
        cursor.close()
        conn.close()

這裏用過服務器的一個優先級,將寫請求指向最高優先級的master服務器,讀請求隨機指向非最高優先級的slave,
對於更新請求,都在master上執行,slave複製了master的數據,每次讀到的數據都不同,而且每次都請求的執行,基本上都隨機地指向了兩臺slave服務器
經過查詢返回一個端口號,來判斷讀請求是否平均分散到了不通的slave端。app

與「MySQL Router拿來當玩具玩玩就好」相比,這裏的實現同樣low,由於對數據的請求須要請求明確指定是讀仍是寫。負載均衡

不過,對於自動讀寫分離,無非是一個SQL語句執行的是的讀或寫判斷問題,並不是難事,這個須要解析請求的SQL是讀的仍是寫的問題。
某些數據庫中間件能夠實現自動的讀寫分離,可是要明白,對於那些支持自動讀寫分離的中間件,每每是要受到必定的約束的,好比不能用存儲過程什麼的,爲何呢?
仍是上面提到的SQL解析的問題,由於一旦使用了存儲過程,沒法解析出來這個SQL究竟是執行的是讀仍是寫,最起碼不是太直接。
對於SQL讀寫的判斷,也就是維護一個讀或者寫的枚舉正則表達式,非讀即寫,只是要格外關注這個讀寫的判斷的效率問題。dom

相關文章
相關標籤/搜索