ADF 第六篇:Copy Data Activity詳解

在Azure 數據工程中,可使用Copy Data 活動把數據從on-premises 或雲中複製到其餘存儲中。Copy Data 活動必須在一個IR(Integration Runtime)上運行,對於把存儲在on-premises中的數據複製到其餘存儲時,必須建立一個self-hosted Integration Runtime。sql

一,認識Copy Data Activity

建立一個Pipeline,從Activities列表中找到「Copy data」,拖放到Pipeline畫布中,以下圖所示:安全

在General選項卡中,設置Activity的常規屬性架構

  • Name:爲Activity命名
  • Timeout:設置Activity的超時時間
  • Retry:重試次數
  • Retry interval:重試一次間隔的時間,單位是second
  • Secure output:安全輸出,若是勾選,那麼該Activity的輸出不會記錄
  • Secure input:安全入,若是勾選,那麼該Activity的輸入不會記錄

二,設置源屬性

Source選項卡用於設置Copy data Activity的源屬性,併發

1,Source 的常規設置app

Source dataset:設置源的dataset性能

use query:Table選項表示整個表做爲一個數據源,Query或 Store procedure選項表示使用查詢語句或存儲過程來獲取數據源。spa

Query timeout(minutes):表示查詢超時的時間日誌

Isolation level:設置查詢隔離級別,做用於數據源。code

2,Partition optionorm

指定從SQL Server加載數據的分區選項,當啓用分區選項時(不是None),從SQL Server 同時加載數據的併發度由Copy data Activity的Degree of copy parallelism屬性設置。Physical Partitions Of Table表示數據工廠根據原始表的分區定義來肯定分區列和分區機制;當選擇Dynamic range選項時,用戶還須要設置Partition column name、Partition upper bound 和Partition lower bound三項,手動設置分區列和分區機制。

3,Additional columns

添加額外的列,Value由三種類型:Add dynamic content、$$COLUMN和Custom。

$$COLUMN:表示把源的指定列複製爲另外一列

Custom:表示添加一列,列指是常量

Add dynamic content,表示添加動態上下文(Dynamic Content),動態上下文是指數據工廠的上下文,這些動態上下文由系統變量(System variables)來提供:

三,設置Sink

Sink是Copy Data Activity複製數據的目標數據集,Data Factory 使用 Sink dataset來設置目標。

1,Store procedure name

從Sink dataset中選擇存儲過程,該存儲過程定義瞭如何把元數據應用於目標表。該存儲過程每一個batch調用一次,對於僅運行一次且與源數據無關的操做,請使用 Pre-copy script 屬性。

若是使用Pre-copy script 屬性,一般意味着數據是全量更新,重寫整個表,好比如下腳本:

truncate table staging_table

Copy data activity的執行過程是:每次執行Copy data activity,數據工廠首先執行Pre-copy script,而後使用最新的數據插入數據到target table。

若是使用存儲過程,一般是對數據進行增量更新,要實現增量更新,其實是把數據集做爲參數傳遞給存儲過程,這就意味着存儲過程的一個參數必須是表變量類型,存儲過程的代碼實現以下腳本所示,

CREATE PROCEDURE spOverwriteMarketing 
  @Marketing [dbo].[MarketingType] READONLY
  , @category varchar(256) AS BEGIN MERGE [dbo].[Marketing] AS target USING @Marketing AS source   ON (target.ProfileID = source.ProfileID and target.Category = @category) WHEN MATCHED THEN UPDATE SET State = source.State WHEN NOT MATCHED THEN INSERT (ProfileID, State, Category) VALUES (source.ProfileID, source.State, source.Category); END

2,Table option

若是設置爲Auto create table,那麼當目標表不存在時,數據工廠根據Source 的元數據自動建立目標表。

3,常規設置

Write batch timeout:每一個batch數據寫入的超時時間

Write batch size:每一個batch的數據行數量

Max concurrent connections:訪問數據存儲的最大的併發鏈接數量

四,設置Mapping

在Mapping選項卡中,主要設置Source 和 Sink之間的列映射

1,Type conversion settings用於設置類型轉換

  • Allow data truncation: 在把source數據轉換到sink時,若是字段的類型不一樣,容許數據截斷。
  • Treat boolean as number:把bool值做爲數值來看待,把true看做1,把false看做1
  • DateTime format:DateTime類型的格式
  • DateTimeOffset format:數據間隔的格式
  • TimeSpan format:TimeSpan的格式
  • Culture:locale

2,列映射

設置列與列之間的映射關係,用戶須要點擊「Import schemas」來導入架構元數據。

五,設置Settings

配置Copy data Activity的設置

1,常規的設置

  • Data integration unit:數據集成的單元
  • Degree of copy parallelism:指定數據加載時併發度
  • Data consistency verification:當勾選時,Copy data Activity會在數據移動以後,對數據進行一致性檢查
  • Enable logging:啓用日誌,記錄複製的文件,跳過的數據行和文件
  • Enable staging:指定是否要經過臨時存儲來複制數據

2,設置Fault tolerance

當設置Fault tolerance (錯誤容忍)以後,用戶能夠忽略在複製數據過程當中出現的一些錯誤,能夠忽略的錯誤類型主要有三個:

  • Skip incompatible rows:跳過不兼容的行
  • Skip missing files:跳過缺失的文件
  • Skip forbidden files:跳過禁止的文件

六,數據更新的全量更新和增量更新

數據更新的方式主要有:全量更新、追加數據、增量更新。

1,數據的全量更新和追加更新

若是使用Pre-copy script 屬性,一般意味着數據是全量更新和追加更新。

在插入數據以前,若是先清空目標表,再向目標表插入數據,這種方式是全量更新;若是不清空目標表,只是向目標表插入新的數據,那麼就是追加更新,前提是保證數據是無重複的新數據。

2,經過存儲過程來實現Copy data Activity的增量更新

若是Sink屬性使用存儲過程,那麼是對數據進行增量更新。實現數據的增量更新,其實是把數據集做爲參數傳遞給存儲過程,這就意味着存儲過程的一個參數必須是表變量類型。

因爲存儲過程在鏈接表變量時,性能較差,建議對分batch插入,每一個batch進行一次插入操做。

建立一個表類型,做爲存儲過程的參數,表的架構和輸入數據的架構相同:

CREATE TYPE [dbo].[MarketingType] AS TABLE
( [ProfileID] [varchar](256) NOT NULL, [State] [varchar](256) NOT NULL, [Category] [varchar](256) NOT NULL )

建立存儲過程,第一個變量是表變量,該存儲過程的做用是把表變量的數據更新到Sink指定的target table中。

CREATE PROCEDURE spOverwriteMarketing 
  @Marketing [dbo].[MarketingType] READONLY
  , @category varchar(256) AS BEGIN MERGE [dbo].[Marketing] AS target USING @Marketing AS source ON (target.ProfileID = source.ProfileID and target.Category = @category) WHEN MATCHED THEN UPDATE SET State = source.State WHEN NOT MATCHED THEN INSERT (ProfileID, State, Category) VALUES (source.ProfileID, source.State, source.Category); END

3,使用臨時表來實現增量更新

先把數據加載到臨時表,經過merge語句把臨時數據歸併到product table。

 

參考文檔:

Copy data to and from SQL Server by using Azure Data Factory

Copy activity in Azure Data Factory

相關文章
相關標籤/搜索