目錄java
2、鏈接Hadoopmysql
(1)開始前準備正則表達式
(2)配置步驟sql
2. 鏈接Hive數據庫
3. 鏈接Impalaexpress
4. 創建MySQL數據庫鏈接apache
本篇演示使用Kettle操做Hadoop上的數據。首先概要介紹Kettle對大數據的支持,而後用示例說明Kettle如何鏈接Hadoop,如何導入導出Hadoop集羣上的數據,如何用Kettle執行Hive的HiveQL語句,還會用一個典型的MapReduce轉換,說明Kettle在實際應用中是怎樣利用Hadoop分佈式計算框架的。本篇最後介紹如何在Kettle中提交Spark做業。
1、Hadoop相關的步驟與做業項
在「ETL與Kettle」(https://wxy0327.blog.csdn.net/article/details/107985148)的小結中曾提到,Kettle具備完備的轉換步驟與做業項,使它可以支持幾乎全部常見數據源。一樣Kettle對大數據也提供了強大的支持,這體如今轉換步驟與做業項的「Big Data」分類中。本例使用的Kettle 8.3版本中所包含的大數據相關步驟有19個,做業項有10個。表3-1和表3-2分別對這些步驟和做業項進行了簡單描述。
步驟名稱 |
描述 |
Avro input |
讀取Avro格式文件 |
Avro output |
寫入Avro格式文件 |
Cassandra input |
從一個Cassandra column family中讀取數據 |
Cassandra output |
向一個Cassandra column family中寫入數據 |
CouchDB input |
獲取CouchDB數據庫一個設計文檔中給定視圖所包含的全部文檔 |
HBase input |
從HBase column family中讀取數據 |
HBase output |
向HBase column family中寫入數據 |
HBase row decoder |
對HBase的鍵/值對進行編碼 |
Hadoop file input |
讀取存儲在Hadoop集羣中的文本型文件 |
Hadoop file output |
向存儲在Hadoop集羣中的文本型文件中寫數據 |
MapReduce input |
向MapReduce輸入鍵值對 |
MapReduce output |
從MapReduce輸出鍵值對 |
MongoDB input |
讀取MongoDB中一個指定數據庫集合的全部記錄 |
MongoDB output |
將數據寫入MongoDB的集合中 |
ORC input |
讀取ORC格式文件 |
ORC output |
寫入ORC格式文件 |
Parquet input |
讀取Parquet格式文件 |
Parquet output |
寫入Parquet格式文件 |
SSTable output |
做爲Cassandra SSTable寫入一個文件系統目錄 |
表3-1 Kettle轉換中的大數據相關步驟
做業項名稱 |
描述 |
Amazon EMR job executor |
在Amazon EMR中執行MapReduce做業 |
Amazon Hive job executor |
在Amazon EMR中執行Hive做業 |
Hadoop copy files |
將本地文件上傳到HDFS,或者在HDFS上覆制文件 |
Hadoop job executor |
在Hadoop節點上執行包含在JAR文件中的MapReduce做業 |
Oozie job executor |
執行Oozie工做流 |
Pentaho MapReduce |
在Hadoop中執行基於MapReduce的轉換 |
Pig script executor |
在Hadoop集羣上執行Pig腳本 |
Spark submit |
提交Spark做業 |
Sqoop export |
使用Sqoop將HDFS上的數據導出到一個關係數據庫中 |
Sqoop import |
使用Sqoop將一個關係數據庫中的數據導入到HDFS上 |
表3-2 Kettle做業中的大數據相關做業項
Kettle的設計很獨特,它既能夠在Hadoop集羣外部執行,也能夠在Hadoop集羣內的節點上執行。在外部執行時,Kettle可以從HDFS、Hive和HBase抽取數據,或者向它們中裝載數據。在Hadoop集羣內部執行時,Kettle轉換能夠做爲Mapper或Reducer任務執行,並容許將Pentaho MapReduce做業項做爲MapReduce的可視化編程工具來使用。後面咱們會用示例演示這些功能。關於Hadoop及其組件的基本概念和功能特性不是本專題所討論的範疇,可參考其它資源。
2、鏈接Hadoop
Kettle能夠與Hadoop協同工做。經過提交適當的參數,Kettle能夠鏈接Hadoop的HDFS、MapReduce、Zookeeper、Oozie、Sqoop和Spark服務。在數據庫鏈接類型中支持Hive和Impala。在本示例中配置Kettle鏈接HDFS、Hive和Impala。爲了給本專題後面實踐中建立的轉換或做業使用,咱們還將定義一個普通的mysql數據庫鏈接對象。
1. 鏈接Hadoop集羣
要使Kettle鏈接Hadoop集羣,須要兩個操做:設置一個Active Shim;創建並測試鏈接。Shim是Pentaho開發的插件,功能有點相似於一個適配器,幫助用戶鏈接Hadoop。Pentaho按期發佈Shim,能夠從sourceforge網站下載與Kettle版本對應的Shim安裝包。使用Shim可以鏈接不一樣的Hadoop發行版本,如CDH、HDP、MapR、Amazon EMR等。當在Kettle中執行一個大數據的轉換或做業時,缺省會使用設置的Active Shim。初始安裝Kettle時,並無Active Shim,所以在嘗試鏈接Hadoop集羣前,首先要作的就是選擇一個Active Shim,選擇的同時也就激活了此Active Shim。設置好Active Shim後,再通過必定的配置,就能夠測試鏈接了。Kettle內建的工具能夠爲完成這些工做提供幫助。
(1)開始前準備
在配置鏈接前,要確認Kettle具備訪問HDFS相關目錄的權限,訪問的目錄一般包括用戶主目錄以及工做須要的其它目錄。Hadoop管理員應該已經配置了容許Kettle所在主機對Hadoop集羣的訪問。除權限外,還須要確認如下信息:
- Hadoop集羣的發行版本。Kettle與Hadoop版本要匹配,本例使用的Kettle 8.3所對應的大數據支持矩陣詳見「https://help.pentaho.com/Documentation/8.3/Setup/Components_Reference」。
- HDFS、MapReduce或Zookeeper服務的IP地址和端口號。
- 若是要使用Oozie,須要知道Oozie服務的URL。
本例中已經安裝好4個節點的CDH 6.3.1集羣,IP地址及主機名以下:
172.16.1.124 manager
172.16.1.125 node1
172.16.1.126 node2
172.16.1.127 node3
啓動的Hadoop服務如圖3-1所示,全部服務都使用缺省端口。關於CDH集羣的安裝與卸載,能夠參見個人博客「基於Hadoop生態圈的數據倉庫實踐 —— 環境搭建(二)」和「一鍵式徹底刪除CDH 6.3.1」。
爲了用主機名訪問Hadoop相關服務,在Kettle主機(172.16.1.101)的/etc/hosts文件中添加了Hadoop集羣四個節點的IP與主機名。
(2)配置步驟
1. 在Kettle中配置Hadoop客戶端文件
在瀏覽器中登陸Cloudera Manager,選擇Hive服務,點擊「操做」->「下載客戶端配置」。在獲得的hive-clientconfig.zip壓縮包中包括了當前Hadoop客戶端的12個配置文件。將其中的core-site.xml、hdfs-site.xml、hive-site.xml、yarn-site.xml、mapred-site.xml 5個文件複製到Kettle根目錄下的plugins/pentaho-big-data-plugin/hadoop-configurations/cdh61/目錄下,覆蓋原來Kettle自帶的這些文件。
2. 選擇Active Shim
在Spoon界面中,選擇主菜單「工具」 -> 「Hadoop Distribution...」,在對話框中選擇「Cloudera CDH 6.1.0」,如圖3-2所示,點擊OK按鈕肯定後重啓Spoon。
3. 在Spoon中建立Hadoop clusters對象
新建一個轉換,在工做區左側的樹的「主對象樹」標籤中,選擇 Hadoop clusters -> 右鍵New Cluster,對話框中輸入如圖3-3所示的屬性值。
上圖的Hadoop集羣配置窗口中的選項及定義說明以下:
- Cluster Name:定義要鏈接的集羣名稱,這裏爲CDH631。
- Hostname(HDFS段):Hadoop集羣中NameNode節點的主機名。因爲本例中的CDH配置了HDFS HA,這裏用HDFS NameNode服務名替代了主機名。
- Port(HDFS段):Hadoop集羣中NameNode節點的端口號,HA不須要填寫。
- Username(HDFS段):HDFS的用戶名,經過宿主操做系統給出,不用填。
- Password(HDFS段):HDFS的密碼,經過宿主操做系統給出,不用填。
- Hostname(JobTracker段):Hadoop集羣中JobTracker節點的主機名。若是有獨立的JobTracker節點,在此輸入,不然使用HDFS的主機名。
- Port(JobTracker段):Hadoop集羣中JobTracker節點的端口號,不能與HDFS的端口號相同。
- Hostname(ZooKeeper段):Hadoop集羣中Zookeeper節點的主機名,只有在鏈接Zookeeper服務時才須要。
- Port(ZooKeeper段):Hadoop集羣中Zookeeper節點的端口號,只有在鏈接Zookeeper服務時才須要。
- URL(Oozie段):Oozie WebUI的地址,只有在鏈接Oozie服務時才須要。
這是本例CDH的配置,你應該按本身的狀況進行相應修改。而後點擊「Test」按鈕,測試結果如圖3-4所示。正常狀況下此時除了一個Kafka鏈接失敗的警告外,其它都應該經過測試。Kafka鏈接失敗,緣由是沒有配置Kafka的Bootstrap servers。咱們在CDH中並無啓動Kafka服務,所以忽略此警告。
關閉「Hadoop Cluster Test」窗口後,點擊「Hadoop cluster」窗口的「肯定」按鈕,至此就創建了一個Kettle能夠鏈接的Hadoop集羣。
若是是首次配置Kettle鏈接Hadoop,不免會出現這樣那樣的問題,Pentaho文檔中列出了配置過程當中的常見問題及其通用解決方法,如表3-3所示。但願這能對Kettle或Hadoop新手有所幫助。
症狀 |
一般緣由 |
通用解決方法 |
Shim和配置問題 |
||
No shim |
|
|
Shim doesn't load |
|
|
The file system's URL does not match the URL in the configuration file |
*-site.xml文件配置錯誤 |
參考Pentaho 「Set Up Pentaho to Connect to an Apache Hadoop Cluster」文檔,檢查配置文件,主要是core-site.xml文件是否配置正確。 |
Sqoop Unsupported major.minor version Error |
在pentaho6.0中,Hadoop集羣上的Java版本比Pentaho使用的Java版本舊。 |
|
鏈接問題 |
||
Hostname does not resolve |
|
|
Port name is incorrect |
|
|
Can't connect |
|
|
目錄訪問或權限問題 |
||
Can't access directory |
|
|
Can't create, read, update, or delete files or directories |
認證或權限問題。 |
|
Test file cannot be overwritten |
Pentaho測試文件已在目錄中。 |
測試已運行,但未刪除測試文件。須要手動刪除測試文件。檢查Kettle根目錄下logs目錄下的spoon.log文件中記錄的測試文件名。測試文件用於驗證用戶能夠在其主目錄中建立、寫入和刪除。 |
表3-3 Kettle鏈接Hadoop時的常見問題
2. 鏈接Hive
Kettle把Hive看成一個數據庫,支持鏈接Hive Server和Hive Server 2/3,數據庫鏈接類型的名字分別爲Hadoop Hive和Hadoop Hive 2/3。這裏演示在Kettle中創建一個Hadoop Hive 2/3類型的數據庫鏈接。
Hive Server有兩個明顯的問題,一是不夠穩定,常常會莫名奇妙假死,致使客戶端全部的鏈接都被掛起。二是併發性支持很差,若是一個用戶在鏈接中設置了一些環境變量,綁定到一個Thrift工做線程,當該用戶斷開鏈接,另外一個用戶建立了一個鏈接,他有可能也被分配到以前的線程,複用以前的配置。這是由於Thrift不支持檢測客戶端是否斷開鏈接,也就沒法清除會話的狀態信息。Hive Server 2的穩定性更高,而且已經完美支持了會話。從長遠來看都會以Hive Server 2做爲首選。
在工做區左側的「主對象樹」標籤中,選擇 「DB鏈接」 -> 右鍵「新建」,對話框中輸入如圖3-5所示的屬性值。
上圖的數據庫鏈接配置窗口中的選項及定義說明以下:
- Connection Name:定義鏈接名稱,這裏爲hive_cdh631。
- Connection Type:鏈接類型選擇Hadoop Hive 2/3。
- Host Name:輸入HiveServer2對應的主機名。在Cloudera Manager中,從Hive服務的「實例」標籤中能夠找到。
- Datebase Name:這裏輸入的rds是Hive裏已經存在的一個數據庫名稱。
- Port Number:端口號輸入hive.server2.thrift.port參數的值。
- User Name:用戶名,這裏爲空。
- Password:密碼,這裏爲空。
點擊「測試」,應該彈出成功鏈接窗口,顯示內容以下:
正確鏈接到數據庫[hive_cdh631] 主機名 : node2 端口 : 10000 數據庫名 :rds
爲了讓其它轉換或做業可以使用此數據庫鏈接對象,須要將它設置爲共享。選擇 「DB鏈接」 -> hive_cdh631 -> 右鍵「共享」,而後保存轉換。
3. 鏈接Impala
Impala是一個運行在Hadoop之上的大規模並行處理(Massively Parallel Processing,MPP)查詢引擎,提供對Hadoop集羣數據的高性能、低延遲的SQL查詢,使用HDFS做爲底層存儲。對查詢的快速響應使交互式查詢和對分析查詢的調優成爲可能,而這些在針對處理長時間批處理做業的SQL-on-Hadoop傳統技術上是難以完成的。Impala是Cloudera公司基於Google Dremel的開源實現。Cloudera公司宣稱除Impala外的其它組件都將移植到Spark框架,並堅信Impala是大數據上SQL解決方案的將來,可見其對Impala的重視程度。
經過將Impala與Hive元數據存儲數據庫相結合,可以在Impala與Hive這兩個組件之間共享數據庫表。而且Impala與HiveQL的語法兼容,所以既可使用Impala也可使用Hive進行創建表、發佈查詢、裝載數據等操做。Impala能夠在已經存在的Hive表上執行交互式實時查詢。
建立Impala鏈接的過程與Hive相似。在工做區左側的「主對象樹」標籤中,選擇「DB鏈接」 -> 右鍵「新建」,對話框中輸入如圖3-6所示的屬性值。
上圖的數據庫鏈接配置窗口中的選項及定義說明以下:
- Connection Name:定義鏈接名稱,這裏爲impala_cdh631。
- Connection Type:鏈接類型選擇Impala。
- Host Name:輸入任一Impala Daemon對應的主機名。在Cloudera Manager中,從Impala服務的「實例」標籤中能夠找到。
- Datebase Name:這裏輸入的rds是Hive裏已經存在的一個數據庫名稱。
- Port Number:端口號輸入Impala Daemon HiveServer2端口參數的值。
- User Name:用戶名,這裏爲空。
- Password:密碼,這裏爲空。
點擊「測試」,應該彈出成功鏈接窗口,顯示內容以下:
正確鏈接到數據庫[impala_cdh631] 主機名 : node3 端口 : 21050 數據庫名 :rds
同hive_cdh631同樣,將impala_cdh631數據庫鏈接共享,而後保存轉換。
4. 創建MySQL數據庫鏈接
Kettle中建立數據庫鏈接的方法都相似,區別只是在「鏈接類型」中選擇不一樣的數據庫,而後輸入相關的屬性,「鏈接方式」一般選擇Native(JDBC)。例如MySQL鏈接配置如圖3-7所示。
這裏的鏈接名稱爲mysql_node3。配置MySQL數據庫鏈接須要注意的一點是,須要事先將對應版本的MySQL JDBC驅動程序拷貝到Kettle根目錄的lib目錄下,不然在測試鏈接時可能出現以下錯誤:
org.pentaho.di.core.exception.KettleDatabaseException: Error occurred while trying to connect to the database Driver class 'org.gjt.mm.mysql.Driver' could not be found, make sure the 'MySQL' driver (jar file) is installed. org.gjt.mm.mysql.Driver
本例中鏈接的MySQL服務器版本爲5.6.14,所以使用下面的命令拷貝JDBC驅動,而後重啓Spoon以從新加載全部驅動。
cp mysql-connector-java-5.1.38-bin.jar /root/pdi-ce-8.3.0.0-371/lib/
至此成功建立了一個Hadoop集羣對象CDH631,,以及三個數據庫鏈接對象hive_cdh63一、impala_cdh631和mysql_node3。
3、導入導出Hadoop集羣數據
本節用四個示例演示如何使用Kettle導出導入Hadoop數據。這四個示例是:向HDFS導入數據;向Hive導入數據;從HDFS抽取數據到MySQL;從Hive抽取數據到MySQL。
1. 向HDFS導入數據
用Kettle將本地文件導入HDFS很是簡單,只須要一個「Hadoop copy files」做業項就能夠實現。它執行的效果同 hdfs dfs -put 命令是相同的。從下面的地址下載Pentaho提供的web日誌示例文件,將解壓縮後的weblogs_rebuild.txt文件放到Kettle所在主機的本地目錄下。
在Spoon中新建一個只包含「Start」和「Hadoop copy files」兩個做業項的做業,如圖3-8所示。
雙擊「Hadoop Copy Files」做業項,編輯屬性以下:
- Source Environment:選擇「Local」。
- 源文件/目錄:選擇本地文件,本例爲「file:///root/kettle_hadoop/3/weblogs_rebuild.txt」
- 通配符:空。
- Destination Environment:選擇「CDH631」,這是咱們以前已經創建好的Hadoop Clusters對象。
- Destination File/Folder:選擇HDFS上的目錄,本例爲/user/root。
保存併成功執行做業後,查看HDFS目錄,結果以下。能夠看到,weblogs_rebuild.txt文件已從本地導入HDFS的/user/root目錄中。每次執行做業會覆蓋HDFS中已存在的同名文件。
[hdfs@manager~]$hdfs dfs -ls /user/root Found 1 items -rw-r--r-- 3 root supergroup 77908174 2020-08-28 08:53 /user/root/weblogs_rebuild.txt [hdfs@manager~]$
2. 向Hive導入數據
Hive缺省是不能進行行級插入的,也就是說缺省時不能使用insert into ... values這種SQL語句向Hive插入數據。一般Hive表數據導入方式有如下兩種:
- 從本地文件系統中導入數據到Hive表,使用的語句是:
load data local inpath 目錄或文件 into table 表名;
- 從HDFS上導入數據到Hive表,使用的語句是:
load data inpath 目錄或文件 into table 表名;
再有數據一旦導入Hive表,缺省是不能進行更新和刪除的,只能向表中追加數據或者用新數據總體覆蓋原來的數據。要刪除表數據只能執行truncate或者drop table操做,這其實是刪除了表所對應的HDFS上的數據文件或目錄。
Kettle做業中的「Hadoop Copy Files」做業項能夠將本地文件上傳至HDFS,所以只要將前面的做業稍加修改,將Destination File/Folder選擇爲hive表所在的HDFS目錄便可,做業執行的效果與load data local inpath語句相同。
首先從下面的地址下載Pentaho提供的格式化後的web日誌示例文件,將解壓縮後的weblogs_parse.txt文件放到Kettle所在主機的本地目錄下。
而後執行下面的HiveQL創建一個Hive表,表結構與weblogs_parse.txt文件的結構相匹配。
create table test.weblogs ( client_ip string, full_request_date string, day string, month string, month_num int, year string, hour string, minute string, second string, timezone string, http_verb string, uri string, http_status_code string, bytes_returned string, referrer string, user_agent string) row format delimited fields terminated by '\t';
建立和前例相同的做業,只是修改如下兩個做業項屬性:
- 源文件/目錄:file:///root/kettle_hadoop/3/weblogs_parse.txt
- Destination File/Folder:/user/hive/warehouse/test.db/weblogs
保存併成功執行做業後,查詢test.weblogs表的記錄與weblogs_parse.txt文件內容相同。
3. 從HDFS抽取數據到MySQL
這是Pentaho提供的一個壓縮文件,其中包含一個名爲weblogs_aggregate.txt的文本文件,文件中有36616行記錄,每行記錄有4列,分別表示IP地址、年份、月份、訪問頁面數,前5行記錄以下。咱們使用這個文件做爲最初的原始數據。
0.308.86.81 2012 07 1 0.32.48.676 2012 01 3 0.32.85.668 2012 07 8 0.45.305.7 2012 01 1 0.45.305.7 2012 02 1
用下面的命令把解壓縮後的weblogs_aggregate.txt文件上傳到HDFS的/user/root目錄下。
hdfs dfs -put weblogs_aggregate.txt /user/root/
在Spoon中新建一個如圖3-9的轉換。轉換中只包含「Hadoop File Input」和「表輸出」 兩個步驟。
編輯「Hadoop File Input」步驟屬性以下:
(1)「文件」標籤
- Environment:選擇「CDH631」。
- File/Folder:選擇「/user/root/weblogs_aggregate.txt」
(2)「內容」標籤
- 文件類型:CVS
- 分隔符:刪除分號,點擊「Insert TAB」按鈕插入TAB分隔符。
- 頭部:勾掉。
- 格式:選擇「Unix」。
- 本地日期格式:選擇「en_US」
(3)「字段」標籤
輸入如表3-4所示。
名稱 |
類型 |
格式 |
長度 |
去除空字符串方式 |
重複 |
client_ip |
String |
|
20 |
不去掉空格 |
否 |
year |
Integer |
# |
15 |
不去掉空格 |
否 |
month_num |
Integer |
# |
15 |
不去掉空格 |
否 |
pageviews |
Integer |
# |
15 |
不去掉空格 |
否 |
表3-4 weblogs_aggregate.txt對應的字段
編輯「表輸出」步驟屬性以下:
- 數據庫鏈接:選擇「mysql_node3」。
- 目標表:輸入「aggregate_hdfs」。
- 剪裁表:勾選。
mysql_node3是鏈接Hadoop時已經建好的一個MySQL數據庫鏈接。「主選項」和「數據庫字段」標籤下的屬性都不須要設置,「表字段」和「流字段」會自動映射。
下面執行SQL創建mysql的表:
use test; create table aggregate_hdfs ( client_ip varchar(15), year smallint, month_num tinyint, pageviews bigint );
保存並執行轉換,而後查詢aggregate_hdfs表,結果以下:
mysql> select count(*) from test.aggregate_hdfs; +----------+ | count(*) | +----------+ | 36616 | +----------+ 1 row in set (0.03 sec) mysql> select * from test.aggregate_hdfs limit 5; +-------------+------+-----------+-----------+ | client_ip | year | month_num | pageviews | +-------------+------+-----------+-----------+ | 0.308.86.81 | 2012 | 7 | 1 | | 0.32.48.676 | 2012 | 1 | 3 | | 0.32.85.668 | 2012 | 7 | 8 | | 0.45.305.7 | 2012 | 1 | 1 | | 0.45.305.7 | 2012 | 2 | 1 | +-------------+------+-----------+-----------+ 5 rows in set (0.00 sec)
4. 從Hive抽取數據到MySQL
在Spoon中新建一個如圖3-10的轉換。轉換中只包含「表輸入」和「表輸出」 兩個步驟。
編輯「表輸入」步驟屬性以下:
- 數據庫鏈接:選擇「hive_cdh631」。
- SQL:輸入下面的SQL語句:
select client_ip, year, month, month_num, count(*) as pageviews from test.weblogs group by client_ip, year, month, month_num
hive_cdh631是鏈接Hadoop時已經建好的一個Hive數據庫鏈接。
編輯「表輸出」步驟屬性以下:
- 數據庫鏈接:選擇「mysql_node3」。
- 目標表:輸入「aggregate_hive」。
- 剪裁表:勾選。
下面執行SQL創建mysql的表:
use test; create table aggregate_hive ( client_ip varchar(15), year varchar(4), month varchar(10), month_num tinyint, pageviews bigint );
保存並執行轉換,而後查詢aggregate_hive表,結果以下:
mysql> select count(*) from test.aggregate_hive; +----------+ | count(*) | +----------+ | 36616 | +----------+ 1 row in set (0.03 sec) mysql> select * from test.aggregate_hive limit 5; +---------------+------+-------+-----------+-----------+ | client_ip | year | month | month_num | pageviews | +---------------+------+-------+-----------+-----------+ | 0.45.305.7 | 2012 | Feb | 2 | 1 | | 0.48.322.75 | 2012 | Jul | 7 | 1 | | 0.638.50.46 | 2011 | Dec | 12 | 8 | | 01.660.68.623 | 2012 | Jun | 6 | 1 | | 01.660.70.74 | 2012 | Jul | 7 | 1 | +---------------+------+-------+-----------+-----------+ 5 rows in set (0.00 sec)
4、執行HiveQL語句
在這個示例中演示如何用Kettle執行Hive的HiveQL語句。咱們在「向Hive導入數據」一節創建的weblogs表上執行聚合查詢,同時創建一個新表保存查詢結果。新建一個Kettle做業,只有「START」和「SQL」兩個做業項,如圖3-11所示。
編輯「SQL」做業項屬性以下:
- 數據庫鏈接:選擇「hive_cdh631」。
- SQL腳本:
create table test.weblogs_agg as select client_ip, year, month, month_num, count(*) from test.weblogs group by client_ip, year, month, month_num;
保存併成功執行做業後檢查hive表,結果以下:
hive> select count(*) from test.weblogs_agg; ... 36616
能夠看到weblogs_agg表中已經保存了所有的聚合數據。
5、執行MapReduce
1. 生成聚合數據集
「執行HiveQL語句」示例只用一句HiveQL就生成了聚合數據,本示例使用「Pentaho MapReduce」做業項完成類似的功能,把細節數據彙總成聚合數據集。當給一個關係型數據倉庫或數據集市準備待抽取的數據時,這是一個常見的使用場景。咱們把weblogs_parse.txt文件做爲細節數據,目標是生成聚合數據文件,其中包含按IP和年月分組統計的PV數。
(1)準備文件與目錄
# 建立格式化文件所在目錄 hdfs dfs -mkdir /user/root/parse/ # 上傳格式化文件 hdfs dfs -put -f weblogs_parse.txt /user/root/parse/ # 修改讀寫權限 hdfs dfs -chmod -R 777 /user/root/
(2)創建一個用於Mapper的轉換
如圖3-12所示的轉換由「MapReduce Input」、「拆分字段」、「利用Janino計算Java表達式」、「MapReduce Output」四個步驟組成。
編輯「MapReduce Input」步驟以下:
- Key field:「Type」選擇「String」,定義 Hadoop MapReduce 鍵的數據類型。
- Value field:「Type」選擇「String」,定義 Hadoop MapReduce 值的數據類型。
該步驟輸出兩個字段,名稱是固定的key和value,也就是Map階段輸入的鍵值對。
編輯「拆分字段」步驟以下:
- 須要拆分的字段:選擇「value」。
- 分隔符:輸入「$[09]」,以TAB做爲分隔符。
- 字段:新的字段名以下,類型均爲String。
client_ip full_request_date day month month_num year hour minute second timezone http_verb uri http_status_code bytes_returned referrer user_agent
該步驟將輸入的value字段拆分紅16個字段,輸出17個字段(key字段沒變,文本文件每行的key是文件起始位置到每行的字節偏移量)。
編輯「利用Janino計算Java表達式」步驟如表3-5所示。
New field |
Java expression |
Value type |
new_key |
client_ip + '\t' + year + '\t' + month_num |
String |
new_value |
1 |
Integer |
表3-5 聚合數據轉換中的「利用Janino計算Java表達式」步驟
該步驟爲數據流中增長兩個新的字段,名稱分別定義爲new_key和new_value。new_key字段的值定義爲client_ip + '\t' + year + '\t' + month_num,將IP地址、年份、月份和字段間的兩個TAB符拼接成一個字符串。new_value字段的值爲1,數據類型是整數。該步驟輸出19個字段。
編輯「MapReduce Output」步驟以下:
- Key field:選擇「new_key」。
- Value field:選擇「new_value」。
該步驟輸出「new_key」和「new_value」兩個字段,即Map階段輸出的鍵值對。
將轉換保存爲aggregate_mapper.ktr。
(3)創建一個用於Reducer的轉換
圖3-13 生成聚合數據Reducer轉換
如圖3-13所示的轉換由「MapReduce Input」、「分組」、「MapReduce Output」三個步驟組成。
編輯「MapReduce Input」步驟以下:
. Key field:「Type」選擇「String」。
. Value field:「Type」選擇「Integer」。
該步驟輸出兩個字段,名稱是固定的key和value,key對應Mapper轉換的new_key輸出字段,value對應Mapper轉換的new_value輸出字段。
編輯「分組」步驟以下:
- 構成分組的字段:選擇「key」。
- 聚合:名稱、Subject、類型三列的值分別是new_value、value、求和。
該步驟按key字段分組(key字段的值就是client_ip + '\t' + year + '\t' + month_num),對每一個分組的value求和,每組的合計值定義爲一個新的字段new_value。注意,此處的new_value和Mapper轉換輸出的new_value字段含義是不一樣的。Mapper轉換輸出的new_value字段對應這裏的Subject字段值。
編輯「MapReduce Output」步驟以下:
- Key field:選擇「key」。
- Value field:選擇「new_value」。
輸出Reducer處理後的鍵值對,這就是咱們想要的結果。
將轉換保存爲aggregate_reducer.ktr。
(4)創建一個調用MapReduce步驟的做業
如圖3-14所示的做業使用mapper和reducer轉換。須要編輯Pentaho MapReduce做業項的Mapper、Reducer、job Setup、Cluster四個標籤頁,每一個標籤頁上的選項及定義。
Mapper標籤:
- Transformation:選擇第(1)步創建的Mapper轉換,這裏爲「/root/kettle_hadoop/3/aggregate_mapper.ktr」。
- Input step name:輸入「MapReduce Input」。這是接收mapping數據的步驟名,必須是一個MapReduce Input步驟的名稱。
- Output step name:輸入「MapReduce Output」。這是mapping輸出步驟名,必須是一個MapReduce Output步驟的名稱。
Reducer標籤:
- Transformation:選擇第(2)步創建的Reducer轉換,這裏爲「/root/kettle_hadoop/3/aggregate_mapper.ktr」。
- Input step name:輸入「MapReduce Input」。這是接收reducing數據的步驟名,必須是一個MapReduce Input步驟的名稱。
- Output step name:輸入「MapReduce Output」。這是reducing輸出步驟名,必須是一個MapReduce Output步驟的名稱。
Job Setup標籤:
- Input path:輸入「/user/root/parse/」。一個以逗號分隔的HDFS目錄列表,目錄中存儲的是MapReduce要處理的源數據文件。
- Output path:輸入「/user/root/aggregate_mr」。存儲MapReduce做業輸出數據的HDFS目錄。
- Remove output path before job:勾選。執行做業時先刪除輸出目錄。
- Input format:輸入「org.apache.hadoop.mapred.TextInputFormat」,爲輸入格式的類名。
- Output format:輸入「org.apache.hadoop.mapred.TextOutputFormat」,爲輸出格式的類名。
Cluster標籤:
- Hadoop job name:輸入「aggregate」。
- Hadoop cluster:選擇「CDH631」,爲一個已經定義的Hadoop集羣。
- Number of mapper tasks:1。分配的mapper任務數,由輸入的數據量所決定。典型的值在10-100之間。非CPU密集型的任務能夠指定更高的值。
- Number of reduce tasks:1。分配的reducer任務數。通常來講,該值設置的越小,reduce操做啓動的越快,設置的越大,reduce操做完成的更快。加大該值會增長Hadoop框架的開銷,但可以使負載更加均衡。若是設置爲0,則不執行reduce操做,mapper的輸出將做爲整個MapReduce做業的輸出。
- Logging interval:60。日誌消息間隔的秒數。
- Enable blocking:勾選。若是選中,做業將等待每個做業項完成後再繼續下一個做業項,這是Kettle感知Hadoop做業狀態的惟一方式。若是不選,MapReduce做業會本身執行,而Kettle在提交MapReduce做業後當即會執行下一個做業項。除非選中該項,不然Kettle的錯誤處理在這裏將沒法工做。
將做業保存爲aggregate_mr.kjb。
(5)執行做業並驗證輸出
[hdfs@node3~]$hdfs dfs -ls /user/root/aggregate_mr/ Found 2 items -rw-r--r-- 3 root supergroup 0 2020-08-31 13:46 /user/root/aggregate_mr/_SUCCESS -rw-r--r-- 3 root supergroup 890709 2020-08-31 13:46 /user/root/aggregate_mr/part-00000 [hdfs@node3~]$hdfs dfs -cat /user/root/aggregate_mr/part-00000 | head -10 0.308.86.81 2012 07 1 0.32.48.676 2012 01 3 0.32.85.668 2012 07 8 0.45.305.7 2012 01 1 0.45.305.7 2012 02 1 0.46.386.626 2011 11 1 0.48.322.75 2012 07 1 0.638.50.46 2011 12 8 0.87.36.333 2012 08 7 01.660.68.623 2012 06 1 cat: Unable to write to output stream. [hdfs@node3~]$
能夠看到,/user/root/aggregate_mr/目錄下生成了名爲part-00000輸出文件,文件中包含按IP和年月分組的PV數。
2. 格式化原始web日誌
本示例說明如何使用Pentaho MapReduce把原始web日誌解析成格式化的記錄。
(1)準備文件與目錄
# 建立原始文件所在目錄 hdfs dfs -mkdir /user/root/raw # 修改讀寫權限 hdfs dfs -chmod -R 777 /user/root/
而後用Hadoop copy files做業項將weblogs_rebuild.txt文件放到HDFS的/user/root/raw目錄下,具體操做參見前面「向HDFS導入數據」。
(2)創建一個用於Mapper的轉換
編輯「MapReduce Input」步驟以下:
- Key field:「Type」選擇「String」。
- Value field:「Type」選擇「String」。
編輯「正則表達式」步驟以下:
- 要匹配的字段:輸入「value」。
- Result field name:輸入「is_match」
- 爲每一個捕獲組(capture group)建立一個字段:勾選。
- Replace previous fields:勾選。
- 正則表達式:
^([^\s]{7,15})\s # client_ip -\s # unused IDENT field -\s # unused USER field \[((\d{2})/(\w{3})/(\d{4}) # request date dd/MMM/yyyy :(\d{2}):(\d{2}):(\d{2})\s([-+ ]\d{4}))\] # request time :HH:mm:ss -0800 \s"(GET|POST)\s # HTTP verb ([^\s]*) # HTTP URI \sHTTP/1\.[01]"\s # HTTP version (\d{3})\s # HTTP status code (\d+)\s # bytes returned "([^"]+)"\s # referrer field " # User agent parsing, always quoted. "? # Sometimes if the user spoofs the user_agent, they incorrectly quote it. ( # The UA string [^"]*? # Uninteresting bits (?: (?: rv: # Beginning of the gecko engine version token (?=[^;)]{3,15}[;)]) # ensure version string size ( # Whole gecko version (\d{1,2}) # version_component_major \.(\d{1,2}[^.;)]{0,8}) # version_component_minor (?:\.(\d{1,2}[^.;)]{0,8}))? # version_component_a (?:\.(\d{1,2}[^.;)]{0,8}))? # version_component_b ) [^"]* # More uninteresting bits ) | [^"]* # More uninteresting bits ) ) # End of UA string "? "
- 捕獲組(Capture Group)字段:以下所示,全部字段都是String類型。
client_ip full_request_date day month year hour minute second timezone http_verb uri http_status_code bytes_returned referrer user_agent firefox_gecko_version firefox_gecko_version_major firefox_gecko_version_minor firefox_gecko_version_a firefox_gecko_version_b
編輯「過濾記錄」步驟以下:
- 發送true數據給步驟:選擇「值映射」。
- 發送false數據給步驟:選擇「空操做(什麼也不作)」
- 條件:選擇「is_match = Y」
編輯「值映射」步驟以下:
- 使用的字段名:選擇「month」。
- 目標字段名(空=覆蓋):輸入「month_num」。
- 不匹配時的默認值:輸入「00」。
- 字段值:源值與目標值輸入以下。
Jan 01 Feb 02 Mar 03 Apr 04 May 05 Jun 06 Jul 07 Aug 08 Sep 09 Oct 10 Nov 11 Dec 12
編輯「利用Janino計算Java表達式」步驟以下:
- New field:輸入「output_value」。
- Java expression:輸入以下。
client_ip + '\t' + full_request_date + '\t' + day + '\t' + month + '\t' + month_num + '\t' + year + '\t' + hour + '\t' + minute + '\t' + second + '\t' + timezone + '\t' + http_verb + '\t' + uri + '\t' + http_status_code + '\t' + bytes_returned + '\t' + referrer + '\t' + user_agent
- Value type:選擇「String」。
編輯「MapReduce Output」步驟以下:
- Key field:選擇「key」。
- Value field:選擇「output_value」。
將轉換保存爲weblog_parse_mapper.ktr。
(3)創建一個調用MapReduce步驟的做業
編輯「Pentaho MapReduce」做業項以下。
Mapper標籤:
- Transformation:選擇上一步創建的轉換,這裏爲「/root/kettle_hadoop/3/weblogs_parse_mapper.ktr」。
- Input step name:輸入「MapReduce Input」。
- Output step name:輸入「MapReduce Output」。
Job Setup標籤:
- Input path:輸入「/user/root/raw」。
- Output path:輸入「/user/root/parse1」。
- Remove output path before job:勾選。
- Input format:輸入「org.apache.hadoop.mapred.TextInputFormat」。
- Output format:輸入「org.apache.hadoop.mapred.TextOutputFormat」。
Cluster標籤:
- Hadoop job name:輸入「Web Log Parse」。
- Hadoop cluster:選擇「CDH631」。
- Number of mapper tasks:2
- Number of reduce tasks:0
- Logging interval:60
- Enable blocking:勾選。
將做業保存爲weblogs_parse_mr.kjb。
(4)執行做業並驗證輸出
做業成功執行後檢查HDFS的輸出文件,結果以下。
[hdfs@node3~]$hdfs dfs -ls /user/root/parse1 Found 3 items -rw-r--r-- 3 root supergroup 0 2020-08-31 10:59 /user/root/parse1/_SUCCESS -rw-r--r-- 3 root supergroup 42601640 2020-08-31 10:59 /user/root/parse1/part-00000 -rw-r--r-- 3 root supergroup 42810160 2020-08-31 10:59 /user/root/parse1/part-00001 [hdfs@node3~]$hdfs dfs -get /user/root/parse1/part-00000 [hdfs@node3~]$head -5 part-00000 0 323.81.303.680 25/Oct/2011:01:41:00 -0500 25 Oct 10 2011 01 41 00 -0500 GET /download/download6.zip 200 0 - Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.9.0.19) Gecko/2010031422 Firefox/3.0.19 193 668.667.44.3 25/Oct/2011:07:38:30 -0500 25 Oct 10 2011 07 38 30 -0500 GET /download/download3.zip 200 0 - Mozilla/5.0 (X11; U; Linux i686; en-US; rv:1.8.0.12) Gecko/20070719 CentOS/1.5.0.12-3.el5.centos Firefox/1.5.0.12 405 13.386.648.380 25/Oct/2011:17:06:00 -0500 25 Oct 10 2011 17 06 00 -0500 GET /download/download6.zip 200 0 - Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4.0; GTB6.3; .NET CLR 1.1.4322; .NET CLR 2.0.50727; .NET CLR 3.0.04506.30; InfoPath.2) 651 06.670.03.40 26/Oct/2011:13:24:00 -0500 26 Oct 10 2011 13 24 00 -0500 GET /product/demos/product2 200 0 - Mozilla/5.0 (Windows; U; Windows NT 6.1; en-US; rv:1.9.2.3) Gecko/20100401 Firefox/3.6.3 838 18.656.618.46 26/Oct/2011:17:15:30 -0500 26 Oct 10 2011 17 15 30 -0500 GET /download/download4.zip 200 0 - Mozilla/5.0 (Macintosh; U; Intel Mac OS X 10_6_3; en-us) AppleWebKit/531.22.7 (KHTML, like Gecko) Version/4.0.5 Safari/531.22.7 [hdfs@node3~]$
能夠看到,/user/root/parse1目錄下生成了名爲part-00000和part-00001的兩個輸出文件(由於使用了兩個mapper),內容已經被格式化。
6、提交Spark做業
Kettle不但支持MapReduce做業,還能夠經過「Spark Submit」做業項,向CDH 5.3以上、HDP 2.3以上、Amazon EMR 3.10以上的Hadoop平臺提交Spark做業。在本示例中,咱們先爲Kettle配置Spark,而後修改並執行Kettle安裝包中自帶的Spark PI做業例子,說明如何在Kettle中提交Spark做業。
1. 在Kettle主機上安裝Spark客戶端
使用Kettle執行Spark做業,須要在Kettle主機安裝Spark客戶端。只要將CDH中Spark的庫文件複製到Kettle所在主機便可。
-- 在172.16.1.127上執行 cd /opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib scp -r spark 172.16.1.101:/root/
2. 爲Kettle配置Spark
如下操做均在172.16.1.101以root用戶執行。
(1)備份原始配置文件
cd /root/spark/conf/ cp spark-defaults.conf spark-defaults.conf.bak cp spark-env.sh spark-env.sh.bak
(2)編輯spark-defaults.conf文件
vim /root/spark/conf/spark-defaults.conf
內容以下:
# 使用spark.yarn.archive減小任務啓動時間 spark.yarn.archive=hdfs://nameservice1/user/spark/lib/spark_jars.zip # 解決和yarn相關Jersey包衝突,避免spark on yarn啓動spark-submit時出現java.lang.NoClassDefFoundError錯誤 spark.hadoop.yarn.timeline-service.enabled=false # 記錄Spark事件,用於應用程序在完成後重構WebUI spark.eventLog.enabled=true # 記錄Spark事件的目錄 spark.eventLog.dir=hdfs://nameservice1/user/spark/applicationHistory # spark on yarn的history server地址 spark.yarn.historyServer.address=http://node3:18088
(3)編輯spark-env.sh文件
vim /root/spark/conf/spark-env.sh
內容以下:
#!/usr/bin/env bash # hadoop配置文件所在目錄 HADOOP_CONF_DIR=/root/pdi-ce-8.3.0.0-371/plugins/pentaho-big-data-plugin/hadoop-configurations/cdh61 # spark主目錄 SPARK_HOME=/root/spark
(4)編輯core-site.xml文件
vim /root/pdi-ce-8.3.0.0-371/plugins/pentaho-big-data-plugin/hadoop-configurations/cdh61/core-site.xml
去掉下面這段的註釋:
<property> <name>net.topology.script.file.name</name> <value>/etc/hadoop/conf.cloudera.yarn/topology.py</value> </property>
3. 提交Spark做業
(1)修改Kettle自帶的Spark例子
cp /root/pdi-ce-8.3.0.0-371/samples/jobs/Spark\ Submit/Spark\ submit.kjb /root/kettle_hadoop/3/spark_submit.kjb
在Spoon中打開/root/kettle_hadoop/spark_submit.kjb文件,如圖3-17所示。
編輯Spark PI做業項以下:
- Spark Submit Utility:選擇Spark提交程序,本例爲「/root/spark/bin/spark-submit」。
- Master URL:由於yarn運行在CDH集羣,而不是Kettle主機上,因此這裏選擇「yarn-cluster」。
- Files標籤的Application Jar:選擇「/root/spark/examples/jars/spark-examples_2.11-2.4.0-cdh6.3.1.jar」。
(2)保存行執行做業
Spark History Server Web UI如圖3-18所示。
7、小結
本篇以Kettle 8.3和CDH 6.3.1爲例,介紹Kettle對Hadoop的支持。經過提交適當的參數,Kettle能夠鏈接Hadoop的HDFS、MapReduce、Zookeeper、Oozie和Spark服務。Kettle的數據庫鏈接類型中支持Hive、Hive 2/3和Impala。可使用Kettle導出導入Hadoop集羣中(HDFS、Hive等)的數據,執行Hive的HiveQL語句。Kettle支持在Hadoop中執行基於MapReduce的Kettle轉換,還支持向Spark集羣提交做業。這裏演示的例子都是Pentaho官方提供示例。從下一篇開始,咱們將創建一個模擬的Hadoop數據倉庫,並用使用Kettle完成其上的ETL操做。