導讀:html
做者:曾永偉,知數堂10期學員,多年JAVA物流行業開發管理經驗和PHP/Python跨境電商開發管理經驗,對數據庫系統情有獨鍾,善於運用SQL編程簡化業務邏輯,去年開始正式從業MySQL DBA, 專一於DB系統自動化運維、MySQL雲上實踐。本文爲python-mysql-binlogserver系列的第一篇(T1基礎篇)python
概述mysql
前不久知數堂吳老師在公開課上把MHA拉下神壇,是由於MHA沒法從根本上解決丟數據的可能性,只是嘗試性的去補償未同步的數據。使用MySQL的半同步複製能夠解決數據丟失的問題,但原生 io_thread
會破壞掉Master上的Binlog File的命名,對後繼的運維形成不便,因此使用原生加強半同步+blackhole引擎作binlog備份的方案几乎沒有人使用,而更多的公司則使用mysqlbinlog命令來實現輕量的BinlogServer,不足的是官方mysqlbinlog並不支持半同步複製,仍然會丟數據。git
據我所知,Facebook,Google和國內的美團公司都有研發本身的BinlogServer,可是目前我沒有找到一個開源的支持半同步的BinlogServer項目,因而就誕生了py-mysql-binlogserver這個項目。github
一、主要特性以下:sql
二、TODO特性數據庫
具體功能請到項目首頁進行查看和下載體驗。編程
三、項目地址爲:數組
https://github.com/alvinzane/...網絡
四、目錄結構:
py-mysql-binlogserver` ├── README.doc ├── doc │ ├── T1基礎篇-用Python開發MySQL加強半同步BinlogServer.md │ ├── T2通訊篇-用Python開發MySQL加強半同步BinlogServer.md │ ├── T3實戰篇-用Python開發MySQL加強半同步BinlogServer.md │ ├── T4架構篇-用Python開發MySQL加強半同步BinlogServer.md │ └── readme.md └── py_mysql_binlogserver ├── __init__.py ├── _playground # 練習場,隨便玩 │ ├── __init__.py │ ├── socket_client.py │ ├── socket_client_semi-repl.py │ └── test_slave.py ├── _tutorial # 教程實例代碼 │ ├── __init__.py │ ├── learn_bin1_charset.py │ ├── learn_bin2_binlog.py │ ├── learn_packet1_greeting.py │ ├── learn_packet2_auth.py │ ├── learn_packet3_query.py │ ├── learn_packet4_dump.py │ ├── learn_packet4_dump2.py │ ├── learn_packet5_dump_with_semi_ack.py │ ├── learn_socket1_client.py │ ├── learn_socket2_server.py │ ├── learn_socket3_server_mulit_thread.py │ └── learn_socket4_server_mulit_thread.py ├── binlogs # Binlog文件保存目錄 │ ├── mysql-bin.000014 │ ├── mysql-bin.gtid.index │ └── mysql-bin.index ├── cap ├── constants │ ├── EVENT_TYPE.py │ └── FIELD_TYPE.py ├── dump │ └── readme.md ├── packet │ ├── __init__.py │ ├── binlog_event.py │ ├── challenge.py │ ├── dump_gtid.py │ ├── dump_pos.py │ ├── event_header.py │ ├── gtid_event.py │ ├── query.py │ ├── response.py │ ├── semiack.py │ └── slave.py ├── protocol │ ├── Flags.py │ ├── __init__.py │ ├── err.py │ ├── gtid.py │ ├── ok.py │ ├── packet.py │ └── proto.py └── tests # 單元測試 │ ├── __init__.py │ └── test_packet.py ├── proxy.py # 簡單代理,用於觀察和保存MySQL Packet ├── server.py # 實現Master協議 ├── dumper.py # Binlog Dumper,至關於IO Thread ├── example.conf # 配置文件 └─── example.py # 同時啓動 Server&Dumper
假設你已經有了必定的Python編程基礎,而且徹底理解MySQL半同步複製的原理,接下來咱們就一步一步走進實現BinlogServer的技術細節。
二進制基礎複習
MySQL的Binlog文件是二進制的,MySQL在網絡上傳送的數據也是二進制的,因此咱們先來複習一下二進制的一些基礎知識。
一、數字的表示:
數字一般能夠用十進制,二進制和十六進制來表示。在計算機中,數據的運算、存儲、傳輸最終都會用到二進制,但因爲二進制不便於人類閱讀(太長了),因此咱們一般用一位十六進制來表示四個bite的二進制,即2位十六製表示一個Byte(字節)。
二進制 十六進制 十進制 0000 0001 01 1 0000 0010 02 2 ... 0000 1010 0A 10 1111 1111 FF 255
在Python中,用非零開頭的數字,表示十進制,0x,0b開頭分別表示十六進制和二進制:
# 數字10的三種表示: >>> print(10,0xa,0b1010) 10 10 10
十六進制和二進制與十進制轉換:
>>> print(hex(10),bin(10)) 0xa 0b1010 >>> print(int('0xa',16),int('0b1010',2)) 10 10
因爲1個字節(byte)最大能表示的數字爲255,因此更大的數字須要用多個字節來表示,如:
# 2個字節,16bit,最大爲 65535 >>> 0b1111111111111111` 65535 # 4個字節,32bit, 最大數(0x爲十六進制,1位十六進制等於4位二進制 0xf = 0b1111 >>> 0xffffffff` 4294967295
以上均爲無符號的數字,即全爲正數,對於有符號的正負數,則最高位的1個bit用0和1分別表示正數和負數。對於1個byte的數字,實際就只有7bit表示實際的數字,範圍爲[-128,127].
二、字符的表示
在計算機中全部的數據最終都要轉化爲數字,並且是二進制的數字。字符也不例外,也須要用到一個"映射表"來完成字符的表示。這個"映射表"叫做字符集,ASCII是最先最基礎的"單字節"字符集,它能夠表示鍵盤上全部的可打印字符,如52個大小寫字母及標點符號。
Python中,使用ord()和chr()完成ASCII字符與數字之間的轉換:
>>> ord('a'),ord('b'),ord('c') (97, 98, 99) >>> chr(97),chr(98),chr(99) ('a', 'b', 'c')
"單字節"最大爲數字是255,能表示的字符有限,因此後來就有了"多字節"字符集,如GBK,UTF8等等,用來表示更多的字符。其中UTF8是變長的字符編碼,用1-6個字節表示一個字符,能夠表示全世界全部的文字與符號,也叫萬國碼。
Python中,多字節字符與數字間的轉換:
# Python3中,字符對象(str), 可使用 .encode方法將字符轉爲bytes對象 >>> "中國".encode("utf8") b'\xe4\xb8\xad\xe5\x9b\xbd' >>> "中國".encode("gbk") b'\xd6\xd0\xb9\xfa' # bytes對象轉成字符 b'\xe4\xb8\xad\xe5\x9b\xbd'.decode("utf8") '中國'` `bytes([0xe4,0xb8,0xad,0xe5,0x9b,0xbd]).decode("utf8") '中國'`
使用hexdump查看文本文件的字符編碼:
$ file /tmp/python_chr.txt /tmp/python_chr.txt: UTF-8 Unicode text $ cat /tmp/python_chr.txt Python 中國 $ hexdump -C /tmp/python_chr.txt 00000000 50 79 74 68 6f 6e 0a e4 b8 ad e5 9b bd 0a |Python........| 0000000e
使用python來驗證編碼:
# 前三個字符 >>> chr(0x50),chr(0x79),chr(0x74) ('P', 'y', 't') # 剩下的字符你們動手試一試, 特別是漢字"中國"的編碼
Python二進制相關
一、bytes對象
bytes是Python3中新增的一個處理二進制"流"的對象。能夠下幾種方式咱們能夠獲得bytes對象:
一些簡單的例子:
>>> b'a' b'a' >>> type(b'a') <class 'bytes'> >>> bytes([97]) b'a' >>> bytes("中國",'utf8') b'\xe4\xb8\xad\xe5\x9b\xbd'
能夠把bytes看做是一個特殊的數組,由連續的字節(byte)組成,單字節最大數不能超過255,具備數組的切片,迭代等特性,它老是嘗試以ASCII編碼將數據轉成可顯示字符,超出ASCII可顯示範圍則使用\x打頭的二位十六進制進行顯示。
bytes對象的本質是存的二進制數組,存放的是0-255的數字數組,它只有結合"字符集"才能轉換正確的字符,或者要結合某種"協議"才能解讀出具體的"含義",這一點後面就會詳細的講到。
再來一個例子, 打印GBK編碼表:
# GBK編碼從0x8140 開始,顯示 30 行 for row in [0x8140 + x*16 for x in range(30)]: print(hex(row), end=" ") # 每行顯示16個 for i in range(16): high = row+i >> 8 & 0xff # 高位 low = row+i & 0xff # 低位 try: # 用bytes對象轉換成GBK字符 print(bytes([high, low]).decode("gbk"), end="") except: print(end=" ") print("")
輸出:
0x8140 丂丄丅丆丏丒丗丟丠両丣並丩丮丯丱 0x8150 丳丵丷丼乀乁乂乄乆乊乑乕乗乚乛乢 0x8160 乣乤乥乧乨乪乫乬乭乮乯乲乴乵乶乷 0x8170 乸乹乺乻乼乽乿亀亁亂亃亄亅亇亊 0x8180 亐亖亗亙亜亝亞亣亪亯亰亱亴亶亷嚲 0x8190 亹亼亽亾仈仌仏仐仒仚仛仜仠仢仦仧 0x81a0 仩仭仮仯仱仴仸仹仺仼仾伀伂伃伄伅 0x81b0 伆伇伈伋伌伒伓伔伕伖伜伝俥俔伨伩 0x81c0 伬伭伮伱伳伵伷伹伻伾伿佀佁佂佄佅 0x81d0 佇佈佉佊佋佌佒佔佖佡佢佦佨佪佫佭 0x81e0 佮佱佲併佷佸佹佺佽侀侁侂侅來侇侊 0x81f0 侌侎侐侒侓侕侖侘侙侚侜侞侟価侢
二、struct
計算機中幾乎全部的數據均可以最終抽象成數字和字符來表示,在C語言中用struct(結構體)來描述一個複雜的對象,經過這個結構能夠方便的將複雜對象轉換成二進制流用於存儲與網絡傳輸。Python中提供了struct模塊方便處理二進制流(bytes對象)與數字,字符對象的轉換功能。
三、用struct處理數字
>>> import struct # 單字節數字 >>> struct.pack("<B", 255) b'\xff' # 雙字節數字 >>> struct.pack("<H", 255) b'\xff\x00' # 四字節數字 >>> struct.pack("<I", 255) b'\xff\x00\x00\x00' # 八字節數字 >>> struct.pack("<Q", 255) b'\xff\x00\x00\x00\x00\x00\x00\x00' #unpack能夠找出8,4,2位符號整型的最大值 >>> struct.unpack(<Q",b'\xff\xff\xff\xff\xff\xff\xff\xff') (18446744073709551615,) >>> struct.unpack("<I",b'\xff\xff\xff\xff') (4294967295,) >>> struct.unpack("<H",b'\xff\xff') (65535,)
struct處理數字的要點有:
四、用struct處理字符串
字符轉換爲bytes:
# 變長字符串,以0xff結束 >>> struct.pack("<4s",b"cat") b'cat\x00' >>> struct.pack("<5s","中國".encode("gbk")) b'\xd6\xd0\xb9\xfa\x00' >>> struct.pack("<7s","中國".encode("utf8")) b'\xe4\xb8\xad\xe5\x9b\xbd\x00' # 定長字符串,第1個字節爲字符串的長度 >>> struct.pack("<4p",b"cat") b'\x03cat' >>> struct.pack("<5p","中國".encode("gbk")) b'\x04\xd6\xd0\xb9\xfa' >>> struct.pack("<7p","中國".encode("utf8")) b'\x06\xe4\xb8\xad\xe5\x9b\xbd'
bytes轉換爲字符:
# 僅取一例,其餘的請本身動手試一試 >>> struct.unpack("<7p", b'\x06\xe4\xb8\xad\xe5\x9b\xbd')[0].decode("utf8") '中國'
須要特別說明的是,unpack返回的是元組,哪怕是隻有一個元素,這樣作的好處是,咱們能夠按照規則將多個數據的format寫在一塊兒,讓代碼更加簡潔:
>>> struct.pack("<HBI6p",1, 19, 3306, b'alvin') b'\x01\x00\x13\xea\x0c\x00\x00\x05alvin' >>> struct.unpack("<HBI6p", b'\x01\x00\x13\xea\x0c\x00\x00\x05alvin') (1, 19, 3306, b'alvin') >>> id, no, port, name = struct.unpack("<HBI6p", b'\x01\x00\x13\xea\x0c\x00\x00\x05alvin') >>> id, no, port, name (1, 19, 3306, b'alvin')
這種寫法會大量應用到後繼的demo代碼中,請務必多加練習,並仔細閱讀官方文檔。
Python Socket編程
簡單說Socket編程,就是面向網絡傳輸層的接口編程,系統經過IP地址和端口號創建起兩臺電腦之間網絡鏈接,並提供兩個最基礎的通訊接口發送數據和接收數據,供開發者調用,先來看一個最簡單的客戶端Socket例子:
import socket # 建立一個socket對象 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 創建鏈接 s.connect(("192.168.1.101", 3306)) # 接收數據 buf = s.recv(10240) print(type(buf)) # <class 'bytes'> # 發送數據 s.send(b'hello')
能夠看出經過socket接收和發送的數據都是前面講的bytes對象,由於bytes對象自己只是一個二進制流,因此在沒有"協議"的前提下,咱們是沒法理解傳輸內容的具體含義。常見的http,https,ftp,smtp,ssh協議都是創建socket通訊之上的協議。換句說,就是通socket編程能夠實現與現有的任何協議進行通訊。若是你熟悉了ssh協議,那麼實現ssh端口掃描程序就易如反掌了。
用socket不只能夠和其它協議的服務端進行通訊,並且能夠實現socket服務端,監聽和處理來自client的鏈接和數據。
import socket # 建立一個socket對象 s = socket.socket() # 監聽端口 s.bind(('127.0.0.1', 8000)) s.listen(5) while True: conn, addr = s.accept() conn.send(bytes('Welcome python socket server.', 'utf8')) # 關閉連接 conn.close()
經過上面兩個簡單的例子,相信你們對Python的socket編程已經有一個初步的認識,那就是"至關的簡單",沒有想象中那麼複雜。
接下再來看一個多線程版的SocketServer, 能夠經過telnet來實現一個網絡計算器:
# learn_socket3_server_mulit_thread.py import threading import socketserver class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler): def handle(self): """ 網絡計算器,返回表達式的值 """ while True: try: # 接收表達式數據 data = str(self.request.recv(1024), 'ascii').strip() if "q" in data: self.finish() break # 計算結果 response = bytes("{} = {}\r\n".format(data, eval(data)), 'ascii') print(response.decode("ascii").strip()) # 返回結果 self.request.sendall(response) except: self.request.sendall(bytes("\n", 'ascii')) class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer): pass if __name__ == "__main__": server = ThreadedTCPServer(("127.0.0.1", 9000), ThreadedTCPRequestHandler) ip, port = server.server_address server_thread = threading.Thread(target=server.serve_forever) print(f"Calculator Server start at {ip} : {port}") server_thread.start()
使用telnet進行測試:
$ telnet 127.0.0.1 9000 Trying 127.0.0.1... Connected to localhost. Escape character is '^]'. 12345679*81 # 按回車 12345679*81 = 999999999 # 返回結果 3+2-5*0 # Enter 3+2-5*0 = 5 (123+123)*123 # Enter (123+123)*123 = 30258 quit # Enter
服務端日誌:
Calculator Server start at 127.0.0.1 : 9000 12345679*81 = 999999999 3+2-5*0 = 5 (123+123)*123 = 30258
小結
理解二進制,字符/編碼,socket通訊,以及如何使用Python來處理它們,是實現BinlogServer最重要的基礎,因爲篇幅問題,不少知識點只能點到爲止,雖然很基礎,可是仍是須要本身的動手去實驗,觸類旁通地多實踐本身的想法,會對理解後面的文章大有幫助。
只有會認真看文檔的DBA纔是好DBA,只會認真看代碼的Engineer,必定不是好Engineer。代碼必定要運行起來,On Runtime纔會有價值,纔會讓你變成好Engineer.
最後,祝你編碼快樂〜
相關文檔
https://docs.python.org/3/lib...
https://docs.python.org/3/lib...
附:基於mysqlbinlog命令的BinlogServer簡單實現
#!/bin/sh REMOTE_HOST={{host}} REMOTE_PORT={{mysql_port}} REMOTE_USER={{mysql_repl_user}} REMOTE_PASS={{mysql_repl_password}} BACKUP_BIN=/usr/local/mysql/bin/mysqlbinlog LOCAL_BACKUP_DIR=/data/backup/mysql/binlog_3306 BACKUP_LOG=/data/backup/mysql/binlog_3306/backup_3306.log FIRST_BINLOG=mysql-bin.000001 #time to wait before reconnecting after failure SLEEP_SECONDS=10 ##create local_backup_dir if necessary mkdir -p ${LOCAL_BACKUP_DIR} cd ${LOCAL_BACKUP_DIR} ## Function while loop , After the connection is disconnected, wait for the specified time. , Reconnect while : do `if [ `ls -A "${LOCAL_BACKUP_DIR}" |wc -l` -eq 0 ];then` LAST_FILE=${FIRST_BINLOG} else `LAST_FILE=`ls -l ${LOCAL_BACKUP_DIR} | grep -v backuplog |tail -n 1 |awk '{print $9}'` ` fi ${BACKUP_BIN} --raw --read-from-remote-server --stop-never --host=${REMOTE_HOST} --port=${REMOTE_PORT} --user=${REMOTE_USER} --password=${REMOTE_PASS} ${LAST_FILE} `echo "`date +"%Y/%m/%d %H:%M:%S"` mysqlbinlog Stop it , Return code :$?" | tee -a ${BACKUP_LOG}` echo "${SLEEP_SECONDS} After the second connect and continue to backup " | tee -a ${BACKUP_LOG} sleep ${SLEEP_SECONDS} done