Kettle構建Hadoop ETL實踐(三):Kettle對Hadoop的支持

目錄java

1、Hadoop相關的步驟與做業項node

2、鏈接Hadoopmysql

1. 鏈接Hadoop集羣web

(1)開始前準備正則表達式

(2)配置步驟sql

2. 鏈接Hive數據庫

3. 鏈接Impalaexpress

4. 創建MySQL數據庫鏈接apache

3、導入導出Hadoop集羣數據編程

1. 向HDFS導入數據

2. 向Hive導入數據

3. 從HDFS抽取數據到MySQL

4. 從Hive抽取數據到MySQL

4、執行HiveQL語句

5、執行MapReduce

1. 生成聚合數據集

(1)準備文件與目錄

(2)創建一個用於Mapper的轉換

(4)創建一個調用MapReduce步驟的做業

(5)執行做業並驗證輸出

2. 格式化原始web日誌

(1)準備文件與目錄

(2)創建一個用於Mapper的轉換

(3)創建一個調用MapReduce步驟的做業

(4)執行做業並驗證輸出

6、提交Spark做業

1. 在Kettle主機上安裝Spark客戶端

2. 爲Kettle配置Spark

(1)備份原始配置文件

(2)編輯spark-defaults.conf文件

(3)編輯spark-env.sh文件

(4)編輯core-site.xml文件

3. 提交Spark做業

(1)修改Kettle自帶的Spark例子

(2)保存行執行做業

7、小結


        本篇演示使用Kettle操做Hadoop上的數據。首先概要介紹Kettle對大數據的支持,而後用示例說明Kettle如何鏈接Hadoop,如何導入導出Hadoop集羣上的數據,如何用Kettle執行Hive的HiveQL語句,還會用一個典型的MapReduce轉換,說明Kettle在實際應用中是怎樣利用Hadoop分佈式計算框架的。本篇最後介紹如何在Kettle中提交Spark做業。

1、Hadoop相關的步驟與做業項

        在「ETL與Kettle」(https://wxy0327.blog.csdn.net/article/details/107985148)的小結中曾提到,Kettle具備完備的轉換步驟與做業項,使它可以支持幾乎全部常見數據源。一樣Kettle對大數據也提供了強大的支持,這體如今轉換步驟與做業項的「Big Data」分類中。本例使用的Kettle 8.3版本中所包含的大數據相關步驟有19個,做業項有10個。表3-1和表3-2分別對這些步驟和做業項進行了簡單描述。

步驟名稱

描述

Avro input

讀取Avro格式文件

Avro output

寫入Avro格式文件

Cassandra input

從一個Cassandra column family中讀取數據

Cassandra output

向一個Cassandra column family中寫入數據

CouchDB input

獲取CouchDB數據庫一個設計文檔中給定視圖所包含的全部文檔

HBase input

從HBase column family中讀取數據

HBase output

向HBase column family中寫入數據

HBase row decoder

對HBase的鍵/值對進行編碼

Hadoop file input

讀取存儲在Hadoop集羣中的文本型文件

Hadoop file output

向存儲在Hadoop集羣中的文本型文件中寫數據

MapReduce input

向MapReduce輸入鍵值對

MapReduce output

從MapReduce輸出鍵值對

MongoDB input

讀取MongoDB中一個指定數據庫集合的全部記錄

MongoDB output

將數據寫入MongoDB的集合中

ORC input

讀取ORC格式文件

ORC output

寫入ORC格式文件

Parquet input

讀取Parquet格式文件

Parquet output

寫入Parquet格式文件

SSTable output

做爲Cassandra SSTable寫入一個文件系統目錄

表3-1 Kettle轉換中的大數據相關步驟

做業項名稱

描述

Amazon EMR job executor

在Amazon EMR中執行MapReduce做業

Amazon Hive job executor

在Amazon EMR中執行Hive做業

Hadoop copy files

將本地文件上傳到HDFS,或者在HDFS上覆制文件

Hadoop job executor

在Hadoop節點上執行包含在JAR文件中的MapReduce做業

Oozie job executor

執行Oozie工做流

Pentaho MapReduce

在Hadoop中執行基於MapReduce的轉換

Pig script executor

在Hadoop集羣上執行Pig腳本

Spark submit

提交Spark做業

Sqoop export

使用Sqoop將HDFS上的數據導出到一個關係數據庫中

Sqoop import

使用Sqoop將一個關係數據庫中的數據導入到HDFS上

表3-2 Kettle做業中的大數據相關做業項

        Kettle的設計很獨特,它既能夠在Hadoop集羣外部執行,也能夠在Hadoop集羣內的節點上執行。在外部執行時,Kettle可以從HDFS、Hive和HBase抽取數據,或者向它們中裝載數據。在Hadoop集羣內部執行時,Kettle轉換能夠做爲Mapper或Reducer任務執行,並容許將Pentaho MapReduce做業項做爲MapReduce的可視化編程工具來使用。後面咱們會用示例演示這些功能。關於Hadoop及其組件的基本概念和功能特性不是本專題所討論的範疇,可參考其它資源。

2、鏈接Hadoop

        Kettle能夠與Hadoop協同工做。經過提交適當的參數,Kettle能夠鏈接Hadoop的HDFS、MapReduce、Zookeeper、Oozie、Sqoop和Spark服務。在數據庫鏈接類型中支持Hive和Impala。在本示例中配置Kettle鏈接HDFS、Hive和Impala。爲了給本專題後面實踐中建立的轉換或做業使用,咱們還將定義一個普通的mysql數據庫鏈接對象。

1. 鏈接Hadoop集羣

        要使Kettle鏈接Hadoop集羣,須要兩個操做:設置一個Active Shim;創建並測試鏈接。Shim是Pentaho開發的插件,功能有點相似於一個適配器,幫助用戶鏈接Hadoop。Pentaho按期發佈Shim,能夠從sourceforge網站下載與Kettle版本對應的Shim安裝包。使用Shim可以鏈接不一樣的Hadoop發行版本,如CDH、HDP、MapR、Amazon EMR等。當在Kettle中執行一個大數據的轉換或做業時,缺省會使用設置的Active Shim。初始安裝Kettle時,並無Active Shim,所以在嘗試鏈接Hadoop集羣前,首先要作的就是選擇一個Active Shim,選擇的同時也就激活了此Active Shim。設置好Active Shim後,再通過必定的配置,就能夠測試鏈接了。Kettle內建的工具能夠爲完成這些工做提供幫助。

(1)開始前準備

        在配置鏈接前,要確認Kettle具備訪問HDFS相關目錄的權限,訪問的目錄一般包括用戶主目錄以及工做須要的其它目錄。Hadoop管理員應該已經配置了容許Kettle所在主機對Hadoop集羣的訪問。除權限外,還須要確認如下信息:

        本例中已經安裝好4個節點的CDH 6.3.1集羣,IP地址及主機名以下:
172.16.1.124 manager
172.16.1.125 node1
172.16.1.126 node2
172.16.1.127 node3



        啓動的Hadoop服務如圖3-1所示,全部服務都使用缺省端口。關於CDH集羣的安裝與卸載,能夠參見個人博客「基於Hadoop生態圈的數據倉庫實踐 —— 環境搭建(二)」和「一鍵式徹底刪除CDH 6.3.1」。

圖3-1 Hadoop集羣服務

        爲了用主機名訪問Hadoop相關服務,在Kettle主機(172.16.1.101)的/etc/hosts文件中添加了Hadoop集羣四個節點的IP與主機名。

(2)配置步驟

1. 在Kettle中配置Hadoop客戶端文件
        在瀏覽器中登陸Cloudera Manager,選擇Hive服務,點擊「操做」->「下載客戶端配置」。在獲得的hive-clientconfig.zip壓縮包中包括了當前Hadoop客戶端的12個配置文件。將其中的core-site.xml、hdfs-site.xml、hive-site.xml、yarn-site.xml、mapred-site.xml 5個文件複製到Kettle根目錄下的plugins/pentaho-big-data-plugin/hadoop-configurations/cdh61/目錄下,覆蓋原來Kettle自帶的這些文件。

2. 選擇Active Shim
        在Spoon界面中,選擇主菜單「工具」 -> 「Hadoop Distribution...」,在對話框中選擇「Cloudera CDH 6.1.0」,如圖3-2所示,點擊OK按鈕肯定後重啓Spoon。

圖3-2 選擇Active Shim

3. 在Spoon中建立Hadoop clusters對象
        新建一個轉換,在工做區左側的樹的「主對象樹」標籤中,選擇 Hadoop clusters -> 右鍵New Cluster,對話框中輸入如圖3-3所示的屬性值。

圖3-3 Hadoop集羣鏈接配置

        上圖的Hadoop集羣配置窗口中的選項及定義說明以下:

  • Cluster Name:定義要鏈接的集羣名稱,這裏爲CDH631。
  • Hostname(HDFS段):Hadoop集羣中NameNode節點的主機名。因爲本例中的CDH配置了HDFS HA,這裏用HDFS NameNode服務名替代了主機名。
  • Port(HDFS段):Hadoop集羣中NameNode節點的端口號,HA不須要填寫。 
  • Username(HDFS段):HDFS的用戶名,經過宿主操做系統給出,不用填。
  • Password(HDFS段):HDFS的密碼,經過宿主操做系統給出,不用填。
  • Hostname(JobTracker段):Hadoop集羣中JobTracker節點的主機名。若是有獨立的JobTracker節點,在此輸入,不然使用HDFS的主機名。
  • Port(JobTracker段):Hadoop集羣中JobTracker節點的端口號,不能與HDFS的端口號相同。
  • Hostname(ZooKeeper段):Hadoop集羣中Zookeeper節點的主機名,只有在鏈接Zookeeper服務時才須要。
  • Port(ZooKeeper段):Hadoop集羣中Zookeeper節點的端口號,只有在鏈接Zookeeper服務時才須要。
  • URL(Oozie段):Oozie WebUI的地址,只有在鏈接Oozie服務時才須要。

        這是本例CDH的配置,你應該按本身的狀況進行相應修改。而後點擊「Test」按鈕,測試結果如圖3-4所示。正常狀況下此時除了一個Kafka鏈接失敗的警告外,其它都應該經過測試。Kafka鏈接失敗,緣由是沒有配置Kafka的Bootstrap servers。咱們在CDH中並無啓動Kafka服務,所以忽略此警告。

圖3-4 測試經過

        關閉「Hadoop Cluster Test」窗口後,點擊「Hadoop cluster」窗口的「肯定」按鈕,至此就創建了一個Kettle能夠鏈接的Hadoop集羣。

        若是是首次配置Kettle鏈接Hadoop,不免會出現這樣那樣的問題,Pentaho文檔中列出了配置過程當中的常見問題及其通用解決方法,如表3-3所示。但願這能對Kettle或Hadoop新手有所幫助。

症狀

一般緣由

通用解決方法

Shim和配置問題

No shim

  • 沒有選擇shim。
  • shim安裝位置錯誤。
  • plugin.properties 文件中沒有正確的shim名稱。
  • 檢查plugin.properties文件中active.hadoop.configuration參數的值是否與pentaho-big-data-plugin/hadoop-configurations下的目錄名相匹配。
  • 確認shim安裝在正確的位置(缺省安裝在Kettle安裝目錄的plugins/pentaho-big-data-plugin子目錄下)。
  • 參考Pentaho 「Set Up Pentaho to Connect to a Hadoop Cluster」文檔,確認shim插件的名稱和安裝目錄。

Shim doesn't load

  • 沒有安裝許可證。
  • Kettle版本不支持裝載的shim。
  • 若是選擇的是MapR shim,客戶端可能沒有正確安裝。
  • 配置文件改變致使錯誤。
  • 參考Pentaho 「required licenses are installed」文檔,驗證許可證安裝,而且確認許可證沒有過時。
  • 參考Pentaho 「Components Reference」文檔,驗證使用的Kettle版本所支持的shim。
  • 參考Pentaho 「Set Up Pentaho to Connect to an Apache Hadoop Cluster」文檔,檢查配置文件。
  • 若是鏈接的是MapR,檢查客戶端安裝,而後重啓Kettle後再測試鏈接。
  • 若是該錯誤持續發生,文件可能損壞,須要從Pentaho官網下載新的shim文件。

The file system's URL does not match the URL in the configuration file

*-site.xml文件配置錯誤

參考Pentaho 「Set Up Pentaho to Connect to an Apache Hadoop Cluster」文檔,檢查配置文件,主要是core-site.xml文件是否配置正確。

Sqoop Unsupported major.minor version Error

在pentaho6.0中,Hadoop集羣上的Java版本比Pentaho使用的Java版本舊。

  • 驗證JDK是否知足受支持組件列表中的要求。
  • 驗證Pentaho服務器上的JDK是否與Hadoop集羣上的JDK主版本相同。

鏈接問題

Hostname does not resolve

  • 沒有指定主機名。
  • 主機名/IP地址錯誤。
  • 主機名沒有正確解析。
  • 驗證主機名/IP地址是否正確。
  • 檢查DNS或hosts文件,確認主機名解析正確。

Port name is incorrect

  • 沒有指定端口號。
  • 端口號錯誤。
  • 驗證端口號是否正確。
  • 確認Hadoop集羣是否啓用了HA,若是是,則不須要指定端口號。

Can't connect

  • 被防火牆阻止。
  • 其它網絡問題。
  • 檢查防火牆配置,並確認沒有其它網絡問題。

目錄訪問或權限問題

Can't access directory

  • 認證或權限問題。
  • 目錄不在集羣上。
  • 確認鏈接使用的用戶對被訪問的目錄有讀、寫、或執行權限。
  • 檢查集羣的安全設置(如dfs.permissions等)是否容許shim訪問。
  • 驗證HDFS的主機名和端口號是否正確。

Can't create, read, update, or delete files or directories

認證或權限問題。

  • 確認用戶已經被授予目錄的執行權限
  • 檢查集羣的安全設置(如dfs.permissions等)是否容許shim訪問。
  • 驗證HDFS的主機名和端口號是否正確。

Test file cannot be overwritten

Pentaho測試文件已在目錄中。

測試已運行,但未刪除測試文件。須要手動刪除測試文件。檢查Kettle根目錄下logs目錄下的spoon.log文件中記錄的測試文件名。測試文件用於驗證用戶能夠在其主目錄中建立、寫入和刪除。

表3-3 Kettle鏈接Hadoop時的常見問題

2. 鏈接Hive

        Kettle把Hive看成一個數據庫,支持鏈接Hive Server和Hive Server 2/3,數據庫鏈接類型的名字分別爲Hadoop Hive和Hadoop Hive 2/3。這裏演示在Kettle中創建一個Hadoop Hive 2/3類型的數據庫鏈接。

        Hive Server有兩個明顯的問題,一是不夠穩定,常常會莫名奇妙假死,致使客戶端全部的鏈接都被掛起。二是併發性支持很差,若是一個用戶在鏈接中設置了一些環境變量,綁定到一個Thrift工做線程,當該用戶斷開鏈接,另外一個用戶建立了一個鏈接,他有可能也被分配到以前的線程,複用以前的配置。這是由於Thrift不支持檢測客戶端是否斷開鏈接,也就沒法清除會話的狀態信息。Hive Server 2的穩定性更高,而且已經完美支持了會話。從長遠來看都會以Hive Server 2做爲首選。

        在工做區左側的「主對象樹」標籤中,選擇 「DB鏈接」 -> 右鍵「新建」,對話框中輸入如圖3-5所示的屬性值。

圖3-5 Hive鏈接配置

        上圖的數據庫鏈接配置窗口中的選項及定義說明以下:

  • Connection Name:定義鏈接名稱,這裏爲hive_cdh631。
  • Connection Type:鏈接類型選擇Hadoop Hive 2/3。
  • Host Name:輸入HiveServer2對應的主機名。在Cloudera Manager中,從Hive服務的「實例」標籤中能夠找到。
  • Datebase Name:這裏輸入的rds是Hive裏已經存在的一個數據庫名稱。
  • Port Number:端口號輸入hive.server2.thrift.port參數的值。
  • User Name:用戶名,這裏爲空。
  • Password:密碼,這裏爲空。

        點擊「測試」,應該彈出成功鏈接窗口,顯示內容以下:

正確鏈接到數據庫[hive_cdh631] 
主機名         : node2
端口           : 10000
數據庫名       :rds

        爲了讓其它轉換或做業可以使用此數據庫鏈接對象,須要將它設置爲共享。選擇 「DB鏈接」 -> hive_cdh631 -> 右鍵「共享」,而後保存轉換。

3. 鏈接Impala

        Impala是一個運行在Hadoop之上的大規模並行處理(Massively Parallel Processing,MPP)查詢引擎,提供對Hadoop集羣數據的高性能、低延遲的SQL查詢,使用HDFS做爲底層存儲。對查詢的快速響應使交互式查詢和對分析查詢的調優成爲可能,而這些在針對處理長時間批處理做業的SQL-on-Hadoop傳統技術上是難以完成的。Impala是Cloudera公司基於Google Dremel的開源實現。Cloudera公司宣稱除Impala外的其它組件都將移植到Spark框架,並堅信Impala是大數據上SQL解決方案的將來,可見其對Impala的重視程度。

        經過將Impala與Hive元數據存儲數據庫相結合,可以在Impala與Hive這兩個組件之間共享數據庫表。而且Impala與HiveQL的語法兼容,所以既可使用Impala也可使用Hive進行創建表、發佈查詢、裝載數據等操做。Impala能夠在已經存在的Hive表上執行交互式實時查詢。

        建立Impala鏈接的過程與Hive相似。在工做區左側的「主對象樹」標籤中,選擇「DB鏈接」 -> 右鍵「新建」,對話框中輸入如圖3-6所示的屬性值。

圖3-6 Impala鏈接配置

        上圖的數據庫鏈接配置窗口中的選項及定義說明以下:

  • Connection Name:定義鏈接名稱,這裏爲impala_cdh631。
  • Connection Type:鏈接類型選擇Impala。
  • Host Name:輸入任一Impala Daemon對應的主機名。在Cloudera Manager中,從Impala服務的「實例」標籤中能夠找到。
  • Datebase Name:這裏輸入的rds是Hive裏已經存在的一個數據庫名稱。
  • Port Number:端口號輸入Impala Daemon HiveServer2端口參數的值。
  • User Name:用戶名,這裏爲空。
  • Password:密碼,這裏爲空。

        點擊「測試」,應該彈出成功鏈接窗口,顯示內容以下:

正確鏈接到數據庫[impala_cdh631] 
主機名         : node3
端口           : 21050
數據庫名       :rds

        同hive_cdh631同樣,將impala_cdh631數據庫鏈接共享,而後保存轉換。

4. 創建MySQL數據庫鏈接

        Kettle中建立數據庫鏈接的方法都相似,區別只是在「鏈接類型」中選擇不一樣的數據庫,而後輸入相關的屬性,「鏈接方式」一般選擇Native(JDBC)。例如MySQL鏈接配置如圖3-7所示。

圖3-7 MySQL鏈接配置

        這裏的鏈接名稱爲mysql_node3。配置MySQL數據庫鏈接須要注意的一點是,須要事先將對應版本的MySQL JDBC驅動程序拷貝到Kettle根目錄的lib目錄下,不然在測試鏈接時可能出現以下錯誤:

org.pentaho.di.core.exception.KettleDatabaseException: 
Error occurred while trying to connect to the database

Driver class 'org.gjt.mm.mysql.Driver' could not be found, make sure the 'MySQL' driver (jar file) is installed.
org.gjt.mm.mysql.Driver

        本例中鏈接的MySQL服務器版本爲5.6.14,所以使用下面的命令拷貝JDBC驅動,而後重啓Spoon以從新加載全部驅動。

cp mysql-connector-java-5.1.38-bin.jar /root/pdi-ce-8.3.0.0-371/lib/

        至此成功建立了一個Hadoop集羣對象CDH631,,以及三個數據庫鏈接對象hive_cdh63一、impala_cdh631和mysql_node3。

3、導入導出Hadoop集羣數據

        本節用四個示例演示如何使用Kettle導出導入Hadoop數據。這四個示例是:向HDFS導入數據;向Hive導入數據;從HDFS抽取數據到MySQL;從Hive抽取數據到MySQL。

1. 向HDFS導入數據

        用Kettle將本地文件導入HDFS很是簡單,只須要一個「Hadoop copy files」做業項就能夠實現。它執行的效果同 hdfs dfs -put 命令是相同的。從下面的地址下載Pentaho提供的web日誌示例文件,將解壓縮後的weblogs_rebuild.txt文件放到Kettle所在主機的本地目錄下。

http://wiki.pentaho.com/download/attachments/23530622/weblogs_rebuild.txt.zip?version=1&modificationDate=1327069200000

        在Spoon中新建一個只包含「Start」和「Hadoop copy files」兩個做業項的做業,如圖3-8所示。

圖3-8 向HDFS導入數據的做業

 

        雙擊「Hadoop Copy Files」做業項,編輯屬性以下:

  • Source Environment:選擇「Local」。
  • 源文件/目錄:選擇本地文件,本例爲「file:///root/kettle_hadoop/3/weblogs_rebuild.txt」
  • 通配符:空。
  • Destination Environment:選擇「CDH631」,這是咱們以前已經創建好的Hadoop Clusters對象。
  • Destination File/Folder:選擇HDFS上的目錄,本例爲/user/root。

        保存併成功執行做業後,查看HDFS目錄,結果以下。能夠看到,weblogs_rebuild.txt文件已從本地導入HDFS的/user/root目錄中。每次執行做業會覆蓋HDFS中已存在的同名文件。

[hdfs@manager~]$hdfs dfs -ls /user/root
Found 1 items
-rw-r--r--   3 root supergroup   77908174 2020-08-28 08:53 /user/root/weblogs_rebuild.txt
[hdfs@manager~]$

2. 向Hive導入數據

        Hive缺省是不能進行行級插入的,也就是說缺省時不能使用insert into ... values這種SQL語句向Hive插入數據。一般Hive表數據導入方式有如下兩種:

  • 從本地文件系統中導入數據到Hive表,使用的語句是:
    load data local inpath 目錄或文件 into table 表名;

     

  • 從HDFS上導入數據到Hive表,使用的語句是:
    load data inpath 目錄或文件 into table 表名;

     

        再有數據一旦導入Hive表,缺省是不能進行更新和刪除的,只能向表中追加數據或者用新數據總體覆蓋原來的數據。要刪除表數據只能執行truncate或者drop table操做,這其實是刪除了表所對應的HDFS上的數據文件或目錄。

        Kettle做業中的「Hadoop Copy Files」做業項能夠將本地文件上傳至HDFS,所以只要將前面的做業稍加修改,將Destination File/Folder選擇爲hive表所在的HDFS目錄便可,做業執行的效果與load data local inpath語句相同。

        首先從下面的地址下載Pentaho提供的格式化後的web日誌示例文件,將解壓縮後的weblogs_parse.txt文件放到Kettle所在主機的本地目錄下。

http://wiki.pentaho.com/download/attachments/23530622/weblogs_parse.txt.zip?version=1&modificationDate=1327068013000

        而後執行下面的HiveQL創建一個Hive表,表結構與weblogs_parse.txt文件的結構相匹配。

create table test.weblogs (
client_ip         string,
full_request_date string,
day               string,
month             string,
month_num         int,
year              string,
hour              string,
minute            string,
second            string,
timezone          string,
http_verb         string,
uri               string,
http_status_code  string,
bytes_returned    string,
referrer          string,
user_agent        string)
row format delimited fields terminated by '\t';

        建立和前例相同的做業,只是修改如下兩個做業項屬性:

  • 源文件/目錄:file:///root/kettle_hadoop/3/weblogs_parse.txt
  • Destination File/Folder:/user/hive/warehouse/test.db/weblogs

        保存併成功執行做業後,查詢test.weblogs表的記錄與weblogs_parse.txt文件內容相同。

3. 從HDFS抽取數據到MySQL

        從下面的地址下載文件
http://wiki.pentaho.com/download/attachments/23530622/weblogs_aggregate.txt.zip?version=1&modificationDate=1327067858000

        這是Pentaho提供的一個壓縮文件,其中包含一個名爲weblogs_aggregate.txt的文本文件,文件中有36616行記錄,每行記錄有4列,分別表示IP地址、年份、月份、訪問頁面數,前5行記錄以下。咱們使用這個文件做爲最初的原始數據。

0.308.86.81    2012    07    1
0.32.48.676    2012    01    3
0.32.85.668    2012    07    8
0.45.305.7    2012    01    1
0.45.305.7    2012    02    1

        用下面的命令把解壓縮後的weblogs_aggregate.txt文件上傳到HDFS的/user/root目錄下。

hdfs dfs -put weblogs_aggregate.txt /user/root/

        在Spoon中新建一個如圖3-9的轉換。轉換中只包含「Hadoop File Input」和「表輸出」 兩個步驟。

圖3-9 從HDFS抽取數據到MySQL的轉換

        編輯「Hadoop File Input」步驟屬性以下:
(1)「文件」標籤

  • Environment:選擇「CDH631」。
  • File/Folder:選擇「/user/root/weblogs_aggregate.txt」

(2)「內容」標籤

  • 文件類型:CVS
  • 分隔符:刪除分號,點擊「Insert TAB」按鈕插入TAB分隔符。
  • 頭部:勾掉。
  • 格式:選擇「Unix」。
  • 本地日期格式:選擇「en_US」

(3)「字段」標籤
        輸入如表3-4所示。

名稱

類型

格式

長度

去除空字符串方式

重複

client_ip

String

 

20

不去掉空格

year

Integer

#

15

不去掉空格

month_num

Integer

#

15

不去掉空格

pageviews

Integer

#

15

不去掉空格

表3-4 weblogs_aggregate.txt對應的字段

        編輯「表輸出」步驟屬性以下:

  • 數據庫鏈接:選擇「mysql_node3」。
  • 目標表:輸入「aggregate_hdfs」。
  • 剪裁表:勾選。

        mysql_node3是鏈接Hadoop時已經建好的一個MySQL數據庫鏈接。「主選項」和「數據庫字段」標籤下的屬性都不須要設置,「表字段」和「流字段」會自動映射。

        下面執行SQL創建mysql的表:

use test;
create table aggregate_hdfs (
    client_ip varchar(15),
    year smallint,
    month_num tinyint,
    pageviews bigint
);

        保存並執行轉換,而後查詢aggregate_hdfs表,結果以下:

mysql> select count(*) from test.aggregate_hdfs;
+----------+
| count(*) |
+----------+
|    36616 |
+----------+
1 row in set (0.03 sec)

mysql> select * from test.aggregate_hdfs limit 5;
+-------------+------+-----------+-----------+
| client_ip   | year | month_num | pageviews |
+-------------+------+-----------+-----------+
| 0.308.86.81 | 2012 |         7 |         1 |
| 0.32.48.676 | 2012 |         1 |         3 |
| 0.32.85.668 | 2012 |         7 |         8 |
| 0.45.305.7  | 2012 |         1 |         1 |
| 0.45.305.7  | 2012 |         2 |         1 |
+-------------+------+-----------+-----------+
5 rows in set (0.00 sec)

4. 從Hive抽取數據到MySQL

        在Spoon中新建一個如圖3-10的轉換。轉換中只包含「表輸入」和「表輸出」 兩個步驟。

圖3-10 從Hive抽取數據到MySQL的轉換

        編輯「表輸入」步驟屬性以下:

  • 數據庫鏈接:選擇「hive_cdh631」。
  • SQL:輸入下面的SQL語句:
    select client_ip, year, month, month_num, count(*) as pageviews
      from test.weblogs
     group by client_ip, year, month, month_num

     

        hive_cdh631是鏈接Hadoop時已經建好的一個Hive數據庫鏈接。

        編輯「表輸出」步驟屬性以下:

  • 數據庫鏈接:選擇「mysql_node3」。
  • 目標表:輸入「aggregate_hive」。
  • 剪裁表:勾選。

        下面執行SQL創建mysql的表:

use test;
create table aggregate_hive (
    client_ip varchar(15),
    year varchar(4),
    month varchar(10),
    month_num tinyint,
    pageviews bigint
);

        保存並執行轉換,而後查詢aggregate_hive表,結果以下:

mysql> select count(*) from test.aggregate_hive;
+----------+
| count(*) |
+----------+
|    36616 |
+----------+
1 row in set (0.03 sec)

mysql> select * from test.aggregate_hive limit 5;
+---------------+------+-------+-----------+-----------+
| client_ip     | year | month | month_num | pageviews |
+---------------+------+-------+-----------+-----------+
| 0.45.305.7    | 2012 | Feb   |         2 |         1 |
| 0.48.322.75   | 2012 | Jul   |         7 |         1 |
| 0.638.50.46   | 2011 | Dec   |        12 |         8 |
| 01.660.68.623 | 2012 | Jun   |         6 |         1 |
| 01.660.70.74  | 2012 | Jul   |         7 |         1 |
+---------------+------+-------+-----------+-----------+
5 rows in set (0.00 sec)

4、執行HiveQL語句

        在這個示例中演示如何用Kettle執行Hive的HiveQL語句。咱們在「向Hive導入數據」一節創建的weblogs表上執行聚合查詢,同時創建一個新表保存查詢結果。新建一個Kettle做業,只有「START」和「SQL」兩個做業項,如圖3-11所示。
 

圖3-11 執行Hive HiveQL語句的做業

        編輯「SQL」做業項屬性以下:

  • 數據庫鏈接:選擇「hive_cdh631」。
  • SQL腳本:
    create table test.weblogs_agg
    as
    select client_ip, year, month, month_num, count(*)
      from test.weblogs
     group by client_ip, year, month, month_num;

     

        保存併成功執行做業後檢查hive表,結果以下:

hive> select count(*) from test.weblogs_agg;
...
36616

        能夠看到weblogs_agg表中已經保存了所有的聚合數據。

5、執行MapReduce

1. 生成聚合數據集

        「執行HiveQL語句」示例只用一句HiveQL就生成了聚合數據,本示例使用「Pentaho MapReduce」做業項完成類似的功能,把細節數據彙總成聚合數據集。當給一個關係型數據倉庫或數據集市準備待抽取的數據時,這是一個常見的使用場景。咱們把weblogs_parse.txt文件做爲細節數據,目標是生成聚合數據文件,其中包含按IP和年月分組統計的PV數。

(1)準備文件與目錄

# 建立格式化文件所在目錄
hdfs dfs -mkdir /user/root/parse/
# 上傳格式化文件
hdfs dfs -put -f weblogs_parse.txt /user/root/parse/
# 修改讀寫權限
hdfs dfs -chmod -R 777 /user/root/

(2)創建一個用於Mapper的轉換

圖3-12 生成聚合數據Mapper轉換

        如圖3-12所示的轉換由「MapReduce Input」、「拆分字段」、「利用Janino計算Java表達式」、「MapReduce Output」四個步驟組成。

        編輯「MapReduce Input」步驟以下:

  • Key field:「Type」選擇「String」,定義 Hadoop MapReduce 鍵的數據類型。
  • Value field:「Type」選擇「String」,定義 Hadoop MapReduce 值的數據類型。

        該步驟輸出兩個字段,名稱是固定的key和value,也就是Map階段輸入的鍵值對。

        編輯「拆分字段」步驟以下:

  • 須要拆分的字段:選擇「value」。
  • 分隔符:輸入「$[09]」,以TAB做爲分隔符。
  • 字段:新的字段名以下,類型均爲String。
    client_ip
    full_request_date
    day
    month
    month_num
    year
    hour
    minute
    second
    timezone
    http_verb
    uri
    http_status_code
    bytes_returned
    referrer
    user_agent

     

        該步驟將輸入的value字段拆分紅16個字段,輸出17個字段(key字段沒變,文本文件每行的key是文件起始位置到每行的字節偏移量)。

        編輯「利用Janino計算Java表達式」步驟如表3-5所示。

New field

Java expression

Value type

new_key

client_ip + '\t' + year + '\t' + month_num

String

new_value

1

Integer

表3-5 聚合數據轉換中的「利用Janino計算Java表達式」步驟

        該步驟爲數據流中增長兩個新的字段,名稱分別定義爲new_key和new_value。new_key字段的值定義爲client_ip + '\t' + year + '\t' + month_num,將IP地址、年份、月份和字段間的兩個TAB符拼接成一個字符串。new_value字段的值爲1,數據類型是整數。該步驟輸出19個字段。

        編輯「MapReduce Output」步驟以下:

  • Key field:選擇「new_key」。
  • Value field:選擇「new_value」。

該步驟輸出「new_key」和「new_value」兩個字段,即Map階段輸出的鍵值對。

將轉換保存爲aggregate_mapper.ktr。

(3)創建一個用於Reducer的轉換

圖3-13 生成聚合數據Reducer轉換

如圖3-13所示的轉換由「MapReduce Input」、「分組」、「MapReduce Output」三個步驟組成。

編輯「MapReduce Input」步驟以下:
. Key field:「Type」選擇「String」。
. Value field:「Type」選擇「Integer」。

        該步驟輸出兩個字段,名稱是固定的key和value,key對應Mapper轉換的new_key輸出字段,value對應Mapper轉換的new_value輸出字段。

        編輯「分組」步驟以下:

  • 構成分組的字段:選擇「key」。
  • 聚合:名稱、Subject、類型三列的值分別是new_value、value、求和。

        該步驟按key字段分組(key字段的值就是client_ip + '\t' + year + '\t' + month_num),對每一個分組的value求和,每組的合計值定義爲一個新的字段new_value。注意,此處的new_value和Mapper轉換輸出的new_value字段含義是不一樣的。Mapper轉換輸出的new_value字段對應這裏的Subject字段值。

        編輯「MapReduce Output」步驟以下:

  • Key field:選擇「key」。
  • Value field:選擇「new_value」。

        輸出Reducer處理後的鍵值對,這就是咱們想要的結果。

        將轉換保存爲aggregate_reducer.ktr。

(4)創建一個調用MapReduce步驟的做業

圖3-14 聚合數據Pentaho MapReduce做業

        如圖3-14所示的做業使用mapper和reducer轉換。須要編輯Pentaho MapReduce做業項的Mapper、Reducer、job Setup、Cluster四個標籤頁,每一個標籤頁上的選項及定義。

        Mapper標籤:

  • Transformation:選擇第(1)步創建的Mapper轉換,這裏爲「/root/kettle_hadoop/3/aggregate_mapper.ktr」。
  • Input step name:輸入「MapReduce Input」。這是接收mapping數據的步驟名,必須是一個MapReduce Input步驟的名稱。
  • Output step name:輸入「MapReduce Output」。這是mapping輸出步驟名,必須是一個MapReduce Output步驟的名稱。

        Reducer標籤:

  • Transformation:選擇第(2)步創建的Reducer轉換,這裏爲「/root/kettle_hadoop/3/aggregate_mapper.ktr」。
  • Input step name:輸入「MapReduce Input」。這是接收reducing數據的步驟名,必須是一個MapReduce Input步驟的名稱。
  • Output step name:輸入「MapReduce Output」。這是reducing輸出步驟名,必須是一個MapReduce Output步驟的名稱。

        Job Setup標籤:

  • Input path:輸入「/user/root/parse/」。一個以逗號分隔的HDFS目錄列表,目錄中存儲的是MapReduce要處理的源數據文件。
  • Output path:輸入「/user/root/aggregate_mr」。存儲MapReduce做業輸出數據的HDFS目錄。
  • Remove output path before job:勾選。執行做業時先刪除輸出目錄。
  • Input format:輸入「org.apache.hadoop.mapred.TextInputFormat」,爲輸入格式的類名。
  • Output format:輸入「org.apache.hadoop.mapred.TextOutputFormat」,爲輸出格式的類名。

        Cluster標籤:

  • Hadoop job name:輸入「aggregate」。
  • Hadoop cluster:選擇「CDH631」,爲一個已經定義的Hadoop集羣。
  • Number of mapper tasks:1。分配的mapper任務數,由輸入的數據量所決定。典型的值在10-100之間。非CPU密集型的任務能夠指定更高的值。
  • Number of reduce tasks:1。分配的reducer任務數。通常來講,該值設置的越小,reduce操做啓動的越快,設置的越大,reduce操做完成的更快。加大該值會增長Hadoop框架的開銷,但可以使負載更加均衡。若是設置爲0,則不執行reduce操做,mapper的輸出將做爲整個MapReduce做業的輸出。
  • Logging interval:60。日誌消息間隔的秒數。
  • Enable blocking:勾選。若是選中,做業將等待每個做業項完成後再繼續下一個做業項,這是Kettle感知Hadoop做業狀態的惟一方式。若是不選,MapReduce做業會本身執行,而Kettle在提交MapReduce做業後當即會執行下一個做業項。除非選中該項,不然Kettle的錯誤處理在這裏將沒法工做。

        將做業保存爲aggregate_mr.kjb。

(5)執行做業並驗證輸出

[hdfs@node3~]$hdfs dfs -ls /user/root/aggregate_mr/
Found 2 items
-rw-r--r--   3 root supergroup          0 2020-08-31 13:46 /user/root/aggregate_mr/_SUCCESS
-rw-r--r--   3 root supergroup     890709 2020-08-31 13:46 /user/root/aggregate_mr/part-00000
[hdfs@node3~]$hdfs dfs -cat /user/root/aggregate_mr/part-00000 | head -10
0.308.86.81    2012    07    1
0.32.48.676    2012    01    3
0.32.85.668    2012    07    8
0.45.305.7    2012    01    1
0.45.305.7    2012    02    1
0.46.386.626    2011    11    1
0.48.322.75    2012    07    1
0.638.50.46    2011    12    8
0.87.36.333    2012    08    7
01.660.68.623    2012    06    1
cat: Unable to write to output stream.
[hdfs@node3~]$

        能夠看到,/user/root/aggregate_mr/目錄下生成了名爲part-00000輸出文件,文件中包含按IP和年月分組的PV數。

2. 格式化原始web日誌

        本示例說明如何使用Pentaho MapReduce把原始web日誌解析成格式化的記錄。

(1)準備文件與目錄

# 建立原始文件所在目錄
hdfs dfs -mkdir /user/root/raw
# 修改讀寫權限
hdfs dfs -chmod -R 777 /user/root/

        而後用Hadoop copy files做業項將weblogs_rebuild.txt文件放到HDFS的/user/root/raw目錄下,具體操做參見前面「向HDFS導入數據」。

(2)創建一個用於Mapper的轉換

圖3-15 格式化文件Mapper轉換

        編輯「MapReduce Input」步驟以下:

  • Key field:「Type」選擇「String」。
  • Value field:「Type」選擇「String」。

        編輯「正則表達式」步驟以下:

  • 要匹配的字段:輸入「value」。
  • Result field name:輸入「is_match」
  • 爲每一個捕獲組(capture group)建立一個字段:勾選。
  • Replace previous fields:勾選。
  • 正則表達式:
    ^([^\s]{7,15})\s            # client_ip
    -\s                         # unused IDENT field
    -\s                         # unused USER field
    \[((\d{2})/(\w{3})/(\d{4})  # request date dd/MMM/yyyy
    :(\d{2}):(\d{2}):(\d{2})\s([-+ ]\d{4}))\]
                                # request time :HH:mm:ss -0800
    \s"(GET|POST)\s             # HTTP verb
    ([^\s]*)                     # HTTP URI
    \sHTTP/1\.[01]"\s           # HTTP version
     
    (\d{3})\s                   # HTTP status code
    (\d+)\s                     # bytes returned
    "([^"]+)"\s                 # referrer field
     
    "                           # User agent parsing, always quoted.
    "?                          # Sometimes if the user spoofs the user_agent, they incorrectly quote it.
    (                           # The UA string
      [^"]*?                    # Uninteresting bits
      (?:
        (?:
         rv:                    # Beginning of the gecko engine version token
         (?=[^;)]{3,15}[;)])    # ensure version string size
         (                      # Whole gecko version
           (\d{1,2})                   # version_component_major
           \.(\d{1,2}[^.;)]{0,8})      # version_component_minor
           (?:\.(\d{1,2}[^.;)]{0,8}))? # version_component_a
           (?:\.(\d{1,2}[^.;)]{0,8}))? # version_component_b
         )
         [^"]*                  # More uninteresting bits
        )
       |
        [^"]*                   # More uninteresting bits
      )
    )                           # End of UA string
    "?
    "

     

  •  捕獲組(Capture Group)字段:以下所示,全部字段都是String類型。
    client_ip
    full_request_date
    day
    month
    year
    hour
    minute
    second
    timezone
    http_verb
    uri
    http_status_code
    bytes_returned
    referrer
    user_agent
    firefox_gecko_version
    firefox_gecko_version_major
    firefox_gecko_version_minor
    firefox_gecko_version_a
    firefox_gecko_version_b

     

        編輯「過濾記錄」步驟以下:

  • 發送true數據給步驟:選擇「值映射」。
  • 發送false數據給步驟:選擇「空操做(什麼也不作)」
  • 條件:選擇「is_match = Y」

        編輯「值映射」步驟以下:

  • 使用的字段名:選擇「month」。
  • 目標字段名(空=覆蓋):輸入「month_num」。
  • 不匹配時的默認值:輸入「00」。
  • 字段值:源值與目標值輸入以下。
    Jan 01
    Feb 02
    Mar 03
    Apr 04
    May 05
    Jun 06
    Jul 07
    Aug 08
    Sep 09
    Oct 10
    Nov 11
    Dec 12

     

        編輯「利用Janino計算Java表達式」步驟以下:

  • New field:輸入「output_value」。
  • Java expression:輸入以下。
    client_ip + '\t' + full_request_date + '\t' + day + '\t' + month + '\t' + month_num + '\t' + year + '\t' + hour + '\t' + minute + '\t' + second + '\t' + timezone + '\t' + http_verb + '\t' + uri + '\t' + http_status_code + '\t' + bytes_returned + '\t' + referrer + '\t' + user_agent

     

  • Value type:選擇「String」。

        編輯「MapReduce Output」步驟以下:

  • Key field:選擇「key」。
  • Value field:選擇「output_value」。

        將轉換保存爲weblog_parse_mapper.ktr。

(3)創建一個調用MapReduce步驟的做業

圖3-16 格式化文件Pentaho MapReduce做業

        編輯「Pentaho MapReduce」做業項以下。
        Mapper標籤:

  • Transformation:選擇上一步創建的轉換,這裏爲「/root/kettle_hadoop/3/weblogs_parse_mapper.ktr」。
  • Input step name:輸入「MapReduce Input」。
  • Output step name:輸入「MapReduce Output」。

        Job Setup標籤:

  • Input path:輸入「/user/root/raw」。
  • Output path:輸入「/user/root/parse1」。
  • Remove output path before job:勾選。
  • Input format:輸入「org.apache.hadoop.mapred.TextInputFormat」。
  • Output format:輸入「org.apache.hadoop.mapred.TextOutputFormat」。

        Cluster標籤:

  • Hadoop job name:輸入「Web Log Parse」。
  • Hadoop cluster:選擇「CDH631」。
  • Number of mapper tasks:2
  • Number of reduce tasks:0
  • Logging interval:60
  • Enable blocking:勾選。

        將做業保存爲weblogs_parse_mr.kjb。

(4)執行做業並驗證輸出

        做業成功執行後檢查HDFS的輸出文件,結果以下。

[hdfs@node3~]$hdfs dfs -ls /user/root/parse1
Found 3 items
-rw-r--r--   3 root supergroup          0 2020-08-31 10:59 /user/root/parse1/_SUCCESS
-rw-r--r--   3 root supergroup   42601640 2020-08-31 10:59 /user/root/parse1/part-00000
-rw-r--r--   3 root supergroup   42810160 2020-08-31 10:59 /user/root/parse1/part-00001
[hdfs@node3~]$hdfs dfs -get /user/root/parse1/part-00000
[hdfs@node3~]$head -5 part-00000 
0    323.81.303.680    25/Oct/2011:01:41:00 -0500    25    Oct    10    2011    01    41    00    -0500    GET    /download/download6.zip    200    0    -    Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.9.0.19) Gecko/2010031422 Firefox/3.0.19
193    668.667.44.3    25/Oct/2011:07:38:30 -0500    25    Oct    10    2011    07    38    30    -0500    GET    /download/download3.zip    200    0    -    Mozilla/5.0 (X11; U; Linux i686; en-US; rv:1.8.0.12) Gecko/20070719 CentOS/1.5.0.12-3.el5.centos Firefox/1.5.0.12
405    13.386.648.380    25/Oct/2011:17:06:00 -0500    25    Oct    10    2011    17    06    00    -0500    GET    /download/download6.zip    200    0    -    Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4.0; GTB6.3; .NET CLR 1.1.4322; .NET CLR 2.0.50727; .NET CLR 3.0.04506.30; InfoPath.2)
651    06.670.03.40    26/Oct/2011:13:24:00 -0500    26    Oct    10    2011    13    24    00    -0500    GET    /product/demos/product2    200    0    -    Mozilla/5.0 (Windows; U; Windows NT 6.1; en-US; rv:1.9.2.3) Gecko/20100401 Firefox/3.6.3
838    18.656.618.46    26/Oct/2011:17:15:30 -0500    26    Oct    10    2011    17    15    30    -0500    GET    /download/download4.zip    200    0    -    Mozilla/5.0 (Macintosh; U; Intel Mac OS X 10_6_3; en-us) AppleWebKit/531.22.7 (KHTML, like Gecko) Version/4.0.5 Safari/531.22.7
[hdfs@node3~]$

        能夠看到,/user/root/parse1目錄下生成了名爲part-00000和part-00001的兩個輸出文件(由於使用了兩個mapper),內容已經被格式化。

6、提交Spark做業

        Kettle不但支持MapReduce做業,還能夠經過「Spark Submit」做業項,向CDH 5.3以上、HDP 2.3以上、Amazon EMR 3.10以上的Hadoop平臺提交Spark做業。在本示例中,咱們先爲Kettle配置Spark,而後修改並執行Kettle安裝包中自帶的Spark PI做業例子,說明如何在Kettle中提交Spark做業。

1. 在Kettle主機上安裝Spark客戶端

        使用Kettle執行Spark做業,須要在Kettle主機安裝Spark客戶端。只要將CDH中Spark的庫文件複製到Kettle所在主機便可。

-- 在172.16.1.127上執行
cd /opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib
scp -r spark 172.16.1.101:/root/

2. 爲Kettle配置Spark

        如下操做均在172.16.1.101以root用戶執行。

(1)備份原始配置文件

cd /root/spark/conf/
cp spark-defaults.conf spark-defaults.conf.bak
cp spark-env.sh spark-env.sh.bak

(2)編輯spark-defaults.conf文件

vim /root/spark/conf/spark-defaults.conf

        內容以下:

# 使用spark.yarn.archive減小任務啓動時間
spark.yarn.archive=hdfs://nameservice1/user/spark/lib/spark_jars.zip
# 解決和yarn相關Jersey包衝突,避免spark on yarn啓動spark-submit時出現java.lang.NoClassDefFoundError錯誤
spark.hadoop.yarn.timeline-service.enabled=false
# 記錄Spark事件,用於應用程序在完成後重構WebUI
spark.eventLog.enabled=true
# 記錄Spark事件的目錄
spark.eventLog.dir=hdfs://nameservice1/user/spark/applicationHistory
# spark on yarn的history server地址
spark.yarn.historyServer.address=http://node3:18088

(3)編輯spark-env.sh文件

vim /root/spark/conf/spark-env.sh

        內容以下:

#!/usr/bin/env bash

# hadoop配置文件所在目錄 
HADOOP_CONF_DIR=/root/pdi-ce-8.3.0.0-371/plugins/pentaho-big-data-plugin/hadoop-configurations/cdh61
# spark主目錄
SPARK_HOME=/root/spark

(4)編輯core-site.xml文件

vim /root/pdi-ce-8.3.0.0-371/plugins/pentaho-big-data-plugin/hadoop-configurations/cdh61/core-site.xml

        去掉下面這段的註釋:

<property>
  <name>net.topology.script.file.name</name>
  <value>/etc/hadoop/conf.cloudera.yarn/topology.py</value>
</property>

3. 提交Spark做業

(1)修改Kettle自帶的Spark例子

cp /root/pdi-ce-8.3.0.0-371/samples/jobs/Spark\ Submit/Spark\ submit.kjb /root/kettle_hadoop/3/spark_submit.kjb

        在Spoon中打開/root/kettle_hadoop/spark_submit.kjb文件,如圖3-17所示。

圖3-17 Kettle自帶的Spark例子

        編輯Spark PI做業項以下:

  • Spark Submit Utility:選擇Spark提交程序,本例爲「/root/spark/bin/spark-submit」。
  • Master URL:由於yarn運行在CDH集羣,而不是Kettle主機上,因此這裏選擇「yarn-cluster」。
  • Files標籤的Application Jar:選擇「/root/spark/examples/jars/spark-examples_2.11-2.4.0-cdh6.3.1.jar」。

(2)保存行執行做業

        Spark History Server Web UI如圖3-18所示。

圖3-18 Spark UI看到提交的Spark做業

7、小結

        本篇以Kettle 8.3和CDH 6.3.1爲例,介紹Kettle對Hadoop的支持。經過提交適當的參數,Kettle能夠鏈接Hadoop的HDFS、MapReduce、Zookeeper、Oozie和Spark服務。Kettle的數據庫鏈接類型中支持Hive、Hive 2/3和Impala。可使用Kettle導出導入Hadoop集羣中(HDFS、Hive等)的數據,執行Hive的HiveQL語句。Kettle支持在Hadoop中執行基於MapReduce的Kettle轉換,還支持向Spark集羣提交做業。這裏演示的例子都是Pentaho官方提供示例。從下一篇開始,咱們將創建一個模擬的Hadoop數據倉庫,並用使用Kettle完成其上的ETL操做。  

相關文章
相關標籤/搜索