SSIS結合BCP及SQL Server做業實現定時將數據導出打包實現數據同步

首先這個流程要實現的功能大體是:sql

有兩臺服務器,一臺是對外網開發的,一臺是內網的。那麼很明顯數據交互都是外網服務器在作,而這個流程要作的就是要將外網上面的數據定時同步到內網中。shell

咱們依對其中某張表的操做爲例子,經過在一張基表(Staff)上面創建觸發器,用來監督Staff表的增,刪,改數據庫

由觸發器將修改的內容的主鍵ID,表名稱,以及所執行的動做(增,刪,改),和修改內容的時間。記錄到一張日誌表中(ActionLog)服務器

而後再由做業調用SSIS及BCP將修改的內容記錄到一個txt文件中,隨後對txt文件進行打包。編輯器

這樣當其餘的程序或者做業再去調用這個打包後的文件,並把它進行處理,就能夠實現數據的同步工做了。ide

首先要新建一個SSIS項目,經過"文件 - 新建 - 項目 - Integration Services 項目",輸入名稱和項目位置以後,建立SSIS項目工具

建立完成以後,默認會進入到控制流的選項卡界面,這時候先不要着急走流程,首先建立幾個用於流程的變量,在菜單欄上的"SSIS - 變量"點擊建立spa

(如下變量或變量賦予的默認值,根據本身項目中的實際狀況決定,僅供參考)設計

Flag(存放壓縮包文件路徑,String),代理

OutputPath(壓縮包文件路徑,String),

prefix(壓縮包文件前綴,String,值:ylstf_),

TableName(須要操做的表名稱,String,值:Staff),

TopNum(每次須要操做多少條數據,Int32,值:20),

TxtDataPath(存放TXT文件的路徑,String,值:D:\Data_sync\YlStf\Temp\)

接下來咱們全部的操做都會在SSIS設計中的控制流選項卡下進行操做

首先從左邊「工具欄」的「控制流項」中,拖拽腳本任務到右邊的控制流選項卡下,並將該腳本任務的名字改爲「調用存儲過程BCP出數據」,雙擊腳本進入腳本任務編輯器

在「腳本」選項的ReadOnlyVariables中選擇prefix,TableName,TopNum,TxtDataPath 變量,ReadWriteVariables中選擇Flag,OutputPath變量。

而後點擊「編輯腳本」進入代碼編輯

public void Main()
        {
            DateTime dtime = DateTime.Now;
            SqlConnection con = new SqlConnection("你的數據庫連接地址");
            SqlCommand cmd = new SqlCommand("[Data_SYNC_Stf]", con);

            string dataPackPath = @"D:\Data_sync\Upload\";
            string DataPath = Dts.Variables["TxtDataPath"].Value.ToString();

            cmd.Parameters.Add("@table", SqlDbType.VarChar, 80).Value = Dts.Variables["TableName"].Value;
            cmd.Parameters.Add("@mInitialFilePath", SqlDbType.VarChar, 500).Value = DataPath;
            cmd.Parameters.Add("@mUploadFilePath", SqlDbType.VarChar, 500).Value = dataPackPath;
            cmd.Parameters.Add("@dateflag", SqlDbType.DateTime).Value = dtime;
            cmd.Parameters.Add("@prefix", SqlDbType.VarChar, 20).Value = Dts.Variables["prefix"].Value;
            cmd.Parameters.Add("@TopNum", SqlDbType.Int).Value = 200;
            cmd.Parameters.Add("@DataFullFileName", SqlDbType.VarChar, 500).Value = "";
            cmd.Parameters["@DataFullFileName"].Direction = ParameterDirection.Output;
            cmd.CommandType = CommandType.StoredProcedure;
            cmd.CommandTimeout = 300;
            try
            {
                con.Open();
                cmd.ExecuteNonQuery();
                Dts.Variables["Flag"].Value = cmd.Parameters["@DataFullFileName"].Value.ToString();

                System.IO.FileInfo fileinfo = new System.IO.FileInfo(DataPath + Dts.Variables["TableName"].Value + ".txt");
                if (fileinfo.Length < 1)
                {
                    throw new Exception("Empty files!!!!!!!!!!!!!!!!!!!!!!!");
                }

                Dts.Variables["OutputPath"].Value = " a -ap -df " + Dts.Variables["Flag"].Value + " " + DataPath;
                Dts.TaskResult = (int)ScriptResults.Success;
            }
            catch (Exception ex)
            {
                Dts.Events.FireError(0, "Event Snippet", ex.Message, "", 0);
                Dts.TaskResult = (int)ScriptResults.Failure;
            }
            finally { if (con.State == ConnectionState.Open) { con.Close(); } }


        }
View Code

代碼中的Dts是 Microsoft.SqlServer.Dts.Runtime命名空間下的一個屬性,Dts.Variables["變量名"].value 是獲取SSIS中定義的變量值 Dts.Variables["變量名"].Value =「」 則是爲變量賦值。
若是你不能找到Dts這個屬性,那麼請在引用中引入:Microsoft.SqlServer.ManagedDTS,Microsoft.SqlServer.ScriptTask 這兩個dll文件

上面的代碼中調用了一個存儲過程[Data_SYNC_Stf],咱們的BCP工做也就是在這裏面完成的

Create PROCEDURE [dbo].[Data_SYNC_Stf]
    @table VARCHAR(40) ,
    @mInitialFilePath VARCHAR(500) , --數據包路徑
    @mUploadFilePath VARCHAR(500) , --上傳的壓縮文件路徑
    @DateFlag DATETIME , --打包日期
    @TopNum INT,         --每次多少條
    @prefix varchar(20) ,--文件名前綴
    @DataFullFileName VARCHAR(500) OUTPUT --要上傳的數據包完整路徑

AS
    BEGIN
        
        DECLARE @__error_message NVARCHAR(2048) --錯誤信息
        DECLARE @err INT --錯誤數記錄 
        BEGIN TRY 
        SET @DataFullFileName=@mUploadFilePath + @prefix + replace(replace(replace(convert(varchar,@DateFlag,120),'-',''),' ',''),':','')+'.zip'
    --SELECT  *  FROM      actionlog WITH (HOLDLOCK);
            --檢查ActionLogTempStorage表是否還有沒傳完的。 沒有則新插
            DECLARE @count INT
            SELECT  @count = COUNT(1)
            FROM   dbo.ActionLogTempStorage
            WHERE   objTable = @table --AND state = 0  
                    
            IF @count = 0 
                BEGIN 
                    INSERT  INTO dbo.ActionLogTempStorage
                            ( objID ,
                              objtable ,
                              objaction ,
                              createDate
                            )
                            SELECT TOP 200 objid ,
                                    objtable ,
                                    objaction ,
                                    @DateFlag
                            FROM    dbo.actionlog
                            WHERE   objtable = @table
                            
                    DELETE  FROM dbo.actionlog 
                    WHERE   objID IN (
                            SELECT  objID
                            FROM    dbo.ActionLogTempStorage)
                    
                END
    --SELECT  *  FROM  actionlog WITH (noLOCK);

    --開始導出各表操做======================================= 
            DECLARE @sql VARCHAR(2000)                 
   

    --打包ActionTemp中須要插入的表記錄
           
            SET @sql = 'bcp "SELECT top '''+@TopNum +''' a.* FROM dbo.Staff AS a RIGHT JOIN dbo.ActionLogTempStorage AS b on a.ID=b.[objID] where b.objTable='''+@table+''' and b.state=0 and (b.objAction=1 or b.objAction=2)" queryout ' + @mInitialFilePath + @table+'.txt -c -T -r{_r} -t {_t}'
           
           PRINT @sql 
           
            EXEC master.dbo.xp_cmdshell @sql
    
        END TRY
        BEGIN CATCH
   --錯誤處理 
            SET @err = @@error
            SELECT  @__error_message = ERROR_MESSAGE() 
            INSERT  dbo._SYNCERROR
                    ( MSG )
            VALUES  ( @table + @__error_message )
        END CATCH
    END
View Code

若是你的master.dbo.xp_cmdshell 存儲過程執行的有誤,那麼請在調用這個存儲過程的前面加上:
 --啓用xp_cmdshell 所屬的高級配置項
 EXEC sp_configure 'show advanced options', 1 ;
 RECONFIGURE ;

 --啓用xp_cmdshell
 EXEC sp_configure 'xp_cmdshell', 1 ;
 RECONFIGURE ;

接着在控制流中,拖拽執行進程任務的控制流項,雙擊進入任務編輯器。

在處理中的Executable選擇WinRAR(或其餘壓縮包)程序,WorkingDirectory選擇WinRAR(或其餘壓縮包)程序文件夾

如個人是,Executable:C:\Program Files\WinRAR\WinRAR.exe,WorkingDirectory:C:\Program Files\WinRAR\,設置完成以後點擊肯定

再從左邊的工具欄裏面拖拽「腳本任務」控制流項,名字改成「完成併產生OK標記」(打包完成以後會生成兩個壓縮包,一個是存放數據的壓縮包,另外一個是_ok.zip壓縮包,有這個帶ok的壓縮包文件,說明是成功的打包文件,不然就認爲是錯誤的文件)

一樣雙擊編輯,進入腳本任務編輯器。在ReadOnlyVariables中選擇:Flag,OutputPath,TableName變量,點擊「編輯腳本」

 public void Main()
        {
            DateTime dtime = DateTime.Now;
            SqlConnection con = new SqlConnection("你的數據庫連接地址");
            string TableName = Dts.Variables["TableName"].Value.ToString();
            string Flag = Dts.Variables["Flag"].Value.ToString();
            List<string> sqlList = new List<string>();

            string sql = "INSERT  INTO dbo.ActionTemp (objID,objtable,objAction,createDate,[state],DataFileName) SELECT ";
            sql += "objid ,objtable , objaction , GETDATE() , 1, ";
            sql += "'" + Dts.Variables["Flag"].Value + "'";
            sql += " FROM dbo.ActionLogTempStorage WHERE objTable = '" + TableName + "'";
            sqlList.Add(sql);

            sqlList.Add("DELETE  FROM dbo.ActionLogTempStorage WHERE objTable ='" + TableName + "';");
            try
            {
                con.Open();
                SqlCommand cmd = new SqlCommand();
                cmd.Connection = con;
                cmd.CommandType = CommandType.Text;
                for (int i = 0; i < sqlList.Count; i++)
                {
                    cmd.CommandText = sqlList[i].ToString();
                    if (cmd.ExecuteNonQuery() >= 1)
                    {
                        continue;
                    }
                    else
                    {
                        throw new Exception(" back db error sql str:" + sql[i]);
                    }
                }
                string Dataflag = Flag.Replace(".zip", "_ok.zip");
                System.IO.File.Create(Dataflag).Dispose();
                Dts.TaskResult = (int)ScriptResults.Success;

            }
            catch (Exception ex)
            {
                if (File.Exists(Flag))
                {
                    File.Delete(Flag);
                }
                if (File.Exists(Flag.Replace(".zip", "_ok.zip")))
                {
                    File.Delete(Flag.Replace(".zip", "_ok.zip"));
                }
                Dts.Events.FireError(0, "Err", "LastStep Error:" + ex.Message, "", 0);
                Dts.TaskResult = (int)ScriptResults.Failure;
            }
            finally { if (con.State == ConnectionState.Open) { con.Close(); } }
        }
View Code

到這裏SSIS的開發流程已經完成了。咱們能夠經過剛剛建立項目的路徑找到.dtsx文件放到須要調用的服務器上面

下面開始經過做業調用SSIS:

在做業「屬性」的「步驟」中「新建」一個步驟,

類型選擇「Sql Server Integration Services包」,

用行身份選擇「Sql Server代理服務賬號」,

包源選擇「Sql Server」,

使用Windows身份驗證或使用Sql Server 身份驗證(輸入用戶名和密碼)均可以,

包選擇你把.dtsx放的地方

做業的其餘地方根據服務器的要求本身設定便可。

做業建立好以後,運行一下做業,看看有沒有在指定的路徑下面生成壓縮包文件和_ok壓縮包文件。

以及ActionLog和ActionTemp中的數據是否有變化,就能夠判斷SSIS包有沒有執行成功了!

相關文章
相關標籤/搜索