在Azure 數據工程中,可使用Copy Data 活動把數據從on-premises 或雲中複製到其餘存儲中。Copy Data 活動必須在一個IR(Integration Runtime)上運行,對於把存儲在on-premises中的數據複製到其餘存儲時,必須建立一個self-hosted Integration Runtime。sql
建立一個Pipeline,從Activities列表中找到「Copy data」,拖放到Pipeline畫布中,以下圖所示:安全
在General選項卡中,設置Activity的常規屬性架構
Source選項卡用於設置Copy data Activity的源屬性,併發
1,Source 的常規設置app
Source dataset:設置源的dataset性能
use query:Table選項表示整個表做爲一個數據源,Query或 Store procedure選項表示使用查詢語句或存儲過程來獲取數據源。spa
Query timeout(minutes):表示查詢超時的時間日誌
Isolation level:設置查詢隔離級別,做用於數據源。code
2,Partition optionorm
指定從SQL Server加載數據的分區選項,當啓用分區選項時(不是None),從SQL Server 同時加載數據的併發度由Copy data Activity的Degree of copy parallelism屬性設置。Physical Partitions Of Table表示數據工廠根據原始表的分區定義來肯定分區列和分區機制;當選擇Dynamic range選項時,用戶還須要設置Partition column name、Partition upper bound 和Partition lower bound三項,手動設置分區列和分區機制。
3,Additional columns
添加額外的列,Value由三種類型:Add dynamic content、$$COLUMN和Custom。
$$COLUMN:表示把源的指定列複製爲另外一列
Custom:表示添加一列,列指是常量
Add dynamic content,表示添加動態上下文(Dynamic Content),動態上下文是指數據工廠的上下文,這些動態上下文由系統變量(System variables)來提供:
Sink是Copy Data Activity複製數據的目標數據集,Data Factory 使用 Sink dataset來設置目標。
1,Store procedure name
從Sink dataset中選擇存儲過程,該存儲過程定義瞭如何把元數據應用於目標表。該存儲過程每一個batch調用一次,對於僅運行一次且與源數據無關的操做,請使用 Pre-copy script 屬性。
若是使用Pre-copy script 屬性,一般意味着數據是全量更新,重寫整個表,好比如下腳本:
truncate table staging_table
Copy data activity的執行過程是:每次執行Copy data activity,數據工廠首先執行Pre-copy script,而後使用最新的數據插入數據到target table。
若是使用存儲過程,一般是對數據進行增量更新,要實現增量更新,其實是把數據集做爲參數傳遞給存儲過程,這就意味着存儲過程的一個參數必須是表變量類型,存儲過程的代碼實現以下腳本所示,
CREATE PROCEDURE spOverwriteMarketing
@Marketing [dbo].[MarketingType] READONLY
, @category varchar(256) AS BEGIN MERGE [dbo].[Marketing] AS target USING @Marketing AS source ON (target.ProfileID = source.ProfileID and target.Category = @category) WHEN MATCHED THEN UPDATE SET State = source.State WHEN NOT MATCHED THEN INSERT (ProfileID, State, Category) VALUES (source.ProfileID, source.State, source.Category); END
2,Table option
若是設置爲Auto create table,那麼當目標表不存在時,數據工廠根據Source 的元數據自動建立目標表。
3,常規設置
Write batch timeout:每一個batch數據寫入的超時時間
Write batch size:每一個batch的數據行數量
Max concurrent connections:訪問數據存儲的最大的併發鏈接數量
在Mapping選項卡中,主要設置Source 和 Sink之間的列映射
1,Type conversion settings用於設置類型轉換
2,列映射
設置列與列之間的映射關係,用戶須要點擊「Import schemas」來導入架構元數據。
配置Copy data Activity的設置
1,常規的設置
2,設置Fault tolerance
當設置Fault tolerance (錯誤容忍)以後,用戶能夠忽略在複製數據過程當中出現的一些錯誤,能夠忽略的錯誤類型主要有三個:
數據更新的方式主要有:全量更新、追加數據、增量更新。
1,數據的全量更新和追加更新
若是使用Pre-copy script 屬性,一般意味着數據是全量更新和追加更新。
在插入數據以前,若是先清空目標表,再向目標表插入數據,這種方式是全量更新;若是不清空目標表,只是向目標表插入新的數據,那麼就是追加更新,前提是保證數據是無重複的新數據。
2,經過存儲過程來實現Copy data Activity的增量更新
若是Sink屬性使用存儲過程,那麼是對數據進行增量更新。實現數據的增量更新,其實是把數據集做爲參數傳遞給存儲過程,這就意味着存儲過程的一個參數必須是表變量類型。
因爲存儲過程在鏈接表變量時,性能較差,建議對分batch插入,每一個batch進行一次插入操做。
建立一個表類型,做爲存儲過程的參數,表的架構和輸入數據的架構相同:
CREATE TYPE [dbo].[MarketingType] AS TABLE
( [ProfileID] [varchar](256) NOT NULL, [State] [varchar](256) NOT NULL, [Category] [varchar](256) NOT NULL )
建立存儲過程,第一個變量是表變量,該存儲過程的做用是把表變量的數據更新到Sink指定的target table中。
CREATE PROCEDURE spOverwriteMarketing
@Marketing [dbo].[MarketingType] READONLY
, @category varchar(256) AS BEGIN MERGE [dbo].[Marketing] AS target USING @Marketing AS source ON (target.ProfileID = source.ProfileID and target.Category = @category) WHEN MATCHED THEN UPDATE SET State = source.State WHEN NOT MATCHED THEN INSERT (ProfileID, State, Category) VALUES (source.ProfileID, source.State, source.Category); END
3,使用臨時表來實現增量更新
先把數據加載到臨時表,經過merge語句把臨時數據歸併到product table。
參考文檔:
Copy data to and from SQL Server by using Azure Data Factory