Apache Flink 從 1.9.0 版本開始增長了與 Hive 集成的功能,用戶能夠經過 Flink 來訪問 Hive 的元數據,以及讀寫 Hive 中的表。本文將主要從項目的設計架構、最新進展、使用說明等方面來介紹這一功能。sql
SQL 是大數據領域中的重要應用場景,爲了完善 Flink 的生態,發掘 Flink 在批處理方面的潛力,咱們決定加強 FlinkSQL 的功能,從而讓用戶可以經過 Flink 完成更多的任務。數據庫
Hive 是大數據領域最先出現的 SQL 引擎,發展至今有着豐富的功能和普遍的用戶基礎。以後出現的 SQL 引擎,如 Spark SQL、Impala 等,都在必定程度上提供了與 Hive 集成的功能,從而方便用戶使用現有的數據倉庫、進行做業遷移等。所以咱們認爲提供與 Hive 交互的能力對於 FlinkSQL 也是很是重要的。架構
與 Hive 集成主要包含了元數據和實際表數據的訪問,所以咱們會從這兩方面介紹一下該項目的架構。oop
爲了訪問外部系統的元數據,Flink 提供了 ExternalCatalog 的概念。可是目前 ExternalCatalog 的定義很是不完整,基本處於不可用的狀態。所以,咱們提出了一套全新的 Catalog 接口來取代現有的 ExternalCatalog。新的 Catalog 可以支持數據庫、表、分區等多種元數據對象;容許在一個用戶 Session 中維護多個 Catalog 實例,從而同時訪問多個外部系統;而且 Catalog 以可插拔的方式接入 Flink,容許用戶提供自定義的實現。下圖展現了新的 Catalog API 的整體架構。性能
建立 TableEnvironment 的時候會同時建立一個 CatalogManager,負責管理不一樣的 Catalog 實例。TableEnvironment 經過 Catalog 來爲 Table API 和 SQL Client 用戶提供元數據服務。測試
目前 Catalog 有兩個實現,GenericInMemoryCatalog 和 HiveCatalog。其中 GenericInMemoryCatalog 保持了原有的 Flink 元數據管理機制,將全部元數據保存在內存中。而 HiveCatalog 會與一個 Hive Metastore 的實例鏈接,提供元數據持久化的能力。要使用 Flink 與 Hive 進行交互,用戶須要配置一個 HiveCatalog,並經過 HiveCatalog 訪問 Hive 中的元數據。另外一方面,HiveCatalog 也能夠用來處理 Flink 自身的元數據,在這種場景下,HiveCatalog 僅將 Hive Metastore 做爲持久化存儲使用,寫入 Hive Metastore 中的元數據並不必定是 Hive 所支持的格式。一個 HiveCatalog 實例能夠同時支持這兩種模式,用戶無需爲管理 Hive 和 Flink 的元數據建立不一樣的實例。大數據
另外,咱們設計了 HiveShim 來支持不一樣版本的 Hive Metastore。目前支持的 Hive 版本包括 2.3.4 和 1.2.1。優化
咱們提供了 Hive Data Connector 來讀寫 Hive 的表數據。Hive Data Connector 儘量的複用了 Hive 自己的 Input/Output Format 和 SerDe 等類,這樣作的好處一方面是減小了代碼重複,更重要的是能夠最大程度的保持與 Hive 的兼容,即 Flink 寫入的數據 Hive 能夠正常讀取,而且反之亦然。ui
與 HiveCatalog 相似的,Hive Data Connector 目前支持的 Hive 版本也是 2.3.4 和 1.2.1。spa
Flink 與 Hive 集成的功能會在 1.9.0 版本中做爲試用功能發佈,用戶能夠經過 Table API 或者 SQL Client 的模式與 Hive 進行交互。下面列出的是在 1.9.0 中已經支持的功能:
因爲是試用功能,所以還有一些方面不夠完善,下面列出的是在 1.9.0 中缺失的功能:
部分數據類型不支持,包括Decimal、Char、Varchar、Date、Time、Timestamp、Interval、Union等。
使用 Flink 與 Hive 集成的功能,用戶首先須要添加相應的依賴。若是是使用 SQL Client,則須要將依賴的 jar 添加到 Flink 的 lib 目錄中;若是使用 Table API,則須要將相應的依賴添加到項目中(如pom.xml)。
如上文所述,目前支持的 Hive 版本包括 2.3.4 和 1.2.1,下表列出的是針對不一樣版本所需的依賴。
其中 flink-shaded-hadoop-2-uber 包含了 Hive 對於 Hadoop 的依賴。若是不用 Flink 提供的包,用戶也能夠將集羣中使用的 Hadoop 包添加進來,不過須要保證添加的 Hadoop 版本與 Hive 所依賴的版本是兼容的(Hive 2.3.4 依賴的 Hadoop 版本是 2.7.2;Hive 1.2.1 依賴的 Hadoop 版本是 2.6.0)。
依賴的 Hive 包(即 hive-exec 和 hive-metastore)也可使用用戶集羣中 Hive 所提供的 jar 包,詳情請見支持不一樣的 Hive 版本。
要與 Hive 交互,必須使用 HiveCatalog,下面介紹一下如何配置 HiveCatalog。
使用 SQL Client 時,用戶須要在 sql-client-defaults.yaml 中指定本身所需的 Catalog,在 sql-client-defaults.yaml 的「catalogs」列表中能夠指定一個或多個 Catalog 實例。如下的示例展現瞭如何指定一個 HiveCatalog:
catalogs: # A typical catalog definition looks like: - name: myhive type: hive hive-conf-dir: /path/to/hive_conf_dir hive-version: 2.3.4
其中 name 是用戶給每一個 Catalog 實例指定的名字, Catalog 名字和 DB 名字構成了 FlinkSQL 中元數據的命名空間,所以須要保證每一個 Catalog 的名字是惟一的。type 表示 Catalog 的類型,對於 HiveCatalog 而言,type 應該指定爲 hive。hive-conf-dir 用於讀取 Hive 的配置文件,用戶能夠將其設定爲集羣中 Hive 的配置文件目錄。hive-version 用於指定所使用的 Hive 版本,能夠設定爲 2.3.4 或者 1.2.1。
指定了 HiveCatalog 之後,用戶就能夠啓動 sql-client,並經過如下命令驗證 HiveCatalog 已經正確加載。
Flink SQL> show catalogs; default_catalog myhive Flink SQL> use catalog myhive;
其中 show catalogs 會列出加載的全部 Catalog 實例。須要注意的是,除了用戶在sql-client-defaults.yaml 文件中配置的 Catalog 之外,FlinkSQL 還會自動加載一個 GenericInMemoryCatalog 實例做爲內置的 Catalog,該內置 Catalog 默認名字爲 default_catalog。
使用 use catalog 能夠設定用戶 Session 當前的 Catalog。用戶在 SQL 語句中訪問元數據對象(如 DB、Table 等)時,若是不指定 Catalog 名字,則 FlinkSQL 會在當前 Catalog 中進行查找。
下面的代碼展現瞭如何經過 TableAPI 來建立 HiveCatalog,並註冊到 TableEnvironment。
String name = "myhive"; String defaultDatabase = "default"; String hiveConfDir = "/path/to/hive_conf_dir"; String version = "2.3.4"; TableEnvironment tableEnv = …; // create TableEnvironment HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir, version); tableEnv.registerCatalog(name, hiveCatalog); tableEnv.useCatalog(name);
將 HiveCatalog 註冊到 TableEnvironment 之後,就能夠在經過 TableEnvironment 提交 SQL 的時候訪問 HiveCatalog 中的元數據了。與 SQL Client 相似, TableEnvironment 也提供了 useCatalog 接口讓用戶設定當前 Catalog。
設置好 HiveCatalog 之後就能夠經過 SQL Client 或者 Table API 來讀寫 Hive 中的表了。
假設 Hive 中已經有一張名爲 src 的表,咱們能夠用如下的 SQL 語句來讀寫這張表。
Flink SQL> describe src; root |-- key: STRING |-- value: STRING Flink SQL> select * from src; key value 100 val_100 298 val_298 9 val_9 341 val_341 498 val_498 146 val_146 458 val_458 362 val_362 186 val_186 …… …… Flink SQL> insert into src values ('newKey','newVal');
相似的,也能夠經過 Table API 來讀寫上面提到的這張表。下面的代碼展現瞭如何實現這一操做。
TableEnvironment tableEnv = …; // create TableEnvironment tableEnv.registerCatalog("myhive", hiveCatalog); // set myhive as current catalog tableEnv.useCatalog("myhive"); Table src = tableEnv.sqlQuery("select * from src"); // write src into a sink or do further analysis …… tableEnv.sqlUpdate("insert into src values ('newKey', 'newVal')"); tableEnv.execute("insert into src");
Flink 1.9.0 中支持的 Hive 版本是 2.3.4 和 1.2.1,目前咱們只針對這兩個版本進行了測試。使用 SQL Client 時,若是用戶沒有在 sql-client-defaults.yaml 文件中指定 Hive 版本,咱們會自動檢測 classpath 中的 Hive 版本。若是檢測到的 Hive 版本不是 2.3.4 或 1.2.1 就會報錯。
藉助 Hive 兼容性的保證,其它不一樣的小版本也比較多是能夠正常工做的。所以,若是用戶使用的 Hive 小版本與咱們所支持的不一樣,能夠指定一個支持的版原本試用與 Hive 集成的功能。好比用戶使用的 Hive 版本是 2.3.3,能夠在 sql-client-defaults.yaml 文件或者代碼中將 Hive 版本指定爲 2.3.4。
Flink 1.9.0 中 Hive 的 TableSink 只能在 batch 模式下工做,所以若是用戶想要使用 Hive 的 TableSink,須要將執行模式設置爲 batch。
Flink 1.9.0 增長了新的 blink planner,因爲 blink planner 相比於原來的 planner 功能更加全面,所以咱們建議在使用 FlinkSQL 與 Hive 集成時使用 blink planner。後續新的功能也可能會只支持 blink planner。
使用 SQL Client 時能夠像這樣在 sql-client-defaults.yaml 中指定執行模式和 planner:
execution: # select the implementation responsible for planning table programs # possible values are 'old' (used by default) or 'blink' planner: blink # 'batch' or 'streaming' execution type: batch
對應的 Table API 的寫法以下:
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); TableEnvironment tableEnv = TableEnvironment.create(settings);
咱們會在 Flink 後續版本中進一步完善與 Hive 集成的功能,預計會在 1.10.0 版本中實現 Production-Ready。咱們在後續版本中計劃開展的工做包括:
歡迎你們試用 Flink 1.9 中的 Hive 功能,若是遇到任何問題也歡迎你們經過釘釘、郵件列表等方式與咱們聯繫。
原文連接 本文爲雲棲社區原創內容,未經容許不得轉載。