基於XML描述的可編程函數式ETL實現

轉載本文需註明出處:微信公衆號EAWorld,違者必究。數據庫


引言:apache


傳統 ETL 主要以 SQL 爲主要技術手段,把數據經抽取、清洗轉換以後加載到數據倉庫。可是在現在移動互聯網大力發展的場景下,產生大量碎片化和不規則的數據。政府,公安等行業,傳統數據庫已經遠遠沒法知足需求。數據原始文件經過文件導入到基礎庫,再經過大數據 HQL等技術手段提取出二級庫,這中間的數據導入和 SQL ETL 的提取的過程,大量消耗 IO 性能和計算資源,在不少場景下已是數據處理的瓶頸所在。編程

普元在實施公安項目過程當中開發了一種基於 XML 描述的可編程的函數 ETL 轉換方法。主要用於大數據文件處理領域,能從原始數據文件直接、快速加載到專題庫的技術手段。技術方案主要解決了用 XML 的技術手段描述數據文件的格式,包含文件字段切分、字段類型、默認值、異常值校驗、時間格式校驗。在處理時可添加自行開發的 JAVA UDF 函數,函數實參支持變量、常量、表達式、函數和運算符重載。同時函數支持多層嵌套,即內部函數的返回值最爲外部函數的實參。該方案實現了 XML 內函數體的語法解析並在運行過程當中直接編譯爲 Java 字節碼的技術。有效的解決了政府、公安、電信行業巨量的數據處理須要的大量計算資源和 IO 性能瓶頸,有效的提升了數據處理效率和下降了數據處理開發難度。
api


目錄:數組


1、基於 XML 控制文件解析數據文件方案介紹安全

2、XML 控制文件結構和語法微信

3、函數和多層嵌套函數傳參網絡

4、UDF 函數編寫方法架構

5、數據測試工具框架

6、FlumeOnYarn 架構和分佈式部署


1、基於 XML 控制文件解析數據文件方案介紹


對於數據開發項目,咱們經常會面臨衆多的數據對接,部分場景不只數據量大,且數據種類多,數據解析開發工做量巨大。對於大量數據對接,通常設計的 RPC 接口和 WebService 通常都達不到數據性能要求的。而且他們都是點對點的服務,一旦上下游系統故障,都會形成整個數據對接異常。所以大部分都會選擇使用文件的方式進行數據對接。



對於非實時數據對接需求,這種方式的優勢:


  1. 在數據量大的狀況下,能夠經過文件傳輸,上游只寫入,無需關心數據業務和故障;

  2. 方案簡單,避免了網絡協議相關的概念;

  3. 維護簡單,只需保證磁盤寫入穩定性便可;


咱們經常會面臨基於此架構的數據對接。但基於此架構數據處理工做都在下游(即數據使用方)。


面對大量數據對接和衆多的數據類型,咱們對於每種數據文件解析、解碼、清洗消耗大量的人力,而且基於編碼的方式對於較多數據類型的場景代碼量大,且難以管理。所以通過屢次數據開發實踐,咱們開發了一種基於 XML 描述的方式來解析和清洗數據文件的實現。


本架構實現適合如下幾個方面:


  1. 基於文件的數據對接;

  2. 文件沒法直接導入到目標數據庫,須要作轉換,清洗爲目標格式;


如上數據對接架構圖,Flume 基本實現了基於文件系統的自動掃描和讀取,所以架構實現了基於 Flume Sink 的模塊。本架構也可做爲SDK 做爲框架集成到現有數據處理方案中。


2、XML數據控制文件結構和語法


<?xml version="1.0" encoding="UTF-8"?><schema><key>JD_TYPE_V1</key><type>textfile</type><delimiter>,</delimiter><fields><field type="int">exp_flag</field> <field type="string">sender_id</field>  <field type="string">sender_num</field> <field type="string" value="unknown">sender_address</field> <field type="string">receiver_num</field> <field type="date" pattern="yyyy-MM-dd HH:mm:ss">expect_time</field> <field type="string" default="true" value="location(receiver_num)">receiver_num_origin</field> <field type="string" default="true" value="yn(none(sender_num))">is_sender_num_null</field> <field type="string" default="true" value="concat(caller_number, '-', called_number)">number_connect</field> <field type="string" default="true" value="yn(all_true(none(sender_num), none(receiver_num)))">all_num_null</field> <field type="string" default="true" value="province_code(sender_province)">sender_province_code</field> </fields></schema>

(可左右滑動查看所有代碼)


如上 XML 描述了一種數據文件類型及該類型的切分方法,數據每行通過切分後,產生的多個數據列的轉換方法。


理論上,每種數據類型應該對應一個控制文件,意味着控制文件來描述該種數據類型如何解析和轉換。


  1. Key 主要標註該控制文件處理的類型ID;

  2. Delimiter 爲文件列切割字符;

  3. Fields 中包含每列的字段描述;

  4. 數據類型支持Java基本類型和date類型;

  5. Skip爲數據對齊語法,控制在列中忽略某列的值;

  6. Default = true 屬性爲數據對齊語法,給某列提供默認值,提供默認值的列在數據列中不移動位移;

  7. Value 提供了給該字段提供當列中無值時提供默認值;value=null則指定列值爲null;

  8. Date 類型需 pattern 屬性;


3、函數和多層嵌套函數傳參



默認值


詞法分析時字段field 的value 屬性值沒有以英文小括號閉合的實體。以下示例中的primeton:

<field type="string" default="true" value="primeton">data_vendor</field>

(可左右滑動查看所有代碼)


函數


函數是由一組字符串、數字、下劃線組成的合法函數名和0 到多個形式參數組成。在詞法分析時字段field 的 value 屬性值由英文小括號閉合的實體。以下示例中的:

location(),yn(),concat();<field type="string" default="true" value=" unix_timestamp ">curr_time</field><field type="string" default="true" value="location(receiver_num)">receiver_num_origin</field> <field type="string" default="true" value="yn(none(sender_num))">is_sender_num_null</field>    <field type="string" default="true" value="concat(caller_number, '-', called_number)">number_connect</field>

(可左右滑動查看所有代碼)


函數名


函數體小括號前面的部分。通常由字符串、數字、下劃線組成的一組特定的名稱。如location(receiver_tel),location 即爲該函數的函數名稱。


函數的形式參數:


1.無參數


詞法分析時value的值知足函數條件且函數體內無參數。以下示例中:unix_timestamp() 得到當前系統內的 Unix 時間戳;

<field type="string" default="true" value=" unix_timestamp()">curr_time</field>

(可左右滑動查看所有代碼)


2.常量型形參


詞法分析時函數體內以英文單引號引用的值爲函數體的常量型形參。如’100’,函數示例爲:random_int(‘100’),生成 0-100 之內的隨機整形數值; 

<field type="string" default="true" value="random_int(‘100’)">rand_num</field>

(可左右滑動查看所有代碼)


3.變量型形參


詞法分析時函數體內參數沒有英文單引號引用而且不以英文小括號閉合的爲函數體的變量型形參。以下示例中的receiver_tel;

<field type="string" default="true" value="location(receiver_tel)">r_num_loc</field>

(可左右滑動查看所有代碼)


4.函數型形參


詞法分析時函數體內沒有英文單引號而且以英文小括號閉合的參數類型參數爲函數體的函數型參數。以下示例中的:none(sender_num)和none(receiver_num);

<field type="string" default="true" value="yn(all_true(none(sender_num), none(receiver_num)))">all_num_null</field>

(可左右滑動查看所有代碼)


詞法分析得到到函數體的同時,使用函數名調用UdfRegistors.getUdf(udfName) 函數,以檢驗當前系統必要存在該函數,不然則拋出沒法識別的函數異常。


5.類型校驗


詞法分析階段得到了字段 field 的取值是默認值或者函數,下一步需校驗其默認值或函數的返回值是否能和定義的字段類型相匹配。若是是函數同時校驗函數的形參和實參類型是否相匹配。

<field type="string" default="true" value="primeton">data_vendor</field><field type="int" default="true" value="2">call_flag</field>

(可左右滑動查看所有代碼)


如上示例中的primeton 需能轉換爲 string 類型,call_flag 需能轉換爲 int 類型。若是類型不能轉換,則會拋出類型沒法轉換異常。對於函數,經過 returnType 返回類型和字段類型進行校驗,可匹配或者是該類型的子類型則類型驗證經過。


4、UDF 函數編寫方法


編寫一個UDF函數的步驟:


  1. 繼承 UDF 類,實現 eval 方法;

  2. Eval 方法傳入的是一個數組參數;

  3. 判斷參數長度是否和預期的一致;

  4. 判斷位置參數類型是否和預期的一致;

  5. 實現函數體;

  6. 返回eval函數執行的返回值,理論上該返回值的類型應該一致,不該該同一函數返回多種類型值;

  7. 函數編寫者應該保證函數體內是線程安全的;


UDF 實現以下:

public abstract class UDF { /** * 是否支持該組參數類型,不支持拋出UnsupportedTypeException異常。默認返回 true */ public void support(Class<?>... paramsClass)throws UnsupportedTypeException; /*** 該 UDF 返回值類型,用於校驗嵌套函數類型是否匹配。可返回簡單類型,map,array,record 等類型.默認返回 String 類型*/ public Class<?> returnType();/*** UDF 執行函數,當輸入不符合預期時,向外拋出異常* @param params 函數的輸入實參* @return 函數輸出結果,簡單類型或者複雜類型,支持簡單類型,map,array,record 類型*/public abstract Object eval(Object... params);}

(可左右滑動查看所有代碼)


一個判斷是否包含子串的UDF 寫法:



全部的UDF都經過一個核心註冊類(這點相似 Hive 的FunctionRegistry)

public final class UdfRegistors { /** * UDF 函數映射 */static final Map<String, UDF> UDF_CACHED = new HashMap<String, UDF>(); static {UDF_CACHED.put("copy", new CopyUDF()); // 複製一個變量的值 UDF_CACHED.put("eq", new EqUDF()); // 判斷兩個變量是否相等 UDF_CACHED.put("yn", new YnUDF()); // 根據輸入true,false 轉換爲 Y、NUDF_CACHED.put("null", new NullUDF()); // 判斷變量是否爲null// add udf methodUDF_CACHED.put("location", new LocationUDF()); // 得到手機號碼的歸屬地 UDF_CACHED.put("nation_code", new NationCodeUDF()); // 根據國家名稱獲取國家代碼 UDF_CACHED.put("province_code", new ProvinceCodeUDF()); //根據省名稱獲取省代碼 UDF_CACHED.put("city_code", new CityCodeUDF()); // 根據城市名稱獲取城市代碼 UDF_CACHED.put("phone_num", new PhoneNumUDF()); // 校驗是不是手機號或者固話UDF_CACHED.put("number_format", new NumberFormatUDF()); //校驗是否能夠轉化成數字}/*** 添加一個UDF函數 * @param key UDF 函數 * @param value UDF 函數 eval 應線程安全 * @return */ public static boolean addUdf(String key, UDF value) { return UDF_CACHED.put(Optional.of(key).map((it)->it.toLowerCase()).get(), value) != null; } /** * 得到內置的 udf 函數 */ public static UDF getUdf(String udfName) { return UDF_CACHED.get(udfName.toLowerCase()); }}

(可左右滑動查看所有代碼)


UDF 函數註冊時期:


  1. 可在編譯期綁定內置的 UDF 函數;

  2. 可在系統啓動時配置自加載的 UDF 函數;

  3. 可在運行期動態注入UDF 函數;


5、數據測試工具


數據對接過程,面對數據是否能轉換爲目標結果經常無從所知。基於XML 控制文件的數據解析,可實現一個測試工具。該工具經過上傳數據文件和上傳 XML 控制文件,可對數據文件隨機的讀取行進行匹配測試,只要數據列和目標 XML文件能經過列匹配測試,則數據可經過 ETL 解析清洗。不然繼續修改 XML 控制文件,直到順利經過匹配。



6、FlumeOnYarn 架構和分佈式部署


本架構適合以文件做爲數據對接的方案,另外一方面,經過擴展 Flume 便可實現拿來主義。Flume 內部實現對 Channel 的 Transaction,對於每一個以文件構造的 Event 對象是原子操做,要麼所有成功,要麼失敗。flume依賴事務來保證event的可靠性。Flume 默認沒有分佈式實現,所以開發了 FlumeOnYarn 的架構,用於支持 Flume 的分佈式部署。


FlumeOnYarn優點


  1. 無需每一個節點安裝 Flume,可一鍵啓動和中止;

  2. 配置文件在客戶端節點修改,自動複製到 Yarn 上各實例,無需每一個節點修改;

  3. 基於 CDH或HDP的發行版,即便實現了 Web 可視化化的配置和分佈式部署,可是對於 Flume 只能實現單配置文件實例,沒法實現多配置實例;

  4. 集羣的規模能夠根據數據量大小進行實時的調整(增減節點),實現彈性處理。經過命令或者 api 便可控制(CDH 等須要在頁面添加 host,繁瑣且不易動態調整);

  5. 多個租戶或者同一租戶多個處理實例互不影響,且能隔離(Yarn Container);


FlumeOnYarn 架構



上圖所示,提交FlumeOnYarn 須要客戶端,該客戶端沒有太多和Flume安裝包結構特殊的地方,只是在 lib 下添加了 flume-yarn 的架構支持和 bin 下 flume-on-yarn 的啓動腳本。


Flume OnYarn 客戶端程序



經過 bin/flume-on-yarn 便可提交 FlumeOnYarn Application 集羣。以下的命令便可一次性申請多個 Yarn 資源節點,實現一鍵部署:

bin/flume-on-yarn yarn -s --name agent_name –conf conf/flume-hdfs.conf --num-instances 5

(可左右滑動查看所有代碼)


總結




推薦閱讀

元數據新型存儲架構的探索

基於 Spark 的數據分析實踐

本地讀寫的多活數據存儲架構設計要義


關於做者:震秦,普元資深開發工程師,專一於大數據開發 8 年,擅長 Hadoop 生態內各工具的使用和優化。參與某公關廣告(上市)公司DMP 建設,負責數據分層設計和批處理,調度實現,完成交付使用;參與國內多省市公安社交網絡項目部署,負責產品開發(Spark 分析應用);參與數據清洗加工爲我方主題庫並部署上層應用。


關於EAWorld:微服務,DevOps,數據治理,移動架構原創技術分享。長按二維碼關注!

本文分享自微信公衆號 - EAWorld(eaworld)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。

相關文章
相關標籤/搜索