微軟BI 之SSIS 系列 - 使用 Script Component Destination 和 ADO.NET 解析不規則文件並插入數據

開篇介紹

這一篇文章是 微軟BI 之SSIS 系列 - 帶有 Header 和 Trailer 的不規則的平面文件輸出處理技巧  的續篇,在上篇文章中介紹到了對於這種不規則文件輸出的處理方式。好比下圖中的這種不規則文件,第一行,第二行 Header 部分,第三行的內容 Content 部分,最後一行的 Trailer 部分。html

在前幾個課程 微軟BI SSIS 2012 ETL 控件與案例精講 第43,44,45,46 課中,我分別講解了如何使用 .Script Component Source 解析不規則文件(第43,44課),如何使用 Script Component 同步 Transformation 轉換處理不規則文件(第45課),以及使用異步的 Transformation 轉換不規則文件(第46課),今天咱們講解的是 Script Component Destination。即把 Script Component 做爲一個 Destination 目標來使用,既然是 Destination 組件,那麼就應該是以接受來自上游的 Input 輸入並對數據進行清洗處理。而且,通常的 Destination 要麼就是數據庫表,要麼就是文件,即咱們是借用 Script Component 往表或者文件中插入數據。sql

使用 Script Component Destination

咱們仍是使用以前的不規則文件做爲本篇的例子來說解。這是 微軟BI 之SSIS 系列 - 帶有 Header 和 Trailer 的不規則的平面文件輸出處理技巧 這篇文章中用到的文件格式描述。數據庫

和以前的幾個課程的配置,文件連接等方式同樣,略。添加一個新的 Script Component 並選擇使用 Destination。編程

這裏仍然選擇 EMPLOYEE 做爲 INPUT 輸入項,文件源配置參照前幾課的設置。緩存

在 Script 中訪問 SQL Server 數據庫表,應經過 ADO.NET 連接方式訪問數據庫表。app

連接到課程指定的 DEMO 數據庫。異步

跟 Script Component - Source 中處理同樣,取一個名稱 - CON_ADO。或者先在連接管理器中先建立好一個 ADO.NET 連接管理器,而後再關聯也是能夠的。ide

將上方的文件源關聯起來,注意到下方 ADO.NET 鏈接管理器已經建立了。性能

在 Script 腳本中使用 ADO.NET 編程方式須要引入這個 NameSpace 命名空間。測試

具體代碼部分參照以下,具體講解請參看視頻:

#region Help:  Introduction to the Script Component
/* The Script Component allows you to perform virtually any operation that can be accomplished in
 * a .Net application within the context of an Integration Services data flow.
 *
 * Expand the other regions which have "Help" prefixes for examples of specific ways to use
 * Integration Services features within this script component. */
#endregion

#region Namespaces
using System;
using System.Data;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime.Wrapper;
using System.Data.SqlClient; // 須要引入的 ADO.NET 訪問
#endregion

/// <summary>
/// This is the class to which to add your code.  Do not change the name, attributes, or parent
/// of this class.
/// </summary>
[Microsoft.SqlServer.Dts.Pipeline.SSISScriptComponentEntryPointAttribute]
public class ScriptMain : UserComponent
{ 
    //鏈接管理器接口
    IDTSConnectionManager100 con;

    //數據庫訪問對象 
    SqlConnection sqlConn;
    SqlCommand sqlCommand1;
    SqlCommand sqlCommand2; 
    SqlParameter sqlParameter; 

    // 須要重寫的方法 初始化數據庫鏈接
    public override void AcquireConnections(object Transaction)
    {   
        // Script Component 關聯的 ADO.NET 鏈接管理器
        con = this.Connections.CONADO; 
        sqlConn = (SqlConnection)con.AcquireConnection(null);
    }
    
    // 初始化SQL語句與參數
    public override void PreExecute()
    {
        //INSERT INTO T046_EMPLOYEE_FILE_EXTRACTION VALUES(@FILE_CREATED_DATE,@TOTAL_EMPLOYEES)
        //INSERT INTO T046_EMPLOYEES VALUES(@EMP_NAME,@POSITION,@HIRED_DATE,@BIRTH_DATE,@EMAIL,@PHONE,@MARRIAGE)
        sqlCommand1 = new SqlCommand("INSERT INTO T046_EMPLOYEE_FILE_EXTRACTION VALUES(@FILE_CREATED_DATE,@TOTAL_EMPLOYEES)",sqlConn);
        sqlCommand2 = new SqlCommand("INSERT INTO T046_EMPLOYEES VALUES(@EMP_NAME,@POSITION,@HIRED_DATE,@BIRTH_DATE,@EMAIL,@PHONE,@MARRIAGE)", sqlConn);

        // For Table T046_EMPLOYEE_FILE_EXTRACTION
        sqlParameter = new SqlParameter("@FILE_CREATED_DATE", SqlDbType.NVarChar, 50);
        sqlCommand1.Parameters.Add(sqlParameter);
        sqlParameter = new SqlParameter("@TOTAL_EMPLOYEES", SqlDbType.Int);
        sqlCommand1.Parameters.Add(sqlParameter);

        // For Table T046_EMPLOYEES
        sqlParameter = new SqlParameter("@EMP_NAME", SqlDbType.NVarChar, 50);
        sqlCommand2.Parameters.Add(sqlParameter);
        sqlParameter = new SqlParameter("@POSITION", SqlDbType.NVarChar, 50);
        sqlCommand2.Parameters.Add(sqlParameter);
        sqlParameter = new SqlParameter("@HIRED_DATE", SqlDbType.NVarChar, 50);
        sqlCommand2.Parameters.Add(sqlParameter);
        sqlParameter = new SqlParameter("@BIRTH_DATE", SqlDbType.NVarChar, 50);
        sqlCommand2.Parameters.Add(sqlParameter);
        sqlParameter = new SqlParameter("@EMAIL", SqlDbType.NVarChar, 50);
        sqlCommand2.Parameters.Add(sqlParameter);
        sqlParameter = new SqlParameter("@PHONE", SqlDbType.NVarChar, 25);
        sqlCommand2.Parameters.Add(sqlParameter);
        sqlParameter = new SqlParameter("@MARRIAGE", SqlDbType.NVarChar, 2);
        sqlCommand2.Parameters.Add(sqlParameter); 
    } 

    /// <summary>
    /// This method is called once for every row that passes through the component from Input0.
    ///
    /// Example of reading a value from a column in the the row:
    ///  string zipCode = Row.ZipCode
    ///
    /// Example of writing a value to a column in the row:
    ///  Row.ZipCode = zipCode
    /// </summary>
    /// <param name="Row">The row that is currently passing through the component</param>
    public override void Input0_ProcessInputRow(Input0Buffer Row)
    {
        if (Row.EMPLOYEE.StartsWith("FILE CREATED DATE"))
        {
            // 則只取文件日期部分
            sqlCommand1.Parameters["@FILE_CREATED_DATE"].Value = Row.EMPLOYEE.Substring(19, 10);
            sqlCommand1.Parameters["@TOTAL_EMPLOYEES"].Value = 0;
            sqlCommand1.ExecuteNonQuery();
        }
        // 若是達到第二行
        else if (Row.EMPLOYEE.StartsWith("TOTAL EMPLOYEES"))
        {
            sqlCommand1.Parameters["@FILE_CREATED_DATE"].Value = "";
            sqlCommand1.Parameters["@TOTAL_EMPLOYEES"].Value = int.Parse(Row.EMPLOYEE.Substring(16, 10));

            sqlCommand1.ExecuteNonQuery();
        }
        else if (Row.EMPLOYEE.StartsWith("*"))
        {
            //不作處理
        }
        else
        {
            // 剩下的部分是主體內容部分,直接按照固定的列位置描述截取字符串 
            sqlCommand2.Parameters["@EMP_NAME"].Value = Row.EMPLOYEE.Substring(0, 50);
            sqlCommand2.Parameters["@POSITION"].Value = Row.EMPLOYEE.Substring(50, 50);
            sqlCommand2.Parameters["@HIRED_DATE"].Value = Row.EMPLOYEE.Substring(100, 12);
            sqlCommand2.Parameters["@BIRTH_DATE"].Value = Row.EMPLOYEE.Substring(112, 12);
            sqlCommand2.Parameters["@EMAIL"].Value = Row.EMPLOYEE.Substring(124, 50);
            sqlCommand2.Parameters["@PHONE"].Value = Row.EMPLOYEE.Substring(174, 25);
            sqlCommand2.Parameters["@MARRIAGE"].Value = Row.EMPLOYEE.Substring(199, 1);

            sqlCommand2.ExecuteNonQuery();
        }   
    }
   
    // 直接重寫鏈接釋放
    public override void ReleaseConnections()
    {
        base.ReleaseConnections();
    }
}

執行結果。

查詢結果以下:

對於第一張表,經過簡單的 SQL 語句便可以解決這個合併問題。

總結

其實對比 Script Component Source,Script Component Transformation (同步或異步) 這三種不規則的文件解析方式來講,前面幾種是最簡單的,特別是 Script Component Source 能夠很是直觀的看到兩個解析以後的 Output 操做。而且咱們的這四個案例,不一樣的解決方式實際上就是把解析不規則文件的過程分別放在了 Source 端,同步的 Transformation 端,異步的 Transformation 端,Destination 端。 不一樣的位置,解析的過程不同,對輸出的處理不同。而且經過這樣的一個案例可讓咱們對 Source, 同步轉換,異步轉換這些概念更加深入了。

而且在這些個案例中,咱們可以看到 Script Component 強大的自定義編程能力,文件訪問,數據庫訪問,同步轉換,異步遍歷等不一樣的解決方案。在實際的 ETL 項目中,咱們能夠針對不一樣的場景靈活的使用 Script Component 來解決這些問題。

最後,仍是要提醒一下就這種不規則的文件處理不要使用 Script Component Destination 來處理,關於它的效率問題從下圖中就能夠看得出來。如下測試環境爲3G左右的虛機,包括磁盤空間也都很緊張,具體的測試數據在不一樣的環境下可能表現不一樣。

而緣由其實能夠分析出來:

  • Script Component Transformation 異步轉換並非一個徹底阻塞組件,它是一個半阻塞組件。在拿到上游的所有或者部分 Buffer 的時候,ProcessInput() 方法 就已經開始工做了,處理一部分 Buffer 就往下輸出一部分 Buffer。而且在經過 OLE DB Destination 組件的時候,也是批量插入,所以在本案例中效率最高。
  • Script Component Source 是我之前在項目中常用到的一種方式,也是比較喜歡的一種方式。可是一般的作法就一次從文件讀取一行,而後輸出一行致使效率沒有 Transformation 異步轉換高。因爲文件通常都是200 - 500MB,所以沒有遇到特別大的性能問題,因此就沒有進一步的優化。
  • Script Component Transformation 同步轉換是一個非阻塞組件,可是因爲一次處理一行輸出一行,這個過程略微花費一點時間。
  • Script Component Destination 並非一個完整意義上的轉換組件,在本案例中是做爲一個 Destination 組件來處理,受 ProcessInputRow() 方法限制也是一行一行的經過 ADO.NET 方式插入,所以效率最低。

關於阻塞,半阻塞,同步和異步的文章能夠參考 微軟BI 之SSIS 系列 - 理解Data Flow Task 中的同步與異步, 阻塞,半阻塞和全阻塞以及Buffer 緩存概念 此文中提到的 Script Component 是以默認的同步 Transformation 轉換組件爲例,所以歸在 Non-Blocking 部分。

更多 BI 文章請參看 BI 系列隨筆列表 (SSIS, SSRS, SSAS, MDX, SQL Server)  若是以爲這篇文章看了對您有幫助,請幫助推薦,以方便他人在 BIWORK 博客推薦欄中快速看到這些文章。

相關文章
相關標籤/搜索