Flume+Sqoop+Azkaban筆記

大綱(輔助系統)

 

離線輔助系統php

數據接入html

Flume介紹java

Flume組件node

Flume實戰案例mysql

任務調度linux

調度器基礎git

市面上調度工具github

Oozie的使用web

Oozie的流程定義詳解ajax

數據導出

sqoop基礎知識

sqoop實戰及原理

Sqoop數據導入實戰

Sqoop數據導出實戰

Sqoop做業操做

Sqoop的原理

 

 

目標:

一、理解flume、sqoop、oozie的應用場景

二、理解flume、sqoop、oozie的基本原理

三、掌握flume、sqoop、oozie的使用方法

前言

在一個完整的大數據處理系統中,除了hdfs+mapreduce+hive組成分析系統的核心以外,還須要數據採集、結果數據導出、任務調度等不可或缺的輔助系統,而這些輔助工具在hadoop生態體系中都有便捷的開源框架,如圖所示:

1. 日誌採集框架Flume

1.1 Flume介紹

1.1.1 概述

u  Flume是一個分佈式、可靠、和高可用的海量日誌採集、聚合和傳輸的系統。

u  Flume能夠採集文件,socket數據包等各類形式源數據,又能夠將採集到的數據輸出到HDFS、hbase、hive、kafka等衆多外部存儲系統中

u  通常的採集需求,經過對flume的簡單配置便可實現

u  Flume針對特殊場景也具有良好的自定義擴展能力,所以,flume能夠適用於大部分的平常數據採集場景

 

1.1.2 運行機制

一、  Flume分佈式系統中最核心的角色是agent,flume採集系統就是由一個個agent所鏈接起來造成

二、  每個agent至關於一個數據傳遞員,內部有三個組件:

a) Source:採集源,用於跟數據源對接,以獲取數據

b) Sink:下沉地,採集數據的傳送目的,用於往下一級agent傳遞數據或者往最終存儲系統傳遞數據

c) Channel:agent內部的數據傳輸通道,用於從source將數據傳遞到sink

1.1.4 Flume採集系統結構圖

1. 簡單結構

單個agent採集數據

2. 複雜結構

多級agent之間串聯

1.2 Flume實戰案例

1.2.1 Flume的安裝部署

一、Flume的安裝很是簡單,只須要解壓便可,固然,前提是已有hadoop環境

上傳安裝包到數據源所在節點上

而後解壓  tar -zxvf apache-flume-1.6.0-bin.tar.gz

而後進入flume的目錄,修改conf下的flume-env.sh,在裏面配置JAVA_HOME

二、根據數據採集的需求配置採集方案,描述在配置文件中(文件名可任意自定義)

三、指定採集方案配置文件,在相應的節點上啓動flume agent

先用一個最簡單的例子來測試一下程序環境是否正常

一、先在flume的conf目錄下新建一個文件

vi   netcat-logger.conf

# 定義這個agent中各組件的名字

a1.sources = r1

a1.sinks = k1

a1.channels = c1

 

# 描述和配置source組件:r1

a1.sources.r1.type = netcat

a1.sources.r1.bind = localhost

a1.sources.r1.port = 44444

 

# 描述和配置sink組件:k1

a1.sinks.k1.type = logger

 

# 描述和配置channel組件,此處使用是內存緩存的方式

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

 

# 描述和配置source  channel   sink之間的鏈接關係

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

 

二、啓動agent去採集數據

bin/flume-ng agent -c conf -f conf/netcat-logger.conf -n a1  -Dflume.root.logger=INFO,console

-c conf   指定flume自身的配置文件所在目錄

-f conf/netcat-logger.con  指定咱們所描述的採集方案

-n a1  指定咱們這個agent的名字

三、測試

先要往agent採集監聽的端口上發送數據,讓agent有數據可採

隨便在一個能跟agent節點聯網的機器上

telnet anget-hostname  port   (telnet localhost 44444)

1.2.2 採集案例

一、採集目錄到HDFS

採集需求:某服務器的某特定目錄下,會不斷產生新的文件,每當有新文件出現,就須要把文件採集到HDFS中去

根據需求,首先定義如下3大要素

l  採集源,即source——監控文件目錄 :  spooldir

l  下沉目標,即sink——HDFS文件系統  :  hdfs sink

l  source和sink之間的傳遞通道——channel,可用file channel 也能夠用內存channel

配置文件編寫:

#定義三大組件的名稱

agent1.sources = source1

agent1.sinks = sink1

agent1.channels = channel1

 

# 配置source組件

agent1.sources.source1.type = spooldir

agent1.sources.source1.spoolDir = /home/hadoop/logs/

agent1.sources.source1.fileHeader = false

 

#配置攔截器

agent1.sources.source1.interceptors = i1

agent1.sources.source1.interceptors.i1.type = host

agent1.sources.source1.interceptors.i1.hostHeader = hostname

 

# 配置sink組件

agent1.sinks.sink1.type = hdfs

agent1.sinks.sink1.hdfs.path =hdfs://hdp-node-01:9000/weblog/flume-collection/%y-%m-%d/%H-%M

agent1.sinks.sink1.hdfs.filePrefix = access_log

agent1.sinks.sink1.hdfs.maxOpenFiles = 5000

agent1.sinks.sink1.hdfs.batchSize= 100

agent1.sinks.sink1.hdfs.fileType = DataStream

agent1.sinks.sink1.hdfs.writeFormat =Text

agent1.sinks.sink1.hdfs.rollSize = 102400

agent1.sinks.sink1.hdfs.rollCount = 1000000

agent1.sinks.sink1.hdfs.rollInterval = 60

#agent1.sinks.sink1.hdfs.round = true

#agent1.sinks.sink1.hdfs.roundValue = 10

#agent1.sinks.sink1.hdfs.roundUnit = minute

agent1.sinks.sink1.hdfs.useLocalTimeStamp = true

 

# Use a channel which buffers events in memory

agent1.channels.channel1.type = memory

agent1.channels.channel1.keep-alive = 120

agent1.channels.channel1.capacity = 500000

agent1.channels.channel1.transactionCapacity = 600

 

# Bind the source and sink to the channel

agent1.sources.source1.channels = channel1

agent1.sinks.sink1.channel = channel1

 

Channel參數解釋:

capacity:默認該通道中最大的能夠存儲的event數量

trasactionCapacity:每次最大能夠從source中拿到或者送到sink中的event數量

keep-alive:event添加到通道中或者移出的容許時間

二、採集文件到HDFS

採集需求:好比業務系統使用log4j生成的日誌,日誌內容不斷增長,須要把追加到日誌文件中的數據實時採集到hdfs

根據需求,首先定義如下3大要素

l  採集源,即source——監控文件內容更新 :  exec  ‘tail -F file’

l  下沉目標,即sink——HDFS文件系統  :  hdfs sink

l  Source和sink之間的傳遞通道——channel,可用file channel 也能夠用 內存channel

配置文件編寫:

agent1.sources = source1

agent1.sinks = sink1

agent1.channels = channel1

 

# Describe/configure tail -F source1

agent1.sources.source1.type = exec

agent1.sources.source1.command = tail -F /home/hadoop/logs/access_log

agent1.sources.source1.channels = channel1

 

#configure host for source

agent1.sources.source1.interceptors = i1

agent1.sources.source1.interceptors.i1.type = host

agent1.sources.source1.interceptors.i1.hostHeader = hostname

 

# Describe sink1

agent1.sinks.sink1.type = hdfs

#a1.sinks.k1.channel = c1

agent1.sinks.sink1.hdfs.path =hdfs://hdp-node-01:9000/weblog/flume-collection/%y-%m-%d/%H-%M

agent1.sinks.sink1.hdfs.filePrefix = access_log

agent1.sinks.sink1.hdfs.maxOpenFiles = 5000

agent1.sinks.sink1.hdfs.batchSize= 100

agent1.sinks.sink1.hdfs.fileType = DataStream

agent1.sinks.sink1.hdfs.writeFormat =Text

agent1.sinks.sink1.hdfs.rollSize = 102400

agent1.sinks.sink1.hdfs.rollCount = 1000000

agent1.sinks.sink1.hdfs.rollInterval = 60

agent1.sinks.sink1.hdfs.round = true

agent1.sinks.sink1.hdfs.roundValue = 10

agent1.sinks.sink1.hdfs.roundUnit = minute

agent1.sinks.sink1.hdfs.useLocalTimeStamp = true

 

# Use a channel which buffers events in memory

agent1.channels.channel1.type = memory

agent1.channels.channel1.keep-alive = 120

agent1.channels.channel1.capacity = 500000

agent1.channels.channel1.transactionCapacity = 600

 

# Bind the source and sink to the channel

agent1.sources.source1.channels = channel1

agent1.sinks.sink1.channel = channel1

 

1.3 更多source和sink組件

Flume支持衆多的source和sink類型,詳細手冊可參考官方文檔

http://flume.apache.org/FlumeUserGuide.html

2. 工做流調度器azkaban

2.1 概述

2.1.1爲何須要工做流調度系統

l  一個完整的數據分析系統一般都是由大量任務單元組成:

shell腳本程序,java程序,mapreduce程序、hive腳本等

l  各任務單元之間存在時間前後及先後依賴關係

l  爲了很好地組織起這樣的複雜執行計劃,須要一個工做流調度系統來調度執行;

例如,咱們可能有這樣一個需求,某個業務系統天天產生20G原始數據,咱們天天都要對其進行處理,處理步驟以下所示:

一、  經過Hadoop先將原始數據同步到HDFS上;

二、  藉助MapReduce計算框架對原始數據進行轉換,生成的數據以分區表的形式存儲到多張Hive表中;

三、  須要對Hive中多個表的數據進行JOIN處理,獲得一個明細數據Hive大表;

四、  將明細數據進行復雜的統計分析,獲得結果報表信息;

五、  須要將統計分析獲得的結果數據同步到業務系統中,供業務調用使用。

2.1.2 工做流調度實現方式

簡單的任務調度:直接使用linux的crontab來定義;

複雜的任務調度:開發調度平臺

或使用現成的開源調度系統,好比ooize、azkaban等

2.1.3 常見工做流調度系統

市面上目前有許多工做流調度器

在hadoop領域,常見的工做流調度器有Oozie, Azkaban,Cascading,Hamake等

2.1.4 各類調度工具特性對比

下面的表格對上述四種hadoop工做流調度器的關鍵特性進行了比較,儘管這些工做流調度器可以解決的需求場景基本一致,但在設計理念,目標用戶,應用場景等方面仍是存在顯著的區別,在作技術選型的時候,能夠提供參考

特性

Hamake

Oozie

Azkaban

Cascading

工做流描述語言

XML

XML (xPDL based)

text file with key/value pairs

Java API

依賴機制

data-driven

explicit

explicit

explicit

是否要web容器

No

Yes

Yes

No

進度跟蹤

console/log messages

web page

web page

Java API

Hadoop job調度支持

no

yes

yes

yes

運行模式

command line utility

daemon

daemon

API

Pig支持

yes

yes

yes

yes

事件通知

no

no

no

yes

須要安裝

no

yes

yes

no

支持的hadoop版本

0.18+

0.20+

currently unknown

0.18+

重試支持

no

workflownode evel

yes

yes

運行任意命令

yes

yes

yes

yes

Amazon EMR支持

yes

no

currently unknown

yes

2.1.5 Azkaban與Oozie對比

對市面上最流行的兩種調度器,給出如下詳細對比,以供技術選型參考。整體來講,ooize相比azkaban是一個重量級的任務調度系統,功能全面,但配置使用也更復雜。若是能夠不在乎某些功能的缺失,輕量級調度器azkaban是很不錯的候選對象。

詳情以下:

u  功能

二者都可以調度mapreduce,pig,java,腳本工做流任務

二者都可以定時執行工做流任務

u  工做流定義

Azkaban使用Properties文件定義工做流

Oozie使用XML文件定義工做流

u  工做流傳參

Azkaban支持直接傳參,例如${input}

Oozie支持參數和EL表達式,例如${fs:dirSize(myInputDir)}

u  定時執行

Azkaban的定時執行任務是基於時間的

Oozie的定時執行任務基於時間和輸入數據

u  資源管理

Azkaban有較嚴格的權限控制,如用戶對工做流進行讀/寫/執行等操做

Oozie暫無嚴格的權限控制

u  工做流執行

Azkaban有兩種運行模式,分別是solo server mode(executor server和web server部署在同一臺節點)和multi server mode(executor server和web server能夠部署在不一樣節點)

Oozie做爲工做流服務器運行,支持多用戶和多工做流

u  工做流管理

Azkaban支持瀏覽器以及ajax方式操做工做流

Oozie支持命令行、HTTP REST、Java API、瀏覽器操做工做流

2.2 Azkaban介紹

Azkaban是由Linkedin開源的一個批量工做流任務調度器。用於在一個工做流內以一個特定的順序運行一組工做和流程。Azkaban定義了一種KV文件格式來創建任務之間的依賴關係,並提供一個易於使用的web用戶界面維護和跟蹤你的工做流。

它有以下功能特色:

²  Web用戶界面

²  方便上傳工做流

²  方便設置任務之間的關係

²  調度工做流

²  認證/受權(權限的工做)

²  可以殺死並從新啓動工做流

²  模塊化和可插拔的插件機制

²  項目工做區

²  工做流和任務的日誌記錄和審計

2. 3 Azkaban安裝部署

準備工做

Azkaban Web服務器

azkaban-web-server-2.5.0.tar.gz

Azkaban執行服務器 

azkaban-executor-server-2.5.0.tar.gz

MySQL

目前azkaban只支持 mysql,需安裝mysql服務器,本文檔中默認已安裝好mysql服務器,並創建了 root用戶,密碼 root.

下載地址:http://azkaban.github.io/downloads.html

安裝

將安裝文件上傳到集羣,最好上傳到安裝 hive、sqoop的機器上,方便命令的執行

在當前用戶目錄下新建 azkabantools目錄,用於存放源安裝文件.新建azkaban目錄,用於存放azkaban運行程序

azkaban web服務器安裝

解壓azkaban-web-server-2.5.0.tar.gz

命令: tar –zxvf azkaban-web-server-2.5.0.tar.gz

將解壓後的azkaban-web-server-2.5.0 移動到 azkaban目錄中,並從新命名 webserver

命令: mv azkaban-web-server-2.5.0 ../azkaban

        cd ../azkaban

        mv azkaban-web-server-2.5.0  server

azkaban 執行服器安裝

解壓azkaban-executor-server-2.5.0.tar.gz

命令:tar –zxvf azkaban-executor-server-2.5.0.tar.gz

將解壓後的azkaban-executor-server-2.5.0 移動到 azkaban目錄中,並從新命名 executor

命令:mv azkaban-executor-server-2.5.0  ../azkaban

cd ../azkaban

mv azkaban-executor-server-2.5.0  executor

azkaban腳本導入

解壓: azkaban-sql-script-2.5.0.tar.gz

命令:tar –zxvf azkaban-sql-script-2.5.0.tar.gz

將解壓後的mysql 腳本,導入到mysql中:

進入mysql

mysql> create database azkaban;

mysql> use azkaban;

Database changed

mysql> source /home/hadoop/azkaban-2.5.0/create-all-sql-2.5.0.sql;

建立SSL配置

參考地址: http://docs.codehaus.org/display/JETTY/How+to+configure+SSL

命令: keytool -keystore keystore -alias jetty -genkey -keyalg RSA

運行此命令後,會提示輸入當前生成 keystor的密碼及相應信息,輸入的密碼請勞記,信息以下:

輸入keystore密碼: 

再次輸入新密碼:

您的名字與姓氏是什麼?

  [Unknown]: 

您的組織單位名稱是什麼?

  [Unknown]: 

您的組織名稱是什麼?

  [Unknown]: 

您所在的城市或區域名稱是什麼?

  [Unknown]: 

您所在的州或省份名稱是什麼?

  [Unknown]: 

該單位的兩字母國家代碼是什麼

  [Unknown]:  CN

CN=Unknown, OU=Unknown, O=Unknown, L=Unknown, ST=Unknown, C=CN 正確嗎?

  [否]:  y

輸入<jetty>的主密碼

        (若是和 keystore 密碼相同,按回車): 

再次輸入新密碼:

完成上述工做後,將在當前目錄生成 keystore 證書文件,將keystore 考貝到 azkaban web服務器根目錄中.如:cp keystore azkaban/webserver

配置文件

注:先配置好服務器節點上的時區

一、先生成時區配置文件Asia/Shanghai,用交互式命令 tzselect 便可

二、拷貝該時區文件,覆蓋系統本地時區配置

cp /usr/share/zoneinfo/Asia/Shanghai /etc/localtime 

azkaban web服務器配置

進入azkaban web服務器安裝目錄 conf目錄

v  修改azkaban.properties文件

命令vi azkaban.properties

內容說明以下:

#Azkaban Personalization Settings

azkaban.name=Test                           #服務器UI名稱,用於服務器上方顯示的名字

azkaban.label=My Local Azkaban                               #描述

azkaban.color=#FF3601                                                 #UI顏色

azkaban.default.servlet.path=/index                         #

web.resource.dir=web/                                                 #默認根web目錄

default.timezone.id=Asia/Shanghai                           #默認時區,已改成亞洲/上海 默認爲美國

 

#Azkaban UserManager class

user.manager.class=azkaban.user.XmlUserManager   #用戶權限管理默認類

user.manager.xml.file=conf/azkaban-users.xml              #用戶配置,具體配置參加下文

 

#Loader for projects

executor.global.properties=conf/global.properties    # global配置文件所在位置

azkaban.project.dir=projects                                                #

 

database.type=mysql                                                              #數據庫類型

mysql.port=3306                                                                       #端口號

mysql.host=hadoop03                                                      #數據庫鏈接IP

mysql.database=azkaban                                                       #數據庫實例名

mysql.user=root                                                                 #數據庫用戶名

mysql.password=root                                                          #數據庫密碼

mysql.numconnections=100                                                  #最大鏈接數

 

# Velocity dev mode

velocity.dev.mode=false

# Jetty服務器屬性.

jetty.maxThreads=25                                                               #最大線程數

jetty.ssl.port=8443                                                                   #Jetty SSL端口

jetty.port=8081                                                                         #Jetty端口

jetty.keystore=keystore                                                          #SSL文件名

jetty.password=123456                                                             #SSL文件密碼

jetty.keypassword=123456                                                      #Jetty主密碼 與 keystore文件相同

jetty.truststore=keystore                                                                #SSL文件名

jetty.trustpassword=123456                                                   # SSL文件密碼

 

# 執行服務器屬性

executor.port=12321                                                               #執行服務器端口

 

# 郵件設置

mail.sender=xxxxxxxx@163.com                                       #發送郵箱

mail.host=smtp.163.com                                                       #發送郵箱smtp地址

mail.user=xxxxxxxx                                       #發送郵件時顯示的名稱

mail.password=**********                                                 #郵箱密碼

job.failure.email=xxxxxxxx@163.com                              #任務失敗時發送郵件的地址

job.success.email=xxxxxxxx@163.com                            #任務成功時發送郵件的地址

lockdown.create.projects=false                                           #

cache.directory=cache                                                            #緩存目錄

 

 

v  azkaban 執行服務器配置

進入執行服務器安裝目錄conf,修改azkaban.properties

vi azkaban.properties

#Azkaban

default.timezone.id=Asia/Shanghai                                              #時區

 

# Azkaban JobTypes 插件配置

azkaban.jobtype.plugin.dir=plugins/jobtypes                   #jobtype 插件所在位置

 

#Loader for projects

executor.global.properties=conf/global.properties

azkaban.project.dir=projects

 

#數據庫設置

database.type=mysql                                                                       #數據庫類型(目前只支持mysql)

mysql.port=3306                                                                                #數據庫端口號

mysql.host=192.168.20.200                                                           #數據庫IP地址

mysql.database=azkaban                                                                #數據庫實例名

mysql.user=azkaban                                                                         #數據庫用戶名

mysql.password=oracle                                                                   #數據庫密碼

mysql.numconnections=100                                                           #最大鏈接數

 

# 執行服務器配置

executor.maxThreads=50                                                                #最大線程數

executor.port=12321                                                               #端口號(如修改,請與web服務中一致)

executor.flow.threads=30                                                                #線程數

 

v  用戶配置

進入azkaban web服務器conf目錄,修改azkaban-users.xml

vi azkaban-users.xml 增長 管理員用戶

<azkaban-users>

        <user username="azkaban" password="azkaban" roles="admin" groups="azkaban" />

        <user username="metrics" password="metrics" roles="metrics"/>

        <user username="admin" password="admin" roles="admin,metrics" />

        <role name="admin" permissions="ADMIN" />

        <role name="metrics" permissions="METRICS"/>

</azkaban-users>

 

啓動

web服務器

在azkaban web服務器目錄下執行啓動命令

bin/azkaban-web-start.sh

注:在web服務器根目錄運行

執行服務器

在執行服務器目錄下執行啓動命令

bin/azkaban-executor-start.sh ./

注:只能要執行服務器根目錄運行

啓動完成後,在瀏覽器(建議使用谷歌瀏覽器)中輸入https://服務器IP地址:8443 ,便可訪問azkaban服務了.在登陸中輸入剛纔新的戶用名及密碼,點擊 login.

2.4 Azkaban實戰

Azkaba內置的任務類型支持command、java

Command類型單一job示例

一、建立job描述文件

vi command.job

#command.job

type=command                                                   

command=echo 'hello'

 

 

二、將job資源文件打包成zip文件

zip command.job

三、經過azkaban的web管理平臺建立project並上傳job壓縮包

首先建立project

上傳zip包

四、啓動執行該job

Command類型多job工做流flow

一、建立有依賴關係的多個job描述

第一個job:foo.job

# foo.job

type=command

command=echo foo

第二個job:bar.job依賴foo.job

# bar.job

type=command

dependencies=foo

command=echo bar

 

二、將全部job資源文件打到一個zip包中

三、在azkaban的web管理界面建立工程並上傳zip包

四、啓動工做流flow

HDFS操做任務

一、建立job描述文件

# fs.job

type=command

command=/home/hadoop/apps/hadoop-2.6.1/bin/hadoop fs -mkdir /azaz

 

二、將job資源文件打包成zip文件

 

三、經過azkaban的web管理平臺建立project並上傳job壓縮包

四、啓動執行該job

MAPREDUCE任務

Mr任務依然可使用command的job類型來執行

一、建立job描述文件,及mr程序jar包(示例中直接使用hadoop自帶的example jar)

# mrwc.job

type=command

command=/home/hadoop/apps/hadoop-2.6.1/bin/hadoop jar hadoop-mapreduce-examples-2.6.1.jar wordcount  /wordcount/input  /wordcount/azout

 

二、將全部job資源文件打到一個zip包中

三、在azkaban的web管理界面建立工程並上傳zip包

四、啓動job

HIVE腳本任務

l  建立job描述文件和hive腳本

Hive腳本: test.sql

use default;

drop table aztest;

create table aztest(id int,name string) row format delimited fields terminated by ',';

load data inpath '/aztest/hiveinput' into table aztest;

create table azres as select * from aztest;

insert overwrite directory '/aztest/hiveoutput' select count(1) from aztest;

Job描述文件:hivef.job

# hivef.job

type=command

command=/home/hadoop/apps/hive/bin/hive -f 'test.sql'

 

二、將全部job資源文件打到一個zip包中

三、在azkaban的web管理界面建立工程並上傳zip包

四、啓動job

3. sqoop數據遷移

3.1 概述

sqoop是apache旗下一款Hadoop和關係數據庫服務器之間傳送數據」的工具。

導入數據:MySQL,Oracle導入數據到Hadoop的HDFS、HIVE、HBASE等數據存儲系統;

導出數據:從Hadoop的文件系統中導出數據到關係數據庫

3.2 工做機制

將導入或導出命令翻譯成mapreduce程序來實現

在翻譯出的mapreduce中主要是對inputformat和outputformat進行定製

3.3 sqoop實戰及原理

3.3.1 sqoop安裝

安裝sqoop的前提是已經具有java和hadoop的環境

一、下載並解壓

最新版下載地址http://ftp.wayne.edu/apache/sqoop/1.4.6/

二、修改配置文件

$ cd $SQOOP_HOME/conf

$ mv sqoop-env-template.sh sqoop-env.sh

打開sqoop-env.sh並編輯下面幾行:

export HADOOP_COMMON_HOME=/home/hadoop/apps/hadoop-2.6.1/

export HADOOP_MAPRED_HOME=/home/hadoop/apps/hadoop-2.6.1/

export HIVE_HOME=/home/hadoop/apps/hive-1.2.1

三、加入mysql的jdbc驅動包

cp  ~/app/hive/lib/mysql-connector-java-5.1.28.jar   $SQOOP_HOME/lib/

四、驗證啓動

$ cd $SQOOP_HOME/bin

$ sqoop-version

預期的輸出:

15/12/17 14:52:32 INFO sqoop.Sqoop: Running Sqoop version: 1.4.6

Sqoop 1.4.6 git commit id 5b34accaca7de251fc91161733f906af2eddbe83

Compiled by abe on Fri Aug 1 11:19:26 PDT 2015

到這裏,整個Sqoop安裝工做完成。

3.4 Sqoop的數據導入

「導入工具」導入單個表從RDBMS到HDFS。表中的每一行被視爲HDFS的記錄。全部記錄都存儲爲文本文件的文本數據(或者Avro、sequence文件等二進制數據) 

3.4.1 語法

下面的語法用於將數據導入HDFS。

$ sqoop import (generic-args) (import-args)

 

3.4.2 示例

表數據

在mysql中有一個庫userdb中三個表:emp, emp_add和emp_contact

表emp:

id

name

deg

salary

dept

1201

gopal

manager

50,000

TP

1202

manisha

Proof reader

50,000

TP

1203

khalil

php dev

30,000

AC

1204

prasanth

php dev

30,000

AC

1205

kranthi

admin

20,000

TP

表emp_add:

id

hno

street

city

1201

288A

vgiri

jublee

1202

108I

aoc

sec-bad

1203

144Z

pgutta

hyd

1204

78B

old city

sec-bad

1205

720X

hitec

sec-bad

表emp_conn:

 

id

phno

email

1201

2356742

gopal@tp.com

1202

1661663

manisha@tp.com

1203

8887776

khalil@ac.com

1204

9988774

prasanth@ac.com

1205

1231231

kranthi@tp.com

導入表表數據到HDFS

下面的命令用於從MySQL數據庫服務器中的emp表導入HDFS。

$bin/sqoop import \

--connect jdbc:mysql://hdp-node-01:3306/test \

--username root \

--password root \

--table emp

--m 1

 

若是成功執行,那麼會獲得下面的輸出。

14/12/22 15:24:54 INFO sqoop.Sqoop: Running Sqoop version: 1.4.5

14/12/22 15:24:56 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.

INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-hadoop/compile/cebe706d23ebb1fd99c1f063ad51ebd7/emp.jar

-----------------------------------------------------

O mapreduce.Job: map 0% reduce 0%

14/12/22 15:28:08 INFO mapreduce.Job: map 100% reduce 0%

14/12/22 15:28:16 INFO mapreduce.Job: Job job_1419242001831_0001 completed successfully

-----------------------------------------------------

-----------------------------------------------------

14/12/22 15:28:17 INFO mapreduce.ImportJobBase: Transferred 145 bytes in 177.5849 seconds (0.8165 bytes/sec)

14/12/22 15:28:17 INFO mapreduce.ImportJobBase: Retrieved 5 records.

 

爲了驗證在HDFS導入的數據,請使用如下命令查看導入的數據

$ $HADOOP_HOME/bin/hadoop fs -cat /user/hadoop/emp/part-m-00000

 

emp表的數據和字段之間用逗號(,)表示。

1201, gopal,    manager, 50000, TP

1202, manisha,  preader, 50000, TP

1203, kalil,    php dev, 30000, AC

1204, prasanth, php dev, 30000, AC

1205, kranthi,  admin,   20000, TP

 

導入關係表到HIVE

bin/sqoop import

--connect jdbc:mysql://hdp-node-01:3306/test

--username root

--password root

--table emp

--hive-import

--m 1

導入到HDFS指定目錄

在導入表數據到HDFS使用Sqoop導入工具,咱們能夠指定目標目錄。

如下是指定目標目錄選項的Sqoop導入命令的語法。

--target-dir <new or exist directory in HDFS>

 

下面的命令是用來導入emp_add表數據到'/queryresult'目錄。

bin/sqoop import \

--connect jdbc:mysql://hdp-node-01:3306/test \

--username root \

--password root \

--target-dir /queryresult \

--table emp --m 1

 

 

下面的命令是用來驗證 /queryresult 目錄中 emp_add表導入的數據形式。

 $HADOOP_HOME/bin/hadoop fs -cat /queryresult/part-m-*

 

它會用逗號(,)分隔emp_add表的數據和字段。

1201, 288A, vgiri,   jublee

1202, 108I, aoc,     sec-bad

1203, 144Z, pgutta,  hyd

1204, 78B,  oldcity, sec-bad

1205, 720C, hitech,  sec-bad

 

導入表數據子集

咱們能夠導入表的使用Sqoop導入工具,"where"子句的一個子集。它執行在各自的數據庫服務器相應的SQL查詢,並將結果存儲在HDFS的目標目錄。

where子句的語法以下。

--where <condition>

 

下面的命令用來導入emp_add表數據的子集。子集查詢檢索員工ID和地址,居住城市爲:Secunderabad

bin/sqoop import \

--connect jdbc:mysql://hdp-node-01:3306/test \

--username root \

--password root \

--where "city ='sec-bad'" \

--target-dir /wherequery \

--table emp_add --m 1

 

下面的命令用來驗證數據從emp_add表導入/wherequery目錄

$HADOOP_HOME/bin/hadoop fs -cat /wherequery/part-m-*

 

它用逗號(,)分隔 emp_add表數據和字段。

1202, 108I, aoc, sec-bad

1204, 78B, oldcity, sec-bad

1205, 720C, hitech, sec-bad

 

增量導入

增量導入是僅導入新添加的表中的行的技術。

它須要添加‘incremental’, ‘check-column’, 和 ‘last-value’選項來執行增量導入。

下面的語法用於Sqoop導入命令增量選項。

--incremental <mode>

--check-column <column name>

--last value <last check column value>

 

 

假設新添加的數據轉換成emp表以下:

1206, satish p, grp des, 20000, GR

下面的命令用於在EMP表執行增量導入。

bin/sqoop import \

--connect jdbc:mysql://hdp-node-01:3306/test \

--username root \

--password root \

--table emp --m 1 \

--incremental append \

--check-column id \

--last-value 1205

 

如下命令用於從emp表導入HDFS emp/ 目錄的數據驗證。

$ $HADOOP_HOME/bin/hadoop fs -cat /user/hadoop/emp/part-m-*

它用逗號(,)分隔 emp_add表數據和字段。

1201, gopal,    manager, 50000, TP

1202, manisha,  preader, 50000, TP

1203, kalil,    php dev, 30000, AC

1204, prasanth, php dev, 30000, AC

1205, kranthi,  admin,   20000, TP

1206, satish p, grp des, 20000, GR

 

下面的命令是從表emp 用來查看修改或新添加的行

$ $HADOOP_HOME/bin/hadoop fs -cat /emp/part-m-*1

這表示新添加的行用逗號(,)分隔emp表的字段。

1206, satish p, grp des, 20000, GR

 

3.5 Sqoop的數據導出

將數據從HDFS導出到RDBMS數據庫

導出前,目標表必須存在於目標數據庫中。

u  默認操做是從將文件中的數據使用INSERT語句插入到表中

u  更新模式下,是生成UPDATE語句更新表數據

語法

如下是export命令語法。

$ sqoop export (generic-args) (export-args)

 

示例

數據是在HDFS 中「EMP/」目錄的emp_data文件中。所述emp_data以下:

1201, gopal,     manager, 50000, TP

1202, manisha,   preader, 50000, TP

1203, kalil,     php dev, 30000, AC

1204, prasanth,  php dev, 30000, AC

1205, kranthi,   admin,   20000, TP

1206, satish p,  grp des, 20000, GR

 

一、首先須要手動建立mysql中的目標表

$ mysql

mysql> USE db;

mysql> CREATE TABLE employee (

   id INT NOT NULL PRIMARY KEY,

   name VARCHAR(20),

   deg VARCHAR(20),

   salary INT,

   dept VARCHAR(10));

 

二、而後執行導出命令

bin/sqoop export \

--connect jdbc:mysql://hdp-node-01:3306/test \

--username root \

--password root \

--table emp2 \

--export-dir /user/hadoop/emp/

 

三、驗證表mysql命令行。

mysql>select * from employee;

若是給定的數據存儲成功,那麼能夠找到數據在以下的employee表。

+------+--------------+-------------+-------------------+--------+

| Id   | Name         | Designation | Salary            | Dept   |

+------+--------------+-------------+-------------------+--------+

| 1201 | gopal        | manager     | 50000             | TP     |

| 1202 | manisha      | preader     | 50000             | TP     |

| 1203 | kalil        | php dev     | 30000             | AC     |

| 1204 | prasanth     | php dev     | 30000             | AC     |

| 1205 | kranthi      | admin       | 20000             | TP     |

| 1206 | satish p     | grp des     | 20000             | GR     |

+------+--------------+-------------+-------------------+--------+

 

3.6 Sqoop做業

注:Sqoop做業——將事先定義好的數據導入導出任務按照指定流程運行

語法

如下是建立Sqoop做業的語法。

$ sqoop job (generic-args) (job-args)

   [-- [subtool-name] (subtool-args)]

 

$ sqoop-job (generic-args) (job-args)

   [-- [subtool-name] (subtool-args)]

 

 

建立做業(--create)

在這裏,咱們建立一個名爲myjob,這能夠從RDBMS表的數據導入到HDFS做業。

bin/sqoop job

--create myimportjob

-- import

--connect jdbc:mysql://hdp-node-01:3306/test

--username root

--password root

--table emp

--m 1

該命令建立了一個從db庫的employee表導入到HDFS文件的做業。

驗證做業 (--list)

‘--list’ 參數是用來驗證保存的做業。下面的命令用來驗證保存Sqoop做業的列表。

$ sqoop job --list

它顯示了保存做業列表。

Available jobs:

   myjob

檢查做業(--show)

‘--show’ 參數用於檢查或驗證特定的工做,及其詳細信息。如下命令和樣本輸出用來驗證一個名爲myjob的做業。

$ sqoop job --show myjob

它顯示了工具和它們的選擇,這是使用在myjob中做業狀況。

Job: myjob

 Tool: import Options:

 ----------------------------

 direct.import = true

 codegen.input.delimiters.record = 0

 hdfs.append.dir = false

 db.table = employee

 ...

 incremental.last.value = 1206

 ...

 

 

執行做業 (--exec)

‘--exec’ 選項用於執行保存的做業。下面的命令用於執行保存的做業稱爲myjob。

$ sqoop job --exec myjob

它會顯示下面的輸出。

10/08/19 13:08:45 INFO tool.CodeGenTool: Beginning code generation

...

 

3.7 Sqoop的原理

概述

Sqoop的原理其實就是將導入導出命令轉化爲mapreduce程序來執行,sqoop在接收到命令後,都要生成mapreduce程序

 

使用sqoop的代碼生成工具能夠方便查看到sqoop所生成的java代碼,並可在此基礎之上進行深刻定製開發

代碼定製

如下是Sqoop代碼生成命令的語法:

$ sqoop-codegen (generic-args) (codegen-args)

$ sqoop-codegen (generic-args) (codegen-args)

 

示例:以USERDB數據庫中的表emp來生成Java代碼爲例。

下面的命令用來生成導入

$ sqoop-codegen \

--import

--connect jdbc:mysql://localhost/userdb \

--username root \

--table emp

 

若是命令成功執行,那麼它就會產生以下的輸出。

14/12/23 02:34:40 INFO sqoop.Sqoop: Running Sqoop version: 1.4.5

14/12/23 02:34:41 INFO tool.CodeGenTool: Beginning code generation

……………….

14/12/23 02:34:42 INFO orm.CompilationManager: HADOOP_MAPRED_HOME is /usr/local/hadoop

Note: /tmp/sqoop-hadoop/compile/9a300a1f94899df4a9b10f9935ed9f91/emp.java uses or overrides a deprecated API.

Note: Recompile with -Xlint:deprecation for details.

14/12/23 02:34:47 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-hadoop/compile/9a300a1f94899df4a9b10f9935ed9f91/emp.jar

 

驗證: 查看輸出目錄下的文件

$ cd /tmp/sqoop-hadoop/compile/9a300a1f94899df4a9b10f9935ed9f91/

$ ls

emp.class

emp.jar

emp.java

 

若是想作深刻定製導出,則可修改上述代碼文件


Source 到 Channel 到 Sink之間傳遞數據的形式是Event事件;Event事件是一個數據流單元。

相關文章
相關標籤/搜索