離線輔助系統php |
數據接入html |
Flume介紹java |
Flume組件node |
||
Flume實戰案例mysql |
||
任務調度linux |
調度器基礎git |
|
市面上調度工具github |
||
Oozie的使用web |
||
Oozie的流程定義詳解ajax |
||
數據導出 |
sqoop基礎知識 |
|
sqoop實戰及原理 |
||
Sqoop數據導入實戰 |
||
Sqoop數據導出實戰 |
||
Sqoop做業操做 |
||
Sqoop的原理 |
目標:
一、理解flume、sqoop、oozie的應用場景
二、理解flume、sqoop、oozie的基本原理
三、掌握flume、sqoop、oozie的使用方法
在一個完整的大數據處理系統中,除了hdfs+mapreduce+hive組成分析系統的核心以外,還須要數據採集、結果數據導出、任務調度等不可或缺的輔助系統,而這些輔助工具在hadoop生態體系中都有便捷的開源框架,如圖所示:
u Flume是一個分佈式、可靠、和高可用的海量日誌採集、聚合和傳輸的系統。
u Flume能夠採集文件,socket數據包等各類形式源數據,又能夠將採集到的數據輸出到HDFS、hbase、hive、kafka等衆多外部存儲系統中
u 通常的採集需求,經過對flume的簡單配置便可實現
u Flume針對特殊場景也具有良好的自定義擴展能力,所以,flume能夠適用於大部分的平常數據採集場景
一、 Flume分佈式系統中最核心的角色是agent,flume採集系統就是由一個個agent所鏈接起來造成
二、 每個agent至關於一個數據傳遞員,內部有三個組件:
a) Source:採集源,用於跟數據源對接,以獲取數據
b) Sink:下沉地,採集數據的傳送目的,用於往下一級agent傳遞數據或者往最終存儲系統傳遞數據
c) Channel:agent內部的數據傳輸通道,用於從source將數據傳遞到sink
單個agent採集數據
多級agent之間串聯
一、Flume的安裝很是簡單,只須要解壓便可,固然,前提是已有hadoop環境
上傳安裝包到數據源所在節點上
而後解壓 tar -zxvf apache-flume-1.6.0-bin.tar.gz
而後進入flume的目錄,修改conf下的flume-env.sh,在裏面配置JAVA_HOME
二、根據數據採集的需求配置採集方案,描述在配置文件中(文件名可任意自定義)
三、指定採集方案配置文件,在相應的節點上啓動flume agent
先用一個最簡單的例子來測試一下程序環境是否正常
一、先在flume的conf目錄下新建一個文件
vi netcat-logger.conf
# 定義這個agent中各組件的名字 a1.sources = r1 a1.sinks = k1 a1.channels = c1
# 描述和配置source組件:r1 a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 44444
# 描述和配置sink組件:k1 a1.sinks.k1.type = logger
# 描述和配置channel組件,此處使用是內存緩存的方式 a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100
# 描述和配置source channel sink之間的鏈接關係 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 |
二、啓動agent去採集數據
bin/flume-ng agent -c conf -f conf/netcat-logger.conf -n a1 -Dflume.root.logger=INFO,console |
-c conf 指定flume自身的配置文件所在目錄
-f conf/netcat-logger.con 指定咱們所描述的採集方案
-n a1 指定咱們這個agent的名字
三、測試
先要往agent採集監聽的端口上發送數據,讓agent有數據可採
隨便在一個能跟agent節點聯網的機器上
telnet anget-hostname port (telnet localhost 44444)
採集需求:某服務器的某特定目錄下,會不斷產生新的文件,每當有新文件出現,就須要把文件採集到HDFS中去
根據需求,首先定義如下3大要素
l 採集源,即source——監控文件目錄 : spooldir
l 下沉目標,即sink——HDFS文件系統 : hdfs sink
l source和sink之間的傳遞通道——channel,可用file channel 也能夠用內存channel
配置文件編寫:
#定義三大組件的名稱 agent1.sources = source1 agent1.sinks = sink1 agent1.channels = channel1
# 配置source組件 agent1.sources.source1.type = spooldir agent1.sources.source1.spoolDir = /home/hadoop/logs/ agent1.sources.source1.fileHeader = false
#配置攔截器 agent1.sources.source1.interceptors = i1 agent1.sources.source1.interceptors.i1.type = host agent1.sources.source1.interceptors.i1.hostHeader = hostname
# 配置sink組件 agent1.sinks.sink1.type = hdfs agent1.sinks.sink1.hdfs.path =hdfs://hdp-node-01:9000/weblog/flume-collection/%y-%m-%d/%H-%M agent1.sinks.sink1.hdfs.filePrefix = access_log agent1.sinks.sink1.hdfs.maxOpenFiles = 5000 agent1.sinks.sink1.hdfs.batchSize= 100 agent1.sinks.sink1.hdfs.fileType = DataStream agent1.sinks.sink1.hdfs.writeFormat =Text agent1.sinks.sink1.hdfs.rollSize = 102400 agent1.sinks.sink1.hdfs.rollCount = 1000000 agent1.sinks.sink1.hdfs.rollInterval = 60 #agent1.sinks.sink1.hdfs.round = true #agent1.sinks.sink1.hdfs.roundValue = 10 #agent1.sinks.sink1.hdfs.roundUnit = minute agent1.sinks.sink1.hdfs.useLocalTimeStamp = true
# Use a channel which buffers events in memory agent1.channels.channel1.type = memory agent1.channels.channel1.keep-alive = 120 agent1.channels.channel1.capacity = 500000 agent1.channels.channel1.transactionCapacity = 600
# Bind the source and sink to the channel agent1.sources.source1.channels = channel1 agent1.sinks.sink1.channel = channel1 |
Channel參數解釋:
capacity:默認該通道中最大的能夠存儲的event數量
trasactionCapacity:每次最大能夠從source中拿到或者送到sink中的event數量
keep-alive:event添加到通道中或者移出的容許時間
採集需求:好比業務系統使用log4j生成的日誌,日誌內容不斷增長,須要把追加到日誌文件中的數據實時採集到hdfs
根據需求,首先定義如下3大要素
l 採集源,即source——監控文件內容更新 : exec ‘tail -F file’
l 下沉目標,即sink——HDFS文件系統 : hdfs sink
l Source和sink之間的傳遞通道——channel,可用file channel 也能夠用 內存channel
配置文件編寫:
agent1.sources = source1 agent1.sinks = sink1 agent1.channels = channel1
# Describe/configure tail -F source1 agent1.sources.source1.type = exec agent1.sources.source1.command = tail -F /home/hadoop/logs/access_log agent1.sources.source1.channels = channel1
#configure host for source agent1.sources.source1.interceptors = i1 agent1.sources.source1.interceptors.i1.type = host agent1.sources.source1.interceptors.i1.hostHeader = hostname
# Describe sink1 agent1.sinks.sink1.type = hdfs #a1.sinks.k1.channel = c1 agent1.sinks.sink1.hdfs.path =hdfs://hdp-node-01:9000/weblog/flume-collection/%y-%m-%d/%H-%M agent1.sinks.sink1.hdfs.filePrefix = access_log agent1.sinks.sink1.hdfs.maxOpenFiles = 5000 agent1.sinks.sink1.hdfs.batchSize= 100 agent1.sinks.sink1.hdfs.fileType = DataStream agent1.sinks.sink1.hdfs.writeFormat =Text agent1.sinks.sink1.hdfs.rollSize = 102400 agent1.sinks.sink1.hdfs.rollCount = 1000000 agent1.sinks.sink1.hdfs.rollInterval = 60 agent1.sinks.sink1.hdfs.round = true agent1.sinks.sink1.hdfs.roundValue = 10 agent1.sinks.sink1.hdfs.roundUnit = minute agent1.sinks.sink1.hdfs.useLocalTimeStamp = true
# Use a channel which buffers events in memory agent1.channels.channel1.type = memory agent1.channels.channel1.keep-alive = 120 agent1.channels.channel1.capacity = 500000 agent1.channels.channel1.transactionCapacity = 600
# Bind the source and sink to the channel agent1.sources.source1.channels = channel1 agent1.sinks.sink1.channel = channel1 |
Flume支持衆多的source和sink類型,詳細手冊可參考官方文檔
http://flume.apache.org/FlumeUserGuide.html
l 一個完整的數據分析系統一般都是由大量任務單元組成:
shell腳本程序,java程序,mapreduce程序、hive腳本等
l 各任務單元之間存在時間前後及先後依賴關係
l 爲了很好地組織起這樣的複雜執行計劃,須要一個工做流調度系統來調度執行;
例如,咱們可能有這樣一個需求,某個業務系統天天產生20G原始數據,咱們天天都要對其進行處理,處理步驟以下所示:
一、 經過Hadoop先將原始數據同步到HDFS上;
二、 藉助MapReduce計算框架對原始數據進行轉換,生成的數據以分區表的形式存儲到多張Hive表中;
三、 須要對Hive中多個表的數據進行JOIN處理,獲得一個明細數據Hive大表;
四、 將明細數據進行復雜的統計分析,獲得結果報表信息;
五、 須要將統計分析獲得的結果數據同步到業務系統中,供業務調用使用。
簡單的任務調度:直接使用linux的crontab來定義;
複雜的任務調度:開發調度平臺
或使用現成的開源調度系統,好比ooize、azkaban等
市面上目前有許多工做流調度器
在hadoop領域,常見的工做流調度器有Oozie, Azkaban,Cascading,Hamake等
下面的表格對上述四種hadoop工做流調度器的關鍵特性進行了比較,儘管這些工做流調度器可以解決的需求場景基本一致,但在設計理念,目標用戶,應用場景等方面仍是存在顯著的區別,在作技術選型的時候,能夠提供參考
特性 |
Hamake |
Oozie |
Azkaban |
Cascading |
工做流描述語言 |
XML |
XML (xPDL based) |
text file with key/value pairs |
Java API |
依賴機制 |
data-driven |
explicit |
explicit |
explicit |
是否要web容器 |
No |
Yes |
Yes |
No |
進度跟蹤 |
console/log messages |
web page |
web page |
Java API |
Hadoop job調度支持 |
no |
yes |
yes |
yes |
運行模式 |
command line utility |
daemon |
daemon |
API |
Pig支持 |
yes |
yes |
yes |
yes |
事件通知 |
no |
no |
no |
yes |
須要安裝 |
no |
yes |
yes |
no |
支持的hadoop版本 |
0.18+ |
0.20+ |
currently unknown |
0.18+ |
重試支持 |
no |
workflownode evel |
yes |
yes |
運行任意命令 |
yes |
yes |
yes |
yes |
Amazon EMR支持 |
yes |
no |
currently unknown |
yes |
對市面上最流行的兩種調度器,給出如下詳細對比,以供技術選型參考。整體來講,ooize相比azkaban是一個重量級的任務調度系統,功能全面,但配置使用也更復雜。若是能夠不在乎某些功能的缺失,輕量級調度器azkaban是很不錯的候選對象。
詳情以下:
u 功能
二者都可以調度mapreduce,pig,java,腳本工做流任務
二者都可以定時執行工做流任務
u 工做流定義
Azkaban使用Properties文件定義工做流
Oozie使用XML文件定義工做流
u 工做流傳參
Azkaban支持直接傳參,例如${input}
Oozie支持參數和EL表達式,例如${fs:dirSize(myInputDir)}
u 定時執行
Azkaban的定時執行任務是基於時間的
Oozie的定時執行任務基於時間和輸入數據
u 資源管理
Azkaban有較嚴格的權限控制,如用戶對工做流進行讀/寫/執行等操做
Oozie暫無嚴格的權限控制
u 工做流執行
Azkaban有兩種運行模式,分別是solo server mode(executor server和web server部署在同一臺節點)和multi server mode(executor server和web server能夠部署在不一樣節點)
Oozie做爲工做流服務器運行,支持多用戶和多工做流
u 工做流管理
Azkaban支持瀏覽器以及ajax方式操做工做流
Oozie支持命令行、HTTP REST、Java API、瀏覽器操做工做流
Azkaban是由Linkedin開源的一個批量工做流任務調度器。用於在一個工做流內以一個特定的順序運行一組工做和流程。Azkaban定義了一種KV文件格式來創建任務之間的依賴關係,並提供一個易於使用的web用戶界面維護和跟蹤你的工做流。
它有以下功能特色:
² Web用戶界面
² 方便上傳工做流
² 方便設置任務之間的關係
² 調度工做流
² 認證/受權(權限的工做)
² 可以殺死並從新啓動工做流
² 模塊化和可插拔的插件機制
² 項目工做區
² 工做流和任務的日誌記錄和審計
Azkaban Web服務器
azkaban-web-server-2.5.0.tar.gz
Azkaban執行服務器
azkaban-executor-server-2.5.0.tar.gz
MySQL
目前azkaban只支持 mysql,需安裝mysql服務器,本文檔中默認已安裝好mysql服務器,並創建了 root用戶,密碼 root.
下載地址:http://azkaban.github.io/downloads.html
將安裝文件上傳到集羣,最好上傳到安裝 hive、sqoop的機器上,方便命令的執行
在當前用戶目錄下新建 azkabantools目錄,用於存放源安裝文件.新建azkaban目錄,用於存放azkaban運行程序
解壓azkaban-web-server-2.5.0.tar.gz
命令: tar –zxvf azkaban-web-server-2.5.0.tar.gz
將解壓後的azkaban-web-server-2.5.0 移動到 azkaban目錄中,並從新命名 webserver
命令: mv azkaban-web-server-2.5.0 ../azkaban
cd ../azkaban
mv azkaban-web-server-2.5.0 server
解壓azkaban-executor-server-2.5.0.tar.gz
命令:tar –zxvf azkaban-executor-server-2.5.0.tar.gz
將解壓後的azkaban-executor-server-2.5.0 移動到 azkaban目錄中,並從新命名 executor
命令:mv azkaban-executor-server-2.5.0 ../azkaban
cd ../azkaban
mv azkaban-executor-server-2.5.0 executor
azkaban腳本導入
解壓: azkaban-sql-script-2.5.0.tar.gz
命令:tar –zxvf azkaban-sql-script-2.5.0.tar.gz
將解壓後的mysql 腳本,導入到mysql中:
進入mysql
mysql> create database azkaban;
mysql> use azkaban;
Database changed
mysql> source /home/hadoop/azkaban-2.5.0/create-all-sql-2.5.0.sql;
參考地址: http://docs.codehaus.org/display/JETTY/How+to+configure+SSL
命令: keytool -keystore keystore -alias jetty -genkey -keyalg RSA
運行此命令後,會提示輸入當前生成 keystor的密碼及相應信息,輸入的密碼請勞記,信息以下:
輸入keystore密碼:
再次輸入新密碼:
您的名字與姓氏是什麼?
[Unknown]:
您的組織單位名稱是什麼?
[Unknown]:
您的組織名稱是什麼?
[Unknown]:
您所在的城市或區域名稱是什麼?
[Unknown]:
您所在的州或省份名稱是什麼?
[Unknown]:
該單位的兩字母國家代碼是什麼
[Unknown]: CN
CN=Unknown, OU=Unknown, O=Unknown, L=Unknown, ST=Unknown, C=CN 正確嗎?
[否]: y
輸入<jetty>的主密碼
(若是和 keystore 密碼相同,按回車):
再次輸入新密碼:
完成上述工做後,將在當前目錄生成 keystore 證書文件,將keystore 考貝到 azkaban web服務器根目錄中.如:cp keystore azkaban/webserver
注:先配置好服務器節點上的時區
一、先生成時區配置文件Asia/Shanghai,用交互式命令 tzselect 便可
二、拷貝該時區文件,覆蓋系統本地時區配置
cp /usr/share/zoneinfo/Asia/Shanghai /etc/localtime
azkaban web服務器配置
進入azkaban web服務器安裝目錄 conf目錄
v 修改azkaban.properties文件
命令vi azkaban.properties
內容說明以下:
#Azkaban Personalization Settings azkaban.name=Test #服務器UI名稱,用於服務器上方顯示的名字 azkaban.label=My Local Azkaban #描述 azkaban.color=#FF3601 #UI顏色 azkaban.default.servlet.path=/index # web.resource.dir=web/ #默認根web目錄 default.timezone.id=Asia/Shanghai #默認時區,已改成亞洲/上海 默認爲美國
#Azkaban UserManager class user.manager.class=azkaban.user.XmlUserManager #用戶權限管理默認類 user.manager.xml.file=conf/azkaban-users.xml #用戶配置,具體配置參加下文
#Loader for projects executor.global.properties=conf/global.properties # global配置文件所在位置 azkaban.project.dir=projects #
database.type=mysql #數據庫類型 mysql.port=3306 #端口號 mysql.host=hadoop03 #數據庫鏈接IP mysql.database=azkaban #數據庫實例名 mysql.user=root #數據庫用戶名 mysql.password=root #數據庫密碼 mysql.numconnections=100 #最大鏈接數
# Velocity dev mode velocity.dev.mode=false # Jetty服務器屬性. jetty.maxThreads=25 #最大線程數 jetty.ssl.port=8443 #Jetty SSL端口 jetty.port=8081 #Jetty端口 jetty.keystore=keystore #SSL文件名 jetty.password=123456 #SSL文件密碼 jetty.keypassword=123456 #Jetty主密碼 與 keystore文件相同 jetty.truststore=keystore #SSL文件名 jetty.trustpassword=123456 # SSL文件密碼
# 執行服務器屬性 executor.port=12321 #執行服務器端口
# 郵件設置 mail.sender=xxxxxxxx@163.com #發送郵箱 mail.host=smtp.163.com #發送郵箱smtp地址 mail.user=xxxxxxxx #發送郵件時顯示的名稱 mail.password=********** #郵箱密碼 job.failure.email=xxxxxxxx@163.com #任務失敗時發送郵件的地址 job.success.email=xxxxxxxx@163.com #任務成功時發送郵件的地址 lockdown.create.projects=false # cache.directory=cache #緩存目錄
|
v azkaban 執行服務器配置
進入執行服務器安裝目錄conf,修改azkaban.properties
vi azkaban.properties
#Azkaban default.timezone.id=Asia/Shanghai #時區
# Azkaban JobTypes 插件配置 azkaban.jobtype.plugin.dir=plugins/jobtypes #jobtype 插件所在位置
#Loader for projects executor.global.properties=conf/global.properties azkaban.project.dir=projects
#數據庫設置 database.type=mysql #數據庫類型(目前只支持mysql) mysql.port=3306 #數據庫端口號 mysql.host=192.168.20.200 #數據庫IP地址 mysql.database=azkaban #數據庫實例名 mysql.user=azkaban #數據庫用戶名 mysql.password=oracle #數據庫密碼 mysql.numconnections=100 #最大鏈接數
# 執行服務器配置 executor.maxThreads=50 #最大線程數 executor.port=12321 #端口號(如修改,請與web服務中一致) executor.flow.threads=30 #線程數 |
v 用戶配置
進入azkaban web服務器conf目錄,修改azkaban-users.xml
vi azkaban-users.xml 增長 管理員用戶
<azkaban-users> <user username="azkaban" password="azkaban" roles="admin" groups="azkaban" /> <user username="metrics" password="metrics" roles="metrics"/> <user username="admin" password="admin" roles="admin,metrics" /> <role name="admin" permissions="ADMIN" /> <role name="metrics" permissions="METRICS"/> </azkaban-users> |
在azkaban web服務器目錄下執行啓動命令
bin/azkaban-web-start.sh
注:在web服務器根目錄運行
在執行服務器目錄下執行啓動命令
bin/azkaban-executor-start.sh ./
注:只能要執行服務器根目錄運行
啓動完成後,在瀏覽器(建議使用谷歌瀏覽器)中輸入https://服務器IP地址:8443 ,便可訪問azkaban服務了.在登陸中輸入剛纔新的戶用名及密碼,點擊 login.
Azkaba內置的任務類型支持command、java
一、建立job描述文件
vi command.job
#command.job type=command command=echo 'hello' |
二、將job資源文件打包成zip文件
zip command.job
三、經過azkaban的web管理平臺建立project並上傳job壓縮包
首先建立project
上傳zip包
四、啓動執行該job
一、建立有依賴關係的多個job描述
第一個job:foo.job
# foo.job type=command command=echo foo |
第二個job:bar.job依賴foo.job
# bar.job type=command dependencies=foo command=echo bar |
二、將全部job資源文件打到一個zip包中
三、在azkaban的web管理界面建立工程並上傳zip包
四、啓動工做流flow
一、建立job描述文件
# fs.job type=command command=/home/hadoop/apps/hadoop-2.6.1/bin/hadoop fs -mkdir /azaz |
二、將job資源文件打包成zip文件
三、經過azkaban的web管理平臺建立project並上傳job壓縮包
四、啓動執行該job
Mr任務依然可使用command的job類型來執行
一、建立job描述文件,及mr程序jar包(示例中直接使用hadoop自帶的example jar)
# mrwc.job type=command command=/home/hadoop/apps/hadoop-2.6.1/bin/hadoop jar hadoop-mapreduce-examples-2.6.1.jar wordcount /wordcount/input /wordcount/azout |
二、將全部job資源文件打到一個zip包中
三、在azkaban的web管理界面建立工程並上傳zip包
四、啓動job
l 建立job描述文件和hive腳本
Hive腳本: test.sql
use default; drop table aztest; create table aztest(id int,name string) row format delimited fields terminated by ','; load data inpath '/aztest/hiveinput' into table aztest; create table azres as select * from aztest; insert overwrite directory '/aztest/hiveoutput' select count(1) from aztest; |
Job描述文件:hivef.job
# hivef.job type=command command=/home/hadoop/apps/hive/bin/hive -f 'test.sql' |
二、將全部job資源文件打到一個zip包中
三、在azkaban的web管理界面建立工程並上傳zip包
四、啓動job
sqoop是apache旗下一款「Hadoop和關係數據庫服務器之間傳送數據」的工具。
導入數據:MySQL,Oracle導入數據到Hadoop的HDFS、HIVE、HBASE等數據存儲系統;
導出數據:從Hadoop的文件系統中導出數據到關係數據庫
將導入或導出命令翻譯成mapreduce程序來實現
在翻譯出的mapreduce中主要是對inputformat和outputformat進行定製
安裝sqoop的前提是已經具有java和hadoop的環境
最新版下載地址http://ftp.wayne.edu/apache/sqoop/1.4.6/
$ cd $SQOOP_HOME/conf
$ mv sqoop-env-template.sh sqoop-env.sh
打開sqoop-env.sh並編輯下面幾行:
export HADOOP_COMMON_HOME=/home/hadoop/apps/hadoop-2.6.1/
export HADOOP_MAPRED_HOME=/home/hadoop/apps/hadoop-2.6.1/
export HIVE_HOME=/home/hadoop/apps/hive-1.2.1
cp ~/app/hive/lib/mysql-connector-java-5.1.28.jar $SQOOP_HOME/lib/
$ cd $SQOOP_HOME/bin
$ sqoop-version
預期的輸出:
15/12/17 14:52:32 INFO sqoop.Sqoop: Running Sqoop version: 1.4.6
Sqoop 1.4.6 git commit id 5b34accaca7de251fc91161733f906af2eddbe83
Compiled by abe on Fri Aug 1 11:19:26 PDT 2015
到這裏,整個Sqoop安裝工做完成。
「導入工具」導入單個表從RDBMS到HDFS。表中的每一行被視爲HDFS的記錄。全部記錄都存儲爲文本文件的文本數據(或者Avro、sequence文件等二進制數據)
下面的語法用於將數據導入HDFS。
$ sqoop import (generic-args) (import-args) |
在mysql中有一個庫userdb中三個表:emp, emp_add和emp_contact
表emp:
id |
name |
deg |
salary |
dept |
1201 |
gopal |
manager |
50,000 |
TP |
1202 |
manisha |
Proof reader |
50,000 |
TP |
1203 |
khalil |
php dev |
30,000 |
AC |
1204 |
prasanth |
php dev |
30,000 |
AC |
1205 |
kranthi |
admin |
20,000 |
TP |
表emp_add:
id |
hno |
street |
city |
1201 |
288A |
vgiri |
jublee |
1202 |
108I |
aoc |
sec-bad |
1203 |
144Z |
pgutta |
hyd |
1204 |
78B |
old city |
sec-bad |
1205 |
720X |
hitec |
sec-bad |
表emp_conn:
id |
phno |
|
1201 |
2356742 |
gopal@tp.com |
1202 |
1661663 |
manisha@tp.com |
1203 |
8887776 |
khalil@ac.com |
1204 |
9988774 |
prasanth@ac.com |
1205 |
1231231 |
kranthi@tp.com |
下面的命令用於從MySQL數據庫服務器中的emp表導入HDFS。
$bin/sqoop import \ --connect jdbc:mysql://hdp-node-01:3306/test \ --username root \ --password root \ --table emp --m 1 |
若是成功執行,那麼會獲得下面的輸出。
14/12/22 15:24:54 INFO sqoop.Sqoop: Running Sqoop version: 1.4.5 14/12/22 15:24:56 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset. INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-hadoop/compile/cebe706d23ebb1fd99c1f063ad51ebd7/emp.jar ----------------------------------------------------- O mapreduce.Job: map 0% reduce 0% 14/12/22 15:28:08 INFO mapreduce.Job: map 100% reduce 0% 14/12/22 15:28:16 INFO mapreduce.Job: Job job_1419242001831_0001 completed successfully ----------------------------------------------------- ----------------------------------------------------- 14/12/22 15:28:17 INFO mapreduce.ImportJobBase: Transferred 145 bytes in 177.5849 seconds (0.8165 bytes/sec) 14/12/22 15:28:17 INFO mapreduce.ImportJobBase: Retrieved 5 records. |
爲了驗證在HDFS導入的數據,請使用如下命令查看導入的數據
$ $HADOOP_HOME/bin/hadoop fs -cat /user/hadoop/emp/part-m-00000 |
emp表的數據和字段之間用逗號(,)表示。
1201, gopal, manager, 50000, TP 1202, manisha, preader, 50000, TP 1203, kalil, php dev, 30000, AC 1204, prasanth, php dev, 30000, AC 1205, kranthi, admin, 20000, TP |
bin/sqoop import --connect jdbc:mysql://hdp-node-01:3306/test --username root --password root --table emp --hive-import --m 1 |
在導入表數據到HDFS使用Sqoop導入工具,咱們能夠指定目標目錄。
如下是指定目標目錄選項的Sqoop導入命令的語法。
--target-dir <new or exist directory in HDFS> |
下面的命令是用來導入emp_add表數據到'/queryresult'目錄。
bin/sqoop import \ --connect jdbc:mysql://hdp-node-01:3306/test \ --username root \ --password root \ --target-dir /queryresult \ --table emp --m 1 |
下面的命令是用來驗證 /queryresult 目錄中 emp_add表導入的數據形式。
$HADOOP_HOME/bin/hadoop fs -cat /queryresult/part-m-* |
它會用逗號(,)分隔emp_add表的數據和字段。
1201, 288A, vgiri, jublee 1202, 108I, aoc, sec-bad 1203, 144Z, pgutta, hyd 1204, 78B, oldcity, sec-bad 1205, 720C, hitech, sec-bad |
咱們能夠導入表的使用Sqoop導入工具,"where"子句的一個子集。它執行在各自的數據庫服務器相應的SQL查詢,並將結果存儲在HDFS的目標目錄。
where子句的語法以下。
--where <condition> |
下面的命令用來導入emp_add表數據的子集。子集查詢檢索員工ID和地址,居住城市爲:Secunderabad
bin/sqoop import \ --connect jdbc:mysql://hdp-node-01:3306/test \ --username root \ --password root \ --where "city ='sec-bad'" \ --target-dir /wherequery \ --table emp_add --m 1 |
下面的命令用來驗證數據從emp_add表導入/wherequery目錄
$HADOOP_HOME/bin/hadoop fs -cat /wherequery/part-m-* |
它用逗號(,)分隔 emp_add表數據和字段。
1202, 108I, aoc, sec-bad 1204, 78B, oldcity, sec-bad 1205, 720C, hitech, sec-bad |
增量導入是僅導入新添加的表中的行的技術。
它須要添加‘incremental’, ‘check-column’, 和 ‘last-value’選項來執行增量導入。
下面的語法用於Sqoop導入命令增量選項。
--incremental <mode> --check-column <column name> --last value <last check column value>
|
假設新添加的數據轉換成emp表以下:
1206, satish p, grp des, 20000, GR
下面的命令用於在EMP表執行增量導入。
bin/sqoop import \ --connect jdbc:mysql://hdp-node-01:3306/test \ --username root \ --password root \ --table emp --m 1 \ --incremental append \ --check-column id \ --last-value 1205 |
如下命令用於從emp表導入HDFS emp/ 目錄的數據驗證。
$ $HADOOP_HOME/bin/hadoop fs -cat /user/hadoop/emp/part-m-* 它用逗號(,)分隔 emp_add表數據和字段。 1201, gopal, manager, 50000, TP 1202, manisha, preader, 50000, TP 1203, kalil, php dev, 30000, AC 1204, prasanth, php dev, 30000, AC 1205, kranthi, admin, 20000, TP 1206, satish p, grp des, 20000, GR |
下面的命令是從表emp 用來查看修改或新添加的行
$ $HADOOP_HOME/bin/hadoop fs -cat /emp/part-m-*1 這表示新添加的行用逗號(,)分隔emp表的字段。 1206, satish p, grp des, 20000, GR |
將數據從HDFS導出到RDBMS數據庫
導出前,目標表必須存在於目標數據庫中。
u 默認操做是從將文件中的數據使用INSERT語句插入到表中
u 更新模式下,是生成UPDATE語句更新表數據
如下是export命令語法。
$ sqoop export (generic-args) (export-args) |
數據是在HDFS 中「EMP/」目錄的emp_data文件中。所述emp_data以下:
1201, gopal, manager, 50000, TP 1202, manisha, preader, 50000, TP 1203, kalil, php dev, 30000, AC 1204, prasanth, php dev, 30000, AC 1205, kranthi, admin, 20000, TP 1206, satish p, grp des, 20000, GR |
一、首先須要手動建立mysql中的目標表
$ mysql mysql> USE db; mysql> CREATE TABLE employee ( id INT NOT NULL PRIMARY KEY, name VARCHAR(20), deg VARCHAR(20), salary INT, dept VARCHAR(10)); |
二、而後執行導出命令
bin/sqoop export \ --connect jdbc:mysql://hdp-node-01:3306/test \ --username root \ --password root \ --table emp2 \ --export-dir /user/hadoop/emp/ |
三、驗證表mysql命令行。
mysql>select * from employee; 若是給定的數據存儲成功,那麼能夠找到數據在以下的employee表。 +------+--------------+-------------+-------------------+--------+ | Id | Name | Designation | Salary | Dept | +------+--------------+-------------+-------------------+--------+ | 1201 | gopal | manager | 50000 | TP | | 1202 | manisha | preader | 50000 | TP | | 1203 | kalil | php dev | 30000 | AC | | 1204 | prasanth | php dev | 30000 | AC | | 1205 | kranthi | admin | 20000 | TP | | 1206 | satish p | grp des | 20000 | GR | +------+--------------+-------------+-------------------+--------+ |
注:Sqoop做業——將事先定義好的數據導入導出任務按照指定流程運行
如下是建立Sqoop做業的語法。
$ sqoop job (generic-args) (job-args) [-- [subtool-name] (subtool-args)]
$ sqoop-job (generic-args) (job-args) [-- [subtool-name] (subtool-args)]
|
在這裏,咱們建立一個名爲myjob,這能夠從RDBMS表的數據導入到HDFS做業。
bin/sqoop job --create myimportjob -- import --connect jdbc:mysql://hdp-node-01:3306/test --username root --password root --table emp --m 1 |
該命令建立了一個從db庫的employee表導入到HDFS文件的做業。
‘--list’ 參數是用來驗證保存的做業。下面的命令用來驗證保存Sqoop做業的列表。
$ sqoop job --list
它顯示了保存做業列表。
Available jobs:
myjob
檢查做業(--show)
‘--show’ 參數用於檢查或驗證特定的工做,及其詳細信息。如下命令和樣本輸出用來驗證一個名爲myjob的做業。
$ sqoop job --show myjob
它顯示了工具和它們的選擇,這是使用在myjob中做業狀況。
Job: myjob Tool: import Options: ---------------------------- direct.import = true codegen.input.delimiters.record = 0 hdfs.append.dir = false db.table = employee ... incremental.last.value = 1206 ...
|
‘--exec’ 選項用於執行保存的做業。下面的命令用於執行保存的做業稱爲myjob。
$ sqoop job --exec myjob 它會顯示下面的輸出。 10/08/19 13:08:45 INFO tool.CodeGenTool: Beginning code generation ... |
Sqoop的原理其實就是將導入導出命令轉化爲mapreduce程序來執行,sqoop在接收到命令後,都要生成mapreduce程序
使用sqoop的代碼生成工具能夠方便查看到sqoop所生成的java代碼,並可在此基礎之上進行深刻定製開發
如下是Sqoop代碼生成命令的語法:
$ sqoop-codegen (generic-args) (codegen-args) $ sqoop-codegen (generic-args) (codegen-args) |
示例:以USERDB數據庫中的表emp來生成Java代碼爲例。
下面的命令用來生成導入
$ sqoop-codegen \ --import --connect jdbc:mysql://localhost/userdb \ --username root \ --table emp |
若是命令成功執行,那麼它就會產生以下的輸出。
14/12/23 02:34:40 INFO sqoop.Sqoop: Running Sqoop version: 1.4.5 14/12/23 02:34:41 INFO tool.CodeGenTool: Beginning code generation ………………. 14/12/23 02:34:42 INFO orm.CompilationManager: HADOOP_MAPRED_HOME is /usr/local/hadoop Note: /tmp/sqoop-hadoop/compile/9a300a1f94899df4a9b10f9935ed9f91/emp.java uses or overrides a deprecated API. Note: Recompile with -Xlint:deprecation for details. 14/12/23 02:34:47 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-hadoop/compile/9a300a1f94899df4a9b10f9935ed9f91/emp.jar |
驗證: 查看輸出目錄下的文件
$ cd /tmp/sqoop-hadoop/compile/9a300a1f94899df4a9b10f9935ed9f91/ $ ls emp.class emp.jar emp.java
|
若是想作深刻定製導出,則可修改上述代碼文件
Source 到 Channel 到 Sink之間傳遞數據的形式是Event事件;Event事件是一個數據流單元。