原因python
平常工做中常常遇到相似的問題:把某個服務器上的某些指定的表同步到另一臺服務器。
相似需求用SSIS或者其餘ETL工做很容易實現,好比用SSIS的話就能夠,但會存在至關一部分反覆的手工操做。
建源的數據庫信息,目標的數據庫信息,若是是多個表,須要一個一個地拉source和target,而後一個一個地mapping,而後運行實現數據同步。
而後極可能,這個workflow使用也就這麼一次,就壽終正寢了,卻同樣要浪費時間去作這個ETL。sql
快速數據同步實現數據庫
因而在想,可不可能快速實現相似需求,盡最大程度減小重複的手工操做?相似基於命令行的方式,簡單快捷,不須要太多的手動操做。
因此就有了本文,基於Python(目的是順便熟悉一下Python的語法),快速實現SQL Server的數據庫之間的數據同步操做,
後面又稍微擴展了一下,能夠實現不一樣服務器的數據庫之間的表結構,表對應的數據,存儲過程,函數,用戶自定義類型表(user define table type)的同步
目前支持在兩個SQL Server數據源之間:每次同步一張或者多張表/存儲過程,也能夠同步整個數據庫的全部表/存儲過程(以及表/存儲過程依賴的其餘數據庫對象)。
支持sqlserver2012及以上版本windows
須要考慮到一些基本的校驗問題:在源服務器上,須要同步的對象是否存在,或者輸入的對象是否存在於源服務器的數據庫裏,是否強制覆蓋target對象等等。安全
在目標服務器上,對於表的同步:
1,表的存在依賴於schema,須要考慮到表的schema是否存在,若是不存在先在target庫上建立表對應的schema
2,target表中是否有數據?若是有數據,是否以覆蓋的方式執行
對於存儲過程的同步:
1,相似於表,須要考慮存儲過程的schema是否存在,若是不存在先在target庫上建立表對應的schema
2,相似於表,target數據庫中是否已經存在對應的存儲過程,是否以覆蓋的方式執行
3,存儲過程可能依賴於b表,某些函數,用戶自定義表變量等等,同步存儲過程的時候須要先同步依賴的對象,這一點比較複雜,實現過程當中遇到在不少不少的坑
可能存在對象A依賴於對象B,對象B依賴於對象C……,這裏有點遞歸的意思
這一點致使了重構大量的代碼,一開始都是直來直去的同步,沒法實現這個邏輯,切實體會到代碼的「單一職責」原則服務器
參數說明
參數說明以下,大的包括四類:
1,源服務器信息 (服務器地址,實例名,數據庫名稱,用戶名,密碼),沒有用戶名密碼的狀況下,使用windows身份認證模式
2,目標服務器信息(服務器地址,實例名,數據庫名稱,用戶名,密碼),沒有用戶名密碼的狀況下,使用windows身份認證模式
3,同步的對象類型以及對象
4,同步的對象在目標服務器上存在的狀況下,是否強制覆蓋網絡
其實在同步數據的時候,也能夠把須要同步的行數提取出來作參數,比較簡單,這裏暫時沒有作。
好比須要快速搭建一個測試環境,須要同步全部的表結構和每一個表的一部分數據便可。app
表以及數據同步ide
表同步的原理是,建立目標表,遍歷源數據的表,生成insert into values(***),(***),(***)格式的sql,而後插入目標數據庫,這裏大概步驟以下:
1,表依賴於schema,因此同步表以前先同步schema
2,強制覆蓋的狀況下,會drop掉目標表(若是存在的話),防止目標表與源表結構不一致,非強制覆蓋的狀況下,若是字段不一致,則拋出異常
3,同步表結構,包括字段,索引,約束等等,可是沒法支持外鍵,刻意去掉了外鍵,想一想爲何?因吹斯汀。
4,須要篩選出來非計算列字段,insert語句只能是非計算列字段(又致使重構了部分代碼)
5,轉義處理,在拼湊SQL的時候,須要進行轉義處理,不然會致使SQL語句錯誤,目前處理了字符串中的’字符,二進制字段,時間字段的轉義處理(最容易發生問題的地方)
6,鑑於insert into values(***),(***),(***)語法上容許的最大值是1000,所以每生成1000條數據,就同步一次
7,自增列的identity_insert 標識打開與關閉處理函數
使用以下參數,同步源數據庫的三張表到目標數據庫,由於這裏是在本機命名實例下測試,所以實例名和端口號輸入
python SyncDatabaseObject.py -s_h="127.0.0.1" -s_i="sql2017" -s_P=49744 -s_d=DB01 -t_h=127.0.0.1 -t_i="sql2017" -t_P=49744 -t_d="DB02" -obj_type="tab" -obj="[dbo].[table01],schema1.table01,schema2.table01" -f="Y"
執行同步的效果
若是是非強制覆蓋的方式執行,也即安全模式下,會產生較多的警告
1,若是target上表存在,則不一樣步表結構,給出警告信息。
2,若是target上(已經存在的表中)對應的表中有數據,則退出數據同步,給出警告信息,目的是爲了確保數據安全,防止數據丟失(被覆蓋)。
3,若是同步的對象不存在,給出警告信息(屬於另一種警告信息)
說明:
1,若是輸入obj_type="tab" 且-obj=爲None的狀況下,會同步源數據庫中的全部表。
2,這個效率取決於機器性能和網絡傳輸,本機測試的話,每秒中能夠提交3到4次,也就是每秒鐘能夠提交3000~4000行左右的數據。
已知的問題:
1,當表的索引爲filter index的時候,沒法生成包含where條件的索引建立語句,那個看起來蛋疼的表結構導出語句,暫時沒時間改它。
2,暫時不支持其餘少用的類型字段,好比地理空間字段什麼的。
存儲過程對象的同步
存儲過程同步的原理是,在源數據庫上生成建立存儲過程的語句,而後寫入目標庫,這裏大概步驟以下:
1,存儲過程依賴於schema,因此同步存儲過程以前先同步schema(同表)
2,同步的過程會檢查依賴對象,若是依賴其餘對象,暫停當前對象同步,先同步依賴對象
3,重複第二步驟,直至完成
4,對於存儲過程的同步,若是是強制覆蓋的話,強制覆蓋僅僅對存儲過程本身生效(刪除&重建),對依賴對象並不生效,若是依賴對象不存在,就建立,不然不作任何事情
使用以下參數,同步源數據庫的兩個存儲過程到目標數據庫,由於這裏是在本機命名實例下測試,所以實例名和端口號輸入
python SyncDatabaseObject.py -s_h="127.0.0.1" -s_i="sql2017" -s_P=49744 -s_d=DB01 -t_h=127.0.0.1 -t_i="sql2017" -t_P=49744 -t_d="DB02" -obj_type="sp" -obj="[dbo].[sp_test01],[dbo].[sp_test02]" -f="Y"
說明:測試要同步的存儲過程之一爲[dbo].[sp_test01],它依賴於其餘兩個對象:dbo.table01和dbo.fn_test01()
create proc [dbo].[sp_test01] as begin set no count on; delete from dbo.table01 where id = 1000 select dbo.fn_test01() end
而dbo.fn_test01()的以下,依賴於另一個對象:dbo.table02
create function [dbo].[fn_test01] ( ) RETURNS int AS BEGIN declare @count int = 0 select @count = count(1) from dbo.table02 return @count END
所以,這個測試的[dbo].[sp_test01]就依賴於其餘對象,若是其依賴的對象不存在,同步的時候,僅僅同步這個存儲過程自己,是沒有意義的
同步某一個對象的依賴對象,使用以下SQL查出來對象依賴信息,所以這裏就層層深刻,同步依賴對象。
這裏就相似於同步A的時候,A依賴於B和C,而後中止同步A,先同步B和C,同步B或者C的時候,可能又依賴於其餘對象,而後繼續先同步其依賴對象。
效果以下
若是輸入obj_type="sp" 且-obj=爲None的狀況下,會同步源數據庫中的全部存儲過程以及其依賴對象
一樣,若是非強制覆蓋模式同步sp,會給出較多的警告信息,若是同步的對象存在,則給出警告,這裏同時會給出依賴對象存在的警告信息,顯得不太合理,後面會優化。
已知的問題:
1,加密的存儲過程或者函數是沒法實現同步的,由於沒法生成建立對象的腳本
1,table type的同步也是一個蛋疼的過程,目前支持,可是支持的並很差,緣由是建立table type以前,先刪除依賴於table type的對象,不然沒法刪除與建立。
特別說明
依賴對象的解決,仍是比較蛋疼的
若是在默認schema爲dbo的對象,在存儲過程或者函數中沒有寫schema(參考以下修改後的sp,不寫相關表的schema dbo,dbo.test01==>test01),
使用 sys.dm_sql_referenced_entities這個系統函數是沒法找到其依賴的對象的,奇葩的是能夠找到schema的類型,卻沒有返回對象自己。
這一點致使在代碼中層層深刻,進行了長時間的debug,徹底沒有想到這個函數是這個鳥樣子,由於這裏找到依賴對象的類型,卻找不到對象自己,次奧!!!
另一種狀況就是動態SQL了,沒法使用 sys.dm_sql_referenced_entities這個系統函數找到其依賴的對象。
其餘對象的同步
支持其餘數據庫對象的同步,好比function,table type等,由於能夠在同步其餘存儲過程對象的時候附帶的同步function,table type,這個與表或者存儲過程相似,不作過多說明。
已知問題:
1,201906122030:經測試,目前暫時不支持Sequence對象的同步。
須要改進的地方
1,代碼結構優化,更加清晰和條例的結構(一開始用最直接簡單粗暴的方式快速實現,後面重構了不少代碼,如今本身看起來還有不少不舒服的痕跡)
2,數據同步的效率問題,對於多表的導入導出操做,依賴於單線程,多個大表導出串行的話,可能存在效率上的瓶頸,如何根據表的數據量,儘量平均地分配多多個線程中,提高效率
3,更加友好清晰的異常提示以及日誌記錄,生成導出日誌信息。
4,異構數據同步,MySQL《==》SQL Server《==》Oracle《==》PGSQL
代碼端午節寫好了,這幾天抽空進行了一些測試以及bug fix,應該還潛在很多未知的bug,工做量比想象中的大的多了去了。
# -*- coding: utf-8 -*- # !/usr/bin/env python3 __author__ = 'MSSQL123' __date__ = '2019-06-07 09:36' ''' 20190706: 1,partial code refactoring 2,optimize insertion efficiency,insert into table_name values(***),(***)==>insert into table_name values (***);insert into table_name values (***) ''' import os import sys import time import datetime import pymssql from decimal import Decimal usage = ''' -----parameter explain----- source database parameter -s_h : soure database host ----- must require parameter -s_i : soure database instace name ----- default instance name MSSQL -s_d : soure database name ----- must require parameter -s_u : soure database login ----- default windows identifier -s_p : soure database login password ----- must require when s_u is not null -s_P : soure database instance port ----- default port 1433 target database parameter -t_h : target database host ----- must require parameter -t_i : target database instace name ----- default instance name MSSQL -t_d : target database name ----- must require parameter -t_u : target database login ----- default windows identifier -t_p : target database login password ----- must require when s_u is not null -t_P : target database instance port ----- default port 1433 sync object parameter -obj_type : table or sp or function or other databse object ----- tab or sp or fn or tp -obj : table|sp|function|type name ----- whick table or sp sync overwirte parameter -f : force overwirte target database object ----- F or N --help: help document Example: python DataTransfer.py -s_h=127.0.0.1 -s_P=1433 -s_i="MSSQL" -s_d="DB01" -obj_type="tab" -obj="dbo.t1,dbo.t2" -t_h=127.0.0.1 -t_P=1433 -t_i="MSSQL" -t_d="DB02" -f="Y" python DataTransfer.py -s_h=127.0.0.1 -s_P=1433 -s_i="MSSQL" -s_d="DB01" -obj_type="sp" -obj="dbo.sp1,dbo.sp2" -t_h=127.0.0.1 -t_P=1433 -t_i="MSSQL" -t_d="DB02" -f="Y" ''' class SyncDatabaseObject(object): # source databse s_h = None s_i = None s_P = None s_u = None s_p = None s_d = None # obj type s_obj_type = None # sync objects s_obj = None # target database t_h = None t_i = None t_P = None t_u = None t_p = None t_d = None f = None file_path = None def __init__(self, *args, **kwargs): for k, v in kwargs.items(): setattr(self, k, v) ''' this is supposed to be a valid object name just like xxx_name,or dbo.xxx_name,or [schema].xxx_name or schema.[xxx_name] then transfer this kind of valid object name to format object name like [dbo].[xxx_name](give a default dbo schema name when no schema name) other format object name consider as unvalid,will be rasie error in process format object name 1,xxx_name ======> [dbo].[xxx_name] 2,dbo.xxx_name ======> [dbo].[xxx_name] 3,[schema].xxx_name ======> [dbo].[xxx_name] 3,schema.xxx_name ======> [schema].[xxx_name] 4,[schema].[xxx_name] ======> [schema].[xxx_name] 5,[schema].[xxx_name ======> rasie error format message ''' @staticmethod def format_object_name(name): format_name = "" if ("." in name): schema_name = name[0:name.find(".")] object_name = name[name.find(".") + 1:] if not ("[" in schema_name): schema_name = "[" + schema_name + "]" if not ("[" in object_name): object_name = "[" + object_name + "]" format_name = schema_name + "." + object_name else: if ("[" in name): format_name = "[dbo]." + name else: format_name = "[dbo]." + "[" + name + "]" return format_name # connect to sqlserver def get_connect(self, _h, _i, _P, _u, _p, _d): cursor = False try: if (_u) and (_p): conn = pymssql.connect(host=_h, server=_i, port=_P, user=_u, password=_p, database=_d) else: conn = pymssql.connect(host=_h, server=_i, port=_P, database=_d) if (conn): return conn except: raise return conn # check connection def validated_connect(self, _h, _i, _P, _u, _p, _d): if not (self.get_connect(_h, _i, _P, _u, _p, _d)): print("connect to " + str(_h) + " failed,please check you parameter") exit(0) ''' check user input object is a valid object ''' def exits_object(self, conn, name): conn = conn cursor_source = conn.cursor() # get object by name from source db sql_script = r'''select top 1 1 from ( select concat(QUOTENAME(schema_name(schema_id)),'.',QUOTENAME(name)) as obj_name from sys.objects union all select concat(QUOTENAME(schema_name(schema_id)),'.',QUOTENAME(name)) as obj_name from sys.types )t where obj_name = '{0}' '''.format(self.format_object_name(name)) cursor_source.execute(sql_script) result = cursor_source.fetchall() if not result: return 0 else: return 1 conn.cursor.close() conn.close() # table variable sync def sync_table_variable(self, tab_name, is_reference): conn_source = self.get_connect(self.s_h, self.s_i, self.s_P, self.s_u, self.s_p, self.s_d) conn_target = self.get_connect(self.t_h, self.t_i, self.t_P, self.t_u, self.t_p, self.t_d) cursor_source = conn_source.cursor() cursor_target = conn_target.cursor() if (self.exits_object(conn_source, self.format_object_name(tab_name))) > 0: pass else: print("----------------------- warning message -----------------------") print("--------warning: object " + tab_name + " not existing in source database ------------") print("----------------------- warning message -----------------------") print() return exists_in_target = 0 sql_script = r'''select top 1 1 from sys.table_types tp where is_user_defined = 1 and concat(QUOTENAME(schema_name(tp.schema_id)),'.',QUOTENAME(tp.name)) = '{0}' ''' \ .format((self.format_object_name(tab_name))) # if the table schema exists in target server,skip cursor_target.execute(sql_script) exists_in_target = cursor_target.fetchone() # weather exists in target server database if (self.f == "Y"): if (is_reference != "Y"): # skiped,table type can not drop when used by sp sql_script = r''' if OBJECT_ID('{0}') is not null drop type {0} '''.format(self.format_object_name(tab_name)) cursor_target.execute(sql_script) conn_target.commit() else: if exists_in_target: print("----------------------- warning message -----------------------") print("the target table type " + tab_name + " exists ,skiped sync table type from source") print("----------------------- warning message -----------------------") print() return sql_script = r''' DECLARE @SQL NVARCHAR(MAX) = '' SELECT @SQL = 'CREATE TYPE ' + '{0}' + 'AS TABLE' + CHAR(13) + '(' + CHAR(13) + STUFF(( SELECT CHAR(13) + ' , [' + c.name + '] ' + CASE WHEN c.is_computed = 1 THEN 'AS ' + OBJECT_DEFINITION(c.[object_id], c.column_id) ELSE CASE WHEN c.system_type_id != c.user_type_id THEN '[' + SCHEMA_NAME(tp.[schema_id]) + '].[' + tp.name + ']' ELSE '[' + UPPER(y.name) + ']' END + CASE WHEN y.name IN ('varchar', 'char', 'varbinary', 'binary') THEN '(' + CASE WHEN c.max_length = -1 THEN 'MAX' ELSE CAST(c.max_length AS VARCHAR(5)) END + ')' WHEN y.name IN ('nvarchar', 'nchar') THEN '(' + CASE WHEN c.max_length = -1 THEN 'MAX' ELSE CAST(c.max_length / 2 AS VARCHAR(5)) END + ')' WHEN y.name IN ('datetime2', 'time2', 'datetimeoffset') THEN '(' + CAST(c.scale AS VARCHAR(5)) + ')' WHEN y.name = 'decimal' THEN '(' + CAST(c.[precision] AS VARCHAR(5)) + ',' + CAST(c.scale AS VARCHAR(5)) + ')' ELSE '' END + CASE WHEN c.collation_name IS NOT NULL AND c.system_type_id = c.user_type_id THEN ' COLLATE ' + c.collation_name ELSE '' END + CASE WHEN c.is_nullable = 1 THEN ' NULL' ELSE ' NOT NULL' END + CASE WHEN c.default_object_id != 0 THEN ' CONSTRAINT [' + OBJECT_NAME(c.default_object_id) + ']' + ' DEFAULT ' + OBJECT_DEFINITION(c.default_object_id) ELSE '' END END From sys.table_types tp Inner join sys.columns c on c.object_id = tp.type_table_object_id Inner join sys.types y ON y.system_type_id = c.system_type_id WHERE tp.is_user_defined = 1 and y.name<>'sysname' and concat(QUOTENAME(schema_name(tp.schema_id)),'.',QUOTENAME(tp.name)) = '{0}' ORDER BY c.column_id FOR XML PATH(''), TYPE).value('.', 'NVARCHAR(MAX)'), 1, 7, ' ') + ');' select @SQL as script '''.format(self.format_object_name(self.format_object_name((tab_name)))) cursor_target = conn_target.cursor() cursor_source.execute(sql_script) row = cursor_source.fetchone() try: if not exists_in_target: # execute the script on target server cursor_target.execute(str(row[0])) # drop current stored_procudre if exists conn_target.commit() print("*************table type " + self.format_object_name(tab_name) + " synced *********************") print() # give a blank row when finish except: print("----------------------- error message -----------------------") print("-----------table type " + self.format_object_name(tab_name) + " synced error ---------------") print("----------------------- error message -----------------------") print() # raise cursor_source.close() conn_source.close() cursor_target.close() conn_target.close() def get_table_column(self, conn, tab_name): column_names = "" conn = conn cursor_source = conn.cursor() # get object by name from source db sql_script = r'''select name from sys.columns where object_id = object_id('{0}') and is_computed=0 order by object_id '''.format(self.format_object_name(tab_name)) cursor_source.execute(sql_script) result = cursor_source.fetchall() for row in result: column_names = column_names + row[0] + "," return column_names[0:len(column_names) - 1] conn.cursor.close() conn.close() # schema sync def sync_schema(self): conn_source = self.get_connect(self.s_h, self.s_i, self.s_P, self.s_u, self.s_p, self.s_d) conn_target = self.get_connect(self.t_h, self.t_i, self.t_P, self.t_u, self.t_p, self.t_d) cursor_source = conn_source.cursor() cursor_target = conn_target.cursor() arr_schema = [] # get all table in database when not define table name schema_result = cursor_source.execute(r''' select name from sys.schemas where schema_id>4 and schema_id<16384 ''') for row in cursor_source.fetchall(): cursor_target.execute(r''' if not exists(select * from sys.schemas where name = '{0}') begin exec('create schema [{0}]') end '''.format(str(row[0]))) conn_target.commit() cursor_source.close() conn_source.close() cursor_target.close() conn_target.close() def sync_table_schema_byname(self, tab_name, is_reference): conn_source = self.get_connect(self.s_h, self.s_i, self.s_P, self.s_u, self.s_p, self.s_d) conn_target = self.get_connect(self.t_h, self.t_i, self.t_P, self.t_u, self.t_p, self.t_d) cursor_source = conn_source.cursor() cursor_target = conn_target.cursor() if (self.exits_object(conn_source, self.format_object_name(tab_name)) == 0): print("----------------------- warning message -----------------------") print("---------------warning: object " + tab_name + " not existing in source database ----------------") print("----------------------- warning message -----------------------") print() return # if exists a reference table for sp,not sync the table agagin if (self.exits_object(conn_target, self.format_object_name(tab_name)) > 0): if (self.f != "Y"): print("----------------------- warning message -----------------------") print("---------------warning: object " + tab_name + " existing in target database ----------------") print("----------------------- warning message -----------------------") print() return sql_script = r''' select top 1 1 from sys.tables where type_desc = 'USER_TABLE' and concat(QUOTENAME(schema_name(schema_id)),'.',QUOTENAME(name)) = '{0}' '''.format((self.format_object_name(tab_name))) # if the table schema exists in target server,skip cursor_target.execute(sql_script) exists_in_target = cursor_target.fetchone() if exists_in_target: if (self.f == "Y"): if (is_reference != "Y"): cursor_target.execute("drop table {0}".format(tab_name)) else: print("----------------------- warning message -----------------------") print("the target table " + tab_name + " exists ,skiped sync table schema from source") print("----------------------- warning message -----------------------") print() return sql_script = r''' DECLARE @object_name SYSNAME , @object_id INT SELECT @object_name = '[' + s.name + '].[' + o.name + ']' , @object_id = o.[object_id] FROM sys.objects o WITH (NOWAIT) JOIN sys.schemas s WITH (NOWAIT) ON o.[schema_id] = s.[schema_id] WHERE QUOTENAME(s.name) + '.' + QUOTENAME(o.name) = '{0}' AND o.[type] = 'U' AND o.is_ms_shipped = 0 DECLARE @SQL NVARCHAR(MAX) = '' ;WITH index_column AS ( SELECT ic.[object_id] , ic.index_id , ic.is_descending_key , ic.is_included_column , c.name FROM sys.index_columns ic WITH (NOWAIT) JOIN sys.columns c WITH (NOWAIT) ON ic.[object_id] = c.[object_id] AND ic.column_id = c.column_id WHERE ic.[object_id] = @object_id ), fk_columns AS ( SELECT k.constraint_object_id , cname = c.name , rcname = rc.name FROM sys.foreign_key_columns k WITH (NOWAIT) JOIN sys.columns rc WITH (NOWAIT) ON rc.[object_id] = k.referenced_object_id AND rc.column_id = k.referenced_column_id JOIN sys.columns c WITH (NOWAIT) ON c.[object_id] = k.parent_object_id AND c.column_id = k.parent_column_id WHERE k.parent_object_id = @object_id ) SELECT @SQL = 'CREATE TABLE ' + @object_name + '' + '(' + '' + STUFF(( SELECT '' + ', [' + c.name + '] ' + CASE WHEN c.is_computed = 1 THEN 'AS ' + cc.[definition] ELSE UPPER(tp.name) + CASE WHEN tp.name IN ('varchar', 'char', 'varbinary', 'binary', 'text') THEN '(' + CASE WHEN c.max_length = -1 THEN 'MAX' ELSE CAST(c.max_length AS VARCHAR(5)) END + ')' WHEN tp.name IN ('nvarchar', 'nchar') THEN '(' + CASE WHEN c.max_length = -1 THEN 'MAX' ELSE CAST(c.max_length / 2 AS VARCHAR(5)) END + ')' WHEN tp.name IN ('datetime2', 'time2', 'datetimeoffset') THEN '(' + CAST(c.scale AS VARCHAR(5)) + ')' WHEN tp.name = 'decimal' THEN '(' + CAST(c.[precision] AS VARCHAR(5)) + ',' + CAST(c.scale AS VARCHAR(5)) + ')' ELSE '' END + CASE WHEN c.collation_name IS NOT NULL THEN ' COLLATE ' + c.collation_name ELSE '' END + CASE WHEN c.is_nullable = 1 THEN ' NULL' ELSE ' NOT NULL' END + CASE WHEN dc.[definition] IS NOT NULL THEN ' DEFAULT' + dc.[definition] ELSE '' END + CASE WHEN ic.is_identity = 1 THEN ' IDENTITY(' + CAST(ISNULL( /*ic.seed_value*/ 1, '0') AS CHAR(1)) + ',' + CAST(ISNULL(ic.increment_value, '1') AS CHAR(1)) + ')' ELSE '' END END + '' FROM sys.columns c WITH (NOWAIT) JOIN sys.types tp WITH (NOWAIT) ON c.user_type_id = tp.user_type_id LEFT JOIN sys.computed_columns cc WITH (NOWAIT) ON c.[object_id] = cc.[object_id] AND c.column_id = cc.column_id LEFT JOIN sys.default_constraints dc WITH (NOWAIT) ON c.default_object_id != 0 AND c.[object_id] = dc.parent_object_id AND c.column_id = dc.parent_column_id LEFT JOIN sys.identity_columns ic WITH (NOWAIT) ON c.is_identity = 1 AND c.[object_id] = ic.[object_id] AND c.column_id = ic.column_id WHERE c.[object_id] = @object_id ORDER BY c.column_id FOR XML PATH(''), TYPE).value('.', 'NVARCHAR(MAX)'), 1, 2, '' + ' ') + ISNULL((SELECT '' + ', CONSTRAINT [' + k.name + '] PRIMARY KEY (' + (SELECT STUFF(( SELECT ', [' + c.name + '] ' + CASE WHEN ic.is_descending_key = 1 THEN 'DESC' ELSE 'ASC' END FROM sys.index_columns ic WITH (NOWAIT) JOIN sys.columns c WITH (NOWAIT) ON c.[object_id] = ic.[object_id] AND c.column_id = ic.column_id WHERE ic.is_included_column = 0 AND ic.[object_id] = k.parent_object_id AND ic.index_id = k.unique_index_id FOR XML PATH(N''), TYPE).value('.', 'NVARCHAR(MAX)'), 1, 2, '')) + ')' + '' FROM sys.key_constraints k WITH (NOWAIT) WHERE k.parent_object_id = @object_id AND k.[type] = 'PK'), '') + ')' + '' + ISNULL((SELECT ( SELECT '' + 'ALTER TABLE ' + @object_name + ' WITH' + CASE WHEN fk.is_not_trusted = 1 THEN ' NOCHECK' ELSE ' CHECK' END + ' ADD CONSTRAINT [' + fk.name + '] FOREIGN KEY(' + STUFF(( SELECT ', [' + k.cname + ']' FROM fk_columns k WHERE k.constraint_object_id = fk.[object_id] and 1=2 FOR XML PATH(''), TYPE).value('.', 'NVARCHAR(MAX)'), 1, 2, '') + ')' + ' REFERENCES [' + SCHEMA_NAME(ro.[schema_id]) + '].[' + ro.name + '] (' + STUFF(( SELECT ', [' + k.rcname + ']' FROM fk_columns k WHERE k.constraint_object_id = fk.[object_id] FOR XML PATH(''), TYPE).value('.', 'NVARCHAR(MAX)'), 1, 2, '') + ')' + CASE WHEN fk.delete_referential_action = 1 THEN ' ON DELETE CASCADE' WHEN fk.delete_referential_action = 2 THEN ' ON DELETE SET NULL' WHEN fk.delete_referential_action = 3 THEN ' ON DELETE SET DEFAULT' ELSE '' END + CASE WHEN fk.update_referential_action = 1 THEN ' ON UPDATE CASCADE' WHEN fk.update_referential_action = 2 THEN ' ON UPDATE SET NULL' WHEN fk.update_referential_action = 3 THEN ' ON UPDATE SET DEFAULT' ELSE '' END + '' + 'ALTER TABLE ' + @object_name + ' CHECK CONSTRAINT [' + fk.name + ']' + '' FROM sys.foreign_keys fk WITH (NOWAIT) JOIN sys.objects ro WITH (NOWAIT) ON ro.[object_id] = fk.referenced_object_id WHERE fk.parent_object_id = @object_id FOR XML PATH(N''), TYPE).value('.', 'NVARCHAR(MAX)')), '') + ISNULL(((SELECT '' + 'CREATE' + CASE WHEN i.is_unique = 1 THEN ' UNIQUE' ELSE '' END + ' NONCLUSTERED INDEX [' + i.name + '] ON ' + @object_name + ' (' + STUFF(( SELECT ', [' + c.name + ']' + CASE WHEN c.is_descending_key = 1 THEN ' DESC' ELSE ' ASC' END FROM index_column c WHERE c.is_included_column = 0 AND c.index_id = i.index_id FOR XML PATH(''), TYPE).value('.', 'NVARCHAR(MAX)'), 1, 2, '') + ')' + ISNULL('' + 'INCLUDE (' + STUFF(( SELECT ', [' + c.name + ']' FROM index_column c WHERE c.is_included_column = 1 AND c.index_id = i.index_id FOR XML PATH(''), TYPE).value('.', 'NVARCHAR(MAX)'), 1, 2, '') + ')', '') + '' FROM sys.indexes i WITH (NOWAIT) WHERE i.[object_id] = @object_id AND i.is_primary_key = 0 AND i.[type] = 2 FOR XML PATH(''), TYPE).value('.', 'NVARCHAR(MAX)') ), '') select @SQL as script '''.format(self.format_object_name(tab_name)) cursor_target = conn_target.cursor() cursor_source.execute(sql_script) row = cursor_source.fetchone() if not row[0]: return try: cursor_target.execute(row[0]) # drop current table schema if exists conn_target.commit() print("*************schema " + self.format_object_name(tab_name) + " synced *************") print() # give a blank row when finish except: print("----------------------- warning message -----------------------") print("-----------schema " + self.format_object_name(tab_name) + " synced failed---------------") print("----------------------- warning message -----------------------") print() cursor_source.close() conn_source.close() cursor_target.close() conn_target.close() def sync_table_schema(self): #default not sync by referenced other object is_reference = "N" conn_source = self.get_connect(self.s_h, self.s_i, self.s_P, self.s_u, self.s_p, self.s_d) conn_target = self.get_connect(self.t_h, self.t_i, self.t_P, self.t_u, self.t_p, self.t_d) cursor_source = conn_source.cursor() cursor_target = conn_target.cursor() arr_table = [] if (self.s_obj): for tab_name in self.s_obj.split(","): if (tab_name) and (self.exits_object(conn_source, tab_name)>0): self.sync_table_schema_byname(tab_name, is_reference) else: print("----------------------- warning message -----------------------") print("-----------schema " + self.format_object_name(tab_name) + " not existing in source database---------------") print("----------------------- warning message -----------------------") print() continue else: # sync all tables # get all table in database when not define table name sql_script = ''' SELECT QUOTENAME(s.name)+'.'+ QUOTENAME(o.name) FROM sys.objects o WITH (NOWAIT) JOIN sys.schemas s WITH (NOWAIT) ON o.[schema_id] = s.[schema_id] WHERE o.[type] = 'U' AND o.is_ms_shipped = 0 ''' cursor_source.execute(sql_script) for row in cursor_source.fetchall(): self.sync_table_schema_byname(str(row[0]), is_reference) # sync data from soure table to target table def sync_table_data_byname_bak(self,tab_name): conn_source = self.get_connect(self.s_h, self.s_i, self.s_P, self.s_u, self.s_p, self.s_d) conn_target = self.get_connect(self.t_h, self.t_i, self.t_P, self.t_u, self.t_p, self.t_d) cursor_source = conn_source.cursor() cursor_target = conn_target.cursor() insert_columns = "" insert_columns = self.get_table_column(conn_source, tab_name) insert_prefix = "" # weather has identity column cursor_source.execute(r'''select 1 from sys.columns where object_id = OBJECT_ID('{0}') and is_identity =1 '''.format(tab_name)) exists_identity = None exists_identity = cursor_source.fetchone() if (exists_identity): insert_prefix = "set identity_insert {0} on; ".format(tab_name) # data source insert_sql = "" values_sql = "" current_row = "" counter = 0 sql_script = r''' select top 288888 {0} from {1} '''.format(insert_columns, tab_name) cursor_source.execute(sql_script) # create insert columns ''' for field in cursor_source.description: insert_columns = insert_columns + str(field[0]) + "," insert_columns = insert_columns[0:len(insert_columns) - 1] ''' insert_prefix = insert_prefix + "insert into {0} ({1}) values ".format(tab_name, insert_columns) for row in cursor_source.fetchall(): counter = counter + 1 for key in row: if (str(key) == "None"): current_row = current_row + r''' null, ''' else: if (type(key) is datetime.datetime): current_row = current_row + r''' '{0}', '''.format(str(key)[0:23]) elif (type(key) is str): # 我槽!!!,這裏又有一個坑:https://blog.csdn.net/dadaowuque/article/details/81016127 current_row = current_row + r''' '{0}', '''.format( key.replace("'", "''").replace('\u0000', '').replace('\x00', '')) elif (type(key) is Decimal): d = Decimal(key) s = '{0:f}'.format(d) current_row = current_row + r''' '{0}', '''.format(s) elif (type(key) is bytes): # print(hex(int.from_bytes(key, 'big', signed=True) )) current_row = current_row + r''' {0}, '''.format(hex(int.from_bytes(key, 'big', signed=False))) else: current_row = current_row + r''' '{0}', '''.format(key) current_row = current_row[0:len(current_row) - 2] # remove the the last one char "," values_sql = values_sql + "(" + current_row + ")," current_row = "" # execute the one batch when if (counter == 1000): insert_sql = insert_prefix + values_sql insert_sql = insert_sql[0:len(insert_sql) - 1] # remove the the last one char "," if (exists_identity): insert_sql = insert_sql + " ;set identity_insert {0} off;".format(tab_name) try: cursor_target.execute(insert_sql) except: print( "----------------------error " + tab_name + " data synced failed-------------------------") raise conn_target.commit() insert_sql = "" values_sql = "" current_row = "" counter = 0 print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + "*************** " + self.format_object_name(tab_name) + " " + str(1000) + " rows synced *************") #sync last remainder batch data if (values_sql): insert_sql = insert_prefix + values_sql insert_sql = insert_sql[0:len(insert_sql) - 1] # remove the the last one char "," if (exists_identity): insert_sql = insert_sql + " ; set identity_insert {0} off;".format(tab_name) # execute the last batch try: cursor_target.execute(insert_sql) except: print("------------------error " + tab_name + " data synced failed------------------------") raise conn_target.commit() insert_sql = "" values_sql = "" current_row = "" print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + "*************** " + self.format_object_name( tab_name) + " " + str( counter) + " rows synced *************") print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + "----------------synced " + self.format_object_name( tab_name) + " data finished---------------") print() cursor_source.close() conn_source.close() cursor_target.close() conn_target.close() # sync data from soure table to target table def sync_table_data_byname(self,tab_name): conn_source = self.get_connect(self.s_h, self.s_i, self.s_P, self.s_u, self.s_p, self.s_d) conn_target = self.get_connect(self.t_h, self.t_i, self.t_P, self.t_u, self.t_p, self.t_d) cursor_source = conn_source.cursor() cursor_target = conn_target.cursor() insert_columns = self.get_table_column(conn_source, tab_name) if (self.f != "Y"): sql_script = "select top 1 {0} from {1} ".format(insert_columns, tab_name) # if exists data in target table,break cursor_target.execute(sql_script) exists = cursor_target.fetchone() if exists: print("----------------------- warning message -----------------------") print("the target table " + tab_name + " exists data,skiped sync data from source") print("----------------------- warning message -----------------------") print() return else: sql_script = "truncate table {0} ".format(tab_name) # if exists data in target table,break cursor_target.execute(sql_script) conn_target.commit() # weather has identity column cursor_source.execute(r'''select 1 from sys.columns where object_id = OBJECT_ID('{0}') and is_identity =1 '''.format(tab_name)) exists_identity = None exists_identity = cursor_source.fetchone() if (exists_identity): cursor_target.execute(" set identity_insert {0} on;".format(tab_name)) conn_target.commit() # data source insert_sql = "" counter = 0 sql_script = r''' select {0} from {1} '''.format(insert_columns, tab_name) cursor_source.execute(sql_script) # create insert columns ''' for field in cursor_source.description: insert_columns = insert_columns + str(field[0]) + "," insert_columns = insert_columns[0:len(insert_columns) - 1] ''' current_row = "insert into {0} ({1}) values ( ".format(tab_name, insert_columns) for row in cursor_source.fetchall(): counter = counter + 1 for key in row: if (str(key) == "None"): current_row += r''' null, ''' else: if (type(key) is datetime.datetime): current_row += r''' '{0}', '''.format(str(key)[0:23]) elif (type(key) is str): # 我槽!!!,這裏又有一個坑:https://blog.csdn.net/dadaowuque/article/details/81016127 current_row += r''' '{0}', '''.format(key.replace("'", "''").replace('\u0000', '').replace('\x00', '')) elif (type(key) is Decimal): d = Decimal(key) s = '{0:f}'.format(d) current_row += r''' '{0}', '''.format(s) elif (type(key) is bytes): # print(hex(int.from_bytes(key, 'big', signed=True) )) current_row += r''' {0}, '''.format(hex(int.from_bytes(key, 'big', signed=False))) else: current_row += r''' '{0}', '''.format(key) current_row = current_row[0:len(current_row) - 2] # remove the the last one char "," current_row += ");" insert_sql += current_row current_row = "insert into {0} ({1}) values ( ".format(tab_name, insert_columns) # execute the one batch when if (counter == 3000): try: cursor_target.execute(insert_sql) conn_target.commit() except: print( "----------------------error " + tab_name + " data synced failed-------------------------") raise insert_sql = "" current_row = "insert into {0} ({1}) values ( ".format(tab_name, insert_columns) counter = 0 print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + "*************** " + self.format_object_name(tab_name) + " " + str(3000) + " rows synced *************") #sync last remainder batch data if (counter>0): # execute the last batch try: cursor_target.execute(insert_sql) conn_target.commit() except: print("------------------error " + tab_name + " data synced failed------------------------") raise insert_sql = "" current_row = "" print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + "*************** " + self.format_object_name(tab_name) + " " + str(counter) + " rows synced *************") print(time.strftime("%Y-%m-%d %H:%M:%S",time.localtime()) + "----------------synced " + self.format_object_name(tab_name) + " data finished---------------") print() if (exists_identity): cursor_target.execute(" ; set identity_insert {0} off;".format(tab_name)) conn_target.commit() cursor_source.close() conn_source.close() cursor_target.close() conn_target.close() # sync data from soure table to target table def sync_table_data(self): conn_source = self.get_connect(self.s_h, self.s_i, self.s_P, self.s_u, self.s_p, self.s_d) conn_target = self.get_connect(self.t_h, self.t_i, self.t_P, self.t_u, self.t_p, self.t_d) cursor_source = conn_source.cursor() cursor_target = conn_target.cursor() arr_table = [] if (self.s_obj): arr_table = self.s_obj.split(',') for tab_name in arr_table: self.sync_table_data_byname(self.format_object_name(tab_name)) def sync_dependent_object(self, obj_name): # 強制覆蓋,不須要對依賴對象生效,若是是由於屬於依賴對象而被同步的,先檢查target中是否存在,若是存在就不繼續同步,這裏打一個標記來實現 is_refernece = "Y" conn_source = self.get_connect(self.s_h, self.s_i, self.s_P, self.s_u, self.s_p, self.s_d) conn_target = self.get_connect(self.t_h, self.t_i, self.t_P, self.t_u, self.t_p, self.t_d) cursor_source = conn_source.cursor() cursor_target = conn_target.cursor() ''' find dependent objects if exists dependent objects,sync Dependent objects objects in advance ''' sql_check_dependent = r''' SELECT * FROM ( SELECT distinct rtrim(lower(s.type)) COLLATE Chinese_PRC_CI_AS as obj_type, QUOTENAME(d.referenced_schema_name)+'.'+QUOTENAME(d.referenced_entity_name) COLLATE Chinese_PRC_CI_AS as obj FROM sys.dm_sql_referenced_entities('{0}','OBJECT') as d inner join sys.sysobjects s on s.id = d.referenced_id union all SELECT distinct rtrim(lower(d.referenced_class_desc)) COLLATE Chinese_PRC_CI_AS as obj_type, QUOTENAME(d.referenced_schema_name)+'.'+QUOTENAME(d.referenced_entity_name) COLLATE Chinese_PRC_CI_AS as obj FROM sys.dm_sql_referenced_entities('{0}','OBJECT') as d inner join sys.types s on s.user_type_id = d.referenced_id )t '''.format(self.format_object_name(obj_name)) cursor_source.execute(sql_check_dependent) result = cursor_source.fetchall() for row in result: if row[1]: if (row[0] == "u"): if (row[1]): self.sync_table_schema_byname(row[1], is_refernece) elif (row[0] == "fn" or row[0] == "if"): if (row[1]): self.sync_procudre_byname("f", row[1], is_refernece) elif (row[0] == "type"): if (row[1]): self.sync_table_variable(row[1], is_refernece) def sync_procudre_byname(self, type, obj_name, is_reference): conn_source = self.get_connect(self.s_h, self.s_i, self.s_P, self.s_u, self.s_p, self.s_d) conn_target = self.get_connect(self.t_h, self.t_i, self.t_P, self.t_u, self.t_p, self.t_d) cursor_source = conn_source.cursor() cursor_target = conn_target.cursor() if (self.exits_object(conn_source, self.format_object_name(obj_name)) == 0): print("---------------warning message----------------") print("---------------warning: object " + obj_name + " not existing in source database ----------------") print("---------------warning message----------------") print() return elif (self.exits_object(conn_target, self.format_object_name(obj_name)) > 0): if (self.f != "Y"): print("---------------warning message----------------") print("---------------warning: object " + obj_name + " existing in target database ----------------") print("---------------warning message----------------") print() return ''' 原本想直接生成刪除語句的: 這裏有一個該死的轉義,怎麼都弄很差,中午先去吃飯吧, 下午回來想了一下,換一種方式,不要死磕轉義問題了 sql_script = select 'if object_id('+''''+QUOTENAME(schema_name(uid))+ '' + QUOTENAME(name)+''''+') is not null ' +' drop proc '+QUOTENAME(schema_name(uid))+ '.' + QUOTENAME(name) , OBJECT_DEFINITION(id) from sys.sysobjects where xtype = 'P' and uid not in (16,19) ''' sql_script = r''' select QUOTENAME(schema_name(uid))+'.'+QUOTENAME(name), OBJECT_DEFINITION(id) from sys.sysobjects where xtype in ('P','IF','FN') and uid not in (16,19) ''' if (obj_name): sql_script = sql_script + " and QUOTENAME(schema_name(uid))+ '.' + QUOTENAME(name) ='{0}' ".format( self.format_object_name(obj_name)) cursor_source.execute(sql_script) row = cursor_source.fetchone() try: if type == "f": sql_script = r''' if object_id('{0}') is not null drop function {0} '''.format(self.format_object_name(row[0])) elif type == "p": sql_script = r''' if object_id('{0}') is not null drop proc {0} '''.format(self.format_object_name(row[0])) cursor_target.execute(sql_script) # drop current stored_procudre if exists conn_target.commit() # sync dependent object if (is_reference != "N"): self.sync_dependent_object(self.format_object_name(row[0])) # sync object it self cursor_target.execute(str(row[1])) # execute create stored_procudre script conn_target.commit() print("*************sync sp: " + self.format_object_name(row[0]) + " finished *****************") print() except: print("---------------error message----------------") print("------------------ sync " + row[0] + "sp error --------------------------") print("---------------error message----------------") print() cursor_source.close() conn_source.close() cursor_target.close() conn_target.close() def sync_procudre(self, type): is_reference = "N" conn_source = self.get_connect(self.s_h, self.s_i, self.s_P, self.s_u, self.s_p, self.s_d) conn_target = self.get_connect(self.t_h, self.t_i, self.t_P, self.t_u, self.t_p, self.t_d) cursor_source = conn_source.cursor() cursor_target = conn_target.cursor() if (self.s_obj): for proc_name in self.s_obj.split(","): self.sync_dependent_object(proc_name) self.sync_procudre_byname(type, proc_name, is_reference) # sync all sp and function else: sql_script = r''' select QUOTENAME(schema_name(uid))+'.'+QUOTENAME(name), OBJECT_DEFINITION(id) from sys.sysobjects where xtype = upper('{0}') and uid not in (16,19) '''.format(type) cursor_source.execute(sql_script) for row in cursor_source.fetchall(): self.sync_dependent_object(row[0]) self.sync_procudre_by_name(type, row[0], is_reference) if __name__ == "__main__": ''' sync = SyncDatabaseObject(s_h="127.0.0.1", s_i = "sql2017", s_P = 49744, s_d="DB01", t_h="127.0.0.1", t_i="sql2017", t_P=49744, t_d="DB02", s_obj_type = "sp", s_obj = "dbo.sp_test01", f="Y") sync.sync_procudre("p") ''' p_s_h = "" p_s_i = "MSSQL" p_s_P = 1433 p_s_d = "" p_s_u = None p_s_p = None p_s_obj = "" p_type = "" p_t_s = "" p_t_i = "MSSQL" p_t_P = "1433" p_t_d = "" p_t_u = None p_t_p = None # force conver target database object,default not force cover target database object p_f = "N" # sync obj type table|sp p_obj_type = None # sync whick database object p_obj = None if len(sys.argv) == 1: print(usage) sys.exit(1) elif sys.argv[1] == '--help': print(usage) sys.exit() elif len(sys.argv) >= 2: for i in sys.argv[1:]: _argv = i.split('=') # source server name if _argv[0] == '-s_h': p_s_h = _argv[1] # source server instance name if _argv[0] == '-s_i': if (_argv[1]): p_s_i = _argv[1] # source server instance PORT if _argv[0] == '-s_P': if (_argv[1]): p_s_P = _argv[1] # source database name if _argv[0] == '-s_d': p_s_d = _argv[1] if _argv[0] == '-s_u': p_s_u = _argv[1] if _argv[0] == '-s_p': p_s_p = _argv[1] if _argv[0] == '-t_h': p_t_h = _argv[1] if _argv[0] == '-t_i': if (_argv[1]): p_t_i = _argv[1] if _argv[0] == '-t_P': if (_argv[1]): p_t_P = _argv[1] if _argv[0] == '-t_d': p_t_d = _argv[1] if _argv[0] == '-t_u': p_t_u = _argv[1] if _argv[0] == '-t_p': p_t_p = _argv[1] if _argv[0] == '-f': if (_argv[1]): p_f = _argv[1] # object type if _argv[0] == '-obj_type': if not (_argv[1]): print("-obj_type can not be null (-obj=tab|-obj=sp|-obj=fn|-obj=type)") exit(0) else: p_obj_type = _argv[1] # object name if _argv[0] == '-obj': if (_argv[1]): p_obj = _argv[1] # require para if p_s_h.strip() == "": print("source server host cannot be null") exit(0) if p_s_d.strip() == "": print("source server host database name cannot be null") exit(0) if p_t_h.strip() == "": print("target server host cannot be null") exit(0) if p_t_d.strip() == "": print("target server host database name cannot be null") exit(0) sync = SyncDatabaseObject(s_h=p_s_h, s_i=p_s_i, s_P=p_s_P, s_d=p_s_d, s_u=p_s_u, s_p=p_s_p, s_obj=p_obj, t_h=p_t_h, t_i=p_t_i, t_P=p_t_P, t_d=p_t_d, t_u=p_t_u, t_p=p_t_p, f=p_f) sync.validated_connect(p_s_h, p_s_i, p_s_P, p_s_d, p_s_u, p_s_p) sync.validated_connect(p_t_h, p_t_i, p_t_P, p_t_d, p_t_u, p_t_p) if (p_f.upper() == "Y"): confirm = input("confirm you want to overwrite the target object? ") if confirm.upper() != "Y": exit(0) print("-------------------------- sync begin ----------------------------------") print() if (p_obj_type == "tab"): # sync schema sync.sync_schema() # sync table schema sync.sync_table_schema() # sync data sync.sync_table_data() elif (p_obj_type == "sp"): # sync schema sync.sync_schema() # sync sp sync.sync_procudre("p") elif (p_obj_type == "fn"): # sync schema sync.sync_schema() # sync sp sync.sync_procudre("fn") elif (p_obj_type == "tp"): # sync schema sync.sync_schema() # sync sp sync.sync_table_variable() else: print("-obj_type is not validated") print() print("-------------------------- sync finish ----------------------------------")