大數據技術之_09_Flume學習_Flume概述+Flume快速入門+Flume企業開發案例+Flume監控之Ganglia+Flume高級之自定義MySQLSource+Flume企業真實面...

第1章 Flume概述1.1 Flume定義1.2 Flume組成架構1.2.1 Agent1.2.2 Source1.2.3 Channel1.2.4 Sink1.2.5 Event1.3 Flume拓撲結構1.4 Flume Agent內部原理1.5 Hadoop三大發行版本第2章 Flume快速入門2.1 Flume安裝地址2.2 安裝部署第3章 Flume企業開發案例3.1 監控端口數據官方案例3.2 實時讀取本地文件到HDFS案例3.3 實時讀取目錄文件到HDFS案例3.4 單數據源多出口案例(選擇器)3.5 單數據源多出口案例(Sink組)3.6 多數據源彙總案例第4章 Flume監控之Ganglia4.1 Ganglia的安裝與部署4.2 操做Flume測試監控第5章 Flume高級之自定義MySQLSource5.1 自定義Source說明5.2 自定義MySQLSource組成5.3 自定義MySQLSource步驟5.4 代碼實現5.4.1 導入pom依賴5.4.2 添加配置信息5.4.3 SQLSourceHelper5.4.4 MySQLSource5.5 測試5.5.1 Jar包準備5.5.2 配置文件準備5.5.3 MySql表準備5.5.4測試並查看結果第6章 知識擴展6.1 常見正則表達式語法6.2 練習第7章 Flume企業真實面試題(重點)7.1 你是如何實現Flume數據傳輸的監控的?7.2 Flume的Source,Sink,Channel的做用?大家Source是什麼類型?7.3 Flume的Channel Selectors7.4 Flume參數調優7.5 Flume的事務機制7.6 Flume採集數據會丟失嗎?php


第1章 Flume概述

1.1 Flume定義

  Flume(水槽) 是 Cloudera 提供的一個高可用的,高可靠的,分佈式的海量日誌採集、聚合和傳輸的系統。Flume基於流式架構,靈活簡單。
  在2009年Flume被捐贈了apache軟件基金會,爲hadoop相關組件之一。尤爲近幾年隨着flume的不斷被完善以及升級版本的逐一推出,特別是flume-ng;,同時flume內部的各類組件不斷豐富,用戶在開發的過程當中使用的便利性獲得很大的改善,現已成爲apache top項目之一css

1.2 Flume組成架構

  Flume組成架構以下圖所示:html


Flume組成架構

下面咱們來詳細介紹一下Flume架構中的組件。java

1.2.1 Agent

  Agent是一個JVM進程,它以事件的形式將數據從源頭送至目的地,是Flume數據傳輸的基本單元
  Agent主要有3個部分組成,Source、Channel、Sink。mysql

1.2.2 Source

  Source是負責接收數據到Flume Agent的組件。Source組件能夠處理各類類型、各類格式的日誌數據,包括avro、thrift、exec(Linux命令)、jms、spooling directory、netcat、sequence generator、syslog、http、legacy。linux

1.2.3 Channel

  Channel是位於Source和Sink之間的緩衝區。所以,Channel容許Source和Sink運做在不一樣的速率上。Channel是線程安全的,能夠同時處理幾個Source的寫入操做和幾個Sink的讀取操做。
  Flume自帶兩種Channel:Memory Channel 和 File Channel。
  Memory Channel是內存中的隊列Memory Channel 在不須要關心數據丟失的情景下適用。若是須要關心數據丟失,那麼Memory Channel就不該該使用,由於程序死亡、機器宕機或者重啓都會致使數據丟失。
  File Channel將全部事件寫到磁盤。所以在程序關閉或機器宕機的狀況下不會丟失數據。web

1.2.4 Sink

  Sink不斷地輪詢Channel中的事件且批量地移除它們,並將這些事件批量寫入到存儲或索引系統、或者被髮送到另外一個Flume Agent
  Sink是徹底事務性的。在從Channel批量刪除數據以前,每一個Sink用Channel啓動一個事務。批量事件一旦成功寫出到存儲系統或下一個Flume Agent,Sink就利用Channel提交事務。事務一旦被提交,該Channel從本身的內部緩衝區刪除事件。
  Sink組件目的地包括hdfs、logger、avro、thrift、ipc、file、null、HBase、solr、自定義。面試

1.2.5 Event

  傳輸單元,Flume數據傳輸的基本單元,以事件的形式將數據從源頭送至目的地。正則表達式

1.3 Flume拓撲結構

Flume的拓撲結構以下圖所示:
Flume Agent鏈接sql


單source,多channel、sink

Flume負載均衡

Flume Agent聚合

1.4 Flume Agent內部原理

1.5 Hadoop三大發行版本

  • Hadoop(哈道普)三大發行版本:Apache、Cloudera、Hortonworks。
      Apache 版本最原始(最基礎)的版本,對於入門學習最好。
      Cloudera 在大型互聯網企業中用的較多。(簡稱:CDH版,收費)
      Hortonworks 文檔較好。

  • 一、Apache Hadoop
    官網地址:http://hadoop.apache.org/releases.html
    下載地址:https://archive.apache.org/dist/hadoop/common/

  • 二、Cloudera Hadoop
    官網地址:https://www.cloudera.com/downloads/cdh/5-10-0.html
    下載地址:http://archive-primary.cloudera.com/cdh5/cdh/5/

    • (1)2008年成立的Cloudera是最先將Hadoop商用的公司,爲合做夥伴提供Hadoop的商用解決方案,主要是包括支持、諮詢服務、培訓。
    • (2)2009年Hadoop的創始人Doug Cutting也加盟Cloudera公司。Cloudera產品主要爲CDH,Cloudera Manager,Cloudera Support。
    • (3)CDH是Cloudera的Hadoop發行版,徹底開源,比Apache Hadoop在兼容性,安全性,穩定性上有所加強。
    • (4)Cloudera Manager是集羣的軟件分發及管理監控平臺,能夠在幾個小時內部署好一個Hadoop集羣,並對集羣的節點及服務進行實時監控。Cloudera Support便是對Hadoop的技術支持。
    • (5)Cloudera的標價爲每一年每一個節點4000美圓。Cloudera開發並貢獻了可實時處理大數據的Impala項目。
  • 三、Hortonworks Hadoop
    官網地址:https://hortonworks.com/products/data-center/hdp/
    下載地址:https://hortonworks.com/downloads/#data-platform

    • (1)2011年成立的Hortonworks是雅虎與硅谷風投公司Benchmark Capital合資組建。
    • (2)公司成立之初就吸納了大約25名至30名專門研究Hadoop的雅虎工程師,上述工程師均在2005年開始協助雅虎開發Hadoop,貢獻了Hadoop80%的代碼。
    • (3)雅虎工程副總裁、雅虎Hadoop開發團隊負責人Eric Baldeschwieler出任Hortonworks的首席執行官。
    • (4)Hortonworks的主打產品是Hortonworks Data Platform(HDP),也一樣是100%開源的產品,HDP除常見的項目外還包括了Ambari,一款開源的安裝和管理系統。
    • (5)HCatalog,一個元數據管理系統,HCatalog現已集成到Facebook開源的Hive中。Hortonworks的Stinger開創性的極大的優化了Hive項目。Hortonworks爲入門提供了一個很是好的,易於使用的沙盒。
    • (6)Hortonworks開發了不少加強特性並提交至核心主幹,這使得Apache Hadoop可以在包括Window Server和Windows Azure在內的Microsoft Windows平臺上本地運行。訂價以集羣爲基礎,每10個節點每一年爲12500美圓。

第2章 Flume快速入門

2.1 Flume安裝地址

1) Flume官網地址
  http://flume.apache.org/
2)文檔查看地址
  http://flume.apache.org/FlumeUserGuide.html
3)下載地址
  http://archive.apache.org/dist/flume/

2.2 安裝部署

1)將apache-flume-1.7.0-bin.tar.gz上傳到linux的/opt/software目錄下
2)解壓apache-flume-1.7.0-bin.tar.gz到/opt/module/目錄下

[atguigu@hadoop102 software]$ tar -zxf apache-flume-1.7.0-bin.tar.gz -C /opt/module/

3)修改apache-flume-1.7.0-bin的名稱爲flume

[atguigu@hadoop102 module]$ mv apache-flume-1.7.0-bin flume

4)將flume/conf下的flume-env.sh.template文件修改成flume-env.sh,並配置flume-env.sh文件

[atguigu@hadoop102 conf]$ mv flume-env.sh.template flume-env.sh
[atguigu@hadoop102 conf]$ vim flume-env.sh
export JAVA_HOME=/opt/module/jdk1.8.0_144

第3章 Flume企業開發案例

3.1 監控端口數據官方案例

1)案例需求:首先,Flume監控本機44444端口,而後經過telnet工具向本機44444端口發送消息,最後Flume將監聽的數據實時顯示在控制檯。
2)需求分析:


3)實現步驟:
1.安裝telnet工具
將rpm軟件包(xinetd-2.3.14-40.el6.x86_64.rpm、telnet-0.17-48.el6.x86_64.rpm和telnet-server-0.17-48.el6.x86_64.rpm)拷入/opt/software文件夾下面。執行RPM軟件包安裝命令:
[atguigu@hadoop102 software]$ sudo rpm -ivh xinetd-2.3.14-40.el6.x86_64.rpm
[atguigu@hadoop102 software]$ sudo rpm -ivh telnet-0.17-48.el6.x86_64.rpm
[atguigu@hadoop102 software]$ sudo rpm -ivh telnet-server-0.17-48.el6.x86_64.rpm

2.判斷44444端口是否被佔用

[atguigu@hadoop102 flume-telnet]$ sudo netstat -tunlp | grep 44444

功能描述:netstat命令是一個監控TCP/IP網絡的很是有用的工具,它能夠顯示路由表、實際的網絡鏈接以及每個網絡接口設備的狀態信息。
基本語法:netstat [選項]
選項參數:
  -t或--tcp:顯示TCP傳輸協議的連線情況;
  -u或--udp:顯示UDP傳輸協議的連線情況;
  -n或--numeric:直接使用ip地址,而不經過域名服務器;
  -l或--listening:顯示監控中的服務器的Socket;
  -p或--programs:顯示正在使用Socket的程序識別碼和程序名稱;
3.建立Flume Agent配置文件flume-telnet-logger.conf
在flume目錄下建立job文件夾並進入job文件夾。

[atguigu@hadoop102 flume]$ pwd
/opt/module/flume
[atguigu@hadoop102 flume]$ mkdir job
[atguigu@hadoop102 flume]$ cd job/

在job文件夾下建立Flume Agent配置文件flume-telnet-logger.conf

[atguigu@hadoop102 job]$ touch flume-telnet-logger.conf

在flume-telnet-logger.conf文件中添加以下內容:

[atguigu@hadoop102 job]$ vim flume-telnet-logger.conf

添加內容以下:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

注:配置文件來源於官方手冊:http://flume.apache.org/FlumeUserGuide.html

  1. 先開啓flume監聽端口
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/flume-telnet-logger.conf -Dflume.root.logger=INFO,console

參數說明:
  --conf conf/ :表示配置文件存儲在conf/目錄
  --name a1 :表示給agent起名爲a1(要與配置文件一致)
  --conf-file job/flume-telnet.conf :flume本次啓動讀取的配置文件是在job文件夾下的flume-telnet.conf文件
  -Dflume.root.logger==INFO,console :-D表示flume運行時動態修改flume.root.logger參數屬性值,並將控制檯日誌打印級別設置爲INFO級別。日誌級別包括:log、info、warn、error
5.使用telnet工具向本機的44444端口發送內容

[atguigu@hadoop102 ~]$ telnet localhost 44444

以下圖所示:


6.在Flume監聽頁面觀察接收數據狀況

3.2 實時讀取本地文件到HDFS案例

1)案例需求:實時監控Hive日誌,並上傳到HDFS中。(實際開發中是tomcat中產生的日誌:訂單日誌、點擊流日誌等)
2)需求分析:


3)實現步驟:
1.Flume要想將數據輸出到HDFS,必須持有Hadoop相關jar包

commons-configuration-1.6.jar
hadoop-auth-2.7.2.jar
hadoop-common-2.7.2.jar
hadoop-hdfs-2.7.2.jar
commons-io-2.4.jar
htrace-core-3.1.0-incubating.jar

拷貝到/opt/module/flume/lib文件夾下。
2.建立flume-file-hdfs.conf文件
建立文件

[atguigu@hadoop102 job]$ touch flume-file-hdfs.conf

注:要想讀取Linux系統中的文件,就得按照Linux命令的規則執行命令。因爲Hive日誌在Linux系統中,因此讀取文件的類型選擇:exec即execute執行的意思。表示執行Linux命令來讀取文件。

[atguigu@hadoop102 job]$ vim flume-file-hdfs.conf

添加以下內容:

# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2

# Describe/configure the source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/module/hive/logs/hive.log
a2.sources.r2.shell = /bin/bash -c

# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop102:9000/flume/%Y%m%d/%H
#上傳文件的前綴
a2.sinks.k2.hdfs.filePrefix = logs-
#是否按照時間滾動文件夾
a2.sinks.k2.hdfs.round = true
#多少時間單位建立一個新的文件夾
a2.sinks.k2.hdfs.roundValue = 1
#從新定義時間單位
a2.sinks.k2.hdfs.roundUnit = hour
#是否使用本地時間戳
a2.sinks.k2.hdfs.useLocalTimeStamp = true
#積攢多少個Event才flush到HDFS一次
a2.sinks.k2.hdfs.batchSize = 1000
#設置文件類型,可支持壓縮
a2.sinks.k2.hdfs.fileType = DataStream
#多久生成一個新的文件
a2.sinks.k2.hdfs.rollInterval = 600
#設置每一個文件的滾動大小
a2.sinks.k2.hdfs.rollSize = 134217700
#文件的滾動與Event數量無關
a2.sinks.k2.hdfs.rollCount = 0
#最小冗餘數
a2.sinks.k2.hdfs.minBlockReplicas = 1

# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

配置文件解析:


3.執行監控配置
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/flume-file-hdfs.conf

4.開啓Hadoop和Hive並操做Hive產生日誌

[atguigu@hadoop102 hadoop-2.7.2]$ sbin/start-dfs.sh
[atguigu@hadoop103 hadoop-2.7.2]$ sbin/start-yarn.sh
[atguigu@hadoop102 hive]$ bin/hive
hive (default)>

5.在HDFS上查看文件。

3.3 實時讀取目錄文件到HDFS案例

1)案例需求:使用Flume監聽整個目錄的文件。
2)需求分析:


3)實現步驟:
1.建立配置文件flume-dir-hdfs.conf
建立一個文件
[atguigu@hadoop102 job]$ touch flume-dir-hdfs.conf

打開文件

[atguigu@hadoop102 job]$ vim flume-dir-hdfs.conf

添加以下內容:

# Name the components on this agent
a3.sources = r3
a3.sinks = k3
a3.channels = c3

# Describe/configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /opt/module/flume/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
#忽略全部以.tmp結尾的文件,不上傳
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)

# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop102:9000/flume/upload/%Y%m%d/%H
#上傳文件的前綴
a3.sinks.k3.hdfs.filePrefix = upload-
#是否按照時間滾動文件夾
a3.sinks.k3.hdfs.round = true
#多少時間單位建立一個新的文件夾
a3.sinks.k3.hdfs.roundValue = 1
#從新定義時間單位
a3.sinks.k3.hdfs.roundUnit = hour
#是否使用本地時間戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
#積攢多少個Event才flush到HDFS一次
a3.sinks.k3.hdfs.batchSize = 100
#設置文件類型,可支持壓縮
a3.sinks.k3.hdfs.fileType = DataStream
#多久生成一個新的文件
a3.sinks.k3.hdfs.rollInterval = 600
#設置每一個文件的滾動大小大概是128M
a3.sinks.k3.hdfs.rollSize = 134217700
#文件的滾動與Event數量無關
a3.sinks.k3.hdfs.rollCount = 0
#最小冗餘數
a3.sinks.k3.hdfs.minBlockReplicas = 1

# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3

配置文件解析:

  1. 啓動監控文件夾命令
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-dir-hdfs.conf

說明: 在使用Spooling Directory Source時
  1) 不要在監控目錄中建立並持續修改文件
  2) 上傳完成的文件會以.COMPLETED結尾
  3) 被監控文件夾每500毫秒掃描一次文件變更

  1. 向upload文件夾中添加文件
    在/opt/module/flume目錄下建立upload文件夾
[atguigu@hadoop102 flume]$ mkdir upload

向upload文件夾中添加文件

[atguigu@hadoop102 upload]$ touch atguigu.txt
[atguigu@hadoop102 upload]$ touch atguigu.tmp
[atguigu@hadoop102 upload]$ touch atguigu.log

查看數據

  1. 查看HDFS上的數據
  2. 等待1s,再次查詢upload文件夾
[atguigu@hadoop102 upload]$ pwd
/opt/module/flume/upload
[atguigu@hadoop102 upload]$ ll
總用量 0
-rw-rw-r--. 1 atguigu atguigu 0 3月   4 00:09 atguigu.log.COMPLETED
-rw-rw-r--. 1 atguigu atguigu 0 3月   4 00:09 atguigu.tmp
-rw-rw-r--. 1 atguigu atguigu 0 3月   4 00:09 atguigu.txt.COMPLETED

3.4 單數據源多出口案例(選擇器)

單Source多Channel、Sink,以下圖所示:


1)案例需求:使用Flume-1監控文件變更,Flume-1將變更內容傳遞給Flume-2,Flume-2負責存儲到HDFS。同時Flume-1將變更內容傳遞給Flume-3,Flume-3負責輸出到Local FileSystem。
2)需求分析:

3)實現步驟:
0.準備工做
在/opt/module/flume/job目錄下建立group1文件夾
[atguigu@hadoop102 job]$ mkdir group1
[atguigu@hadoop102 job]$ cd group1/

在/opt/module/datas/目錄下建立flume3文件夾

[atguigu@hadoop102 datas]$ mkdir flume3

1.建立flume-file-flume.conf
配置1個接收日誌文件的source和2個channel、2個sink,分別輸送給flume-flume-hdfs和flume-flume-dir。
建立配置文件並打開:

[atguigu@hadoop102 group1]$ touch flume-file-flume.conf
[atguigu@hadoop102 group1]$ vim flume-file-flume.conf

添加以下內容:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# 將數據流複製給全部channel
a1.sources.r1.selector.type = replicating

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/hive/logs/hive.log
a1.sources.r1.shell = /bin/bash -c

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102 
a1.sinks.k1.port = 4141

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

注:Avro是由Hadoop創始人Doug Cutting建立的一種跟語言無關的數據序列化和RPC框架。
注:RPC(Remote Procedure Call)—遠程過程調用,它是一種經過網絡從遠程計算機程序上請求服務,而不須要了解底層網絡技術的協議。
2.建立flume-flume-hdfs.conf
配置上級Flume輸出的Source,輸出是到HDFS的Sink。
建立配置文件並打開

[atguigu@hadoop102 group1]$ touch flume-flume-hdfs.conf
[atguigu@hadoop102 group1]$ vim flume-flume-hdfs.conf

添加以下內容:

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141

# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://hadoop102:9000/flume2/%Y%m%d/%H
#上傳文件的前綴
a2.sinks.k1.hdfs.filePrefix = flume2-
#是否按照時間滾動文件夾
a2.sinks.k1.hdfs.round = true
#多少時間單位建立一個新的文件夾
a2.sinks.k1.hdfs.roundValue = 1
#從新定義時間單位
a2.sinks.k1.hdfs.roundUnit = hour
#是否使用本地時間戳
a2.sinks.k1.hdfs.useLocalTimeStamp = true
#積攢多少個Event才flush到HDFS一次
a2.sinks.k1.hdfs.batchSize = 100
#設置文件類型,可支持壓縮
a2.sinks.k1.hdfs.fileType = DataStream
#多久生成一個新的文件
a2.sinks.k1.hdfs.rollInterval = 600
#設置每一個文件的滾動大小大概是128M
a2.sinks.k1.hdfs.rollSize = 134217700
#文件的滾動與Event數量無關
a2.sinks.k1.hdfs.rollCount = 0
#最小冗餘數
a2.sinks.k1.hdfs.minBlockReplicas = 1

# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

3.建立flume-flume-dir.conf
配置上級Flume輸出的Source,輸出是到本地目錄的Sink。
建立配置文件並打開

[atguigu@hadoop102 group1]$ touch flume-flume-dir.conf
[atguigu@hadoop102 group1]$ vim flume-flume-dir.conf

添加以下內容:

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142

# Describe the sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/datas/flume3

# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2

提示:輸出的本地目錄必須是已經存在的目錄,若是該目錄不存在,並不會建立新的目錄。
4.執行配置文件
分別開啓對應配置文件:flume-flume-dir,flume-flume-hdfs,flume-file-flume。

[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group1/flume-flume-dir.conf

[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group1/flume-flume-hdfs.conf

[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group1/flume-file-flume.conf

5.啓動Hadoop和Hive

[atguigu@hadoop102 hadoop-2.7.2]$ sbin/start-dfs.sh
[atguigu@hadoop103 hadoop-2.7.2]$ sbin/start-yarn.sh

[atguigu@hadoop102 hive]$ bin/hive
hive (default)>

6.檢查HDFS上數據

  1. 檢查/opt/module/datas/flume3目錄中數據
[atguigu@hadoop102 flume3]$ pwd
/opt/module/datas/flume3
[atguigu@hadoop102 flume3]$ ll
總用量 4
-rw-rw-r--. 1 atguigu atguigu    0 3月   4 01:01 1551632490229-1
-rw-rw-r--. 1 atguigu atguigu 1594 3月   4 01:02 1551632490229-2
[atguigu@hadoop102 flume3]$ ll
總用量 4
-rw-rw-r--. 1 atguigu atguigu    0 3月   4 01:01 1551632490229-1
-rw-rw-r--. 1 atguigu atguigu 3808 3月   4 01:02 1551632490229-2
-rw-rw-r--. 1 atguigu atguigu    0 3月   4 01:02 1551632490229-3
[atguigu@hadoop102 flume3]$ ll
總用量 8
-rw-rw-r--. 1 atguigu atguigu    0 3月   4 01:01 1551632490229-1
-rw-rw-r--. 1 atguigu atguigu 3808 3月   4 01:02 1551632490229-2
-rw-rw-r--. 1 atguigu atguigu  538 3月   4 01:02 1551632490229-3
-rw-rw-r--. 1 atguigu atguigu    0 3月   4 01:03 1551632490229-4
-rw-rw-r--. 1 atguigu atguigu    0 3月   4 01:03 1551632490229-5

3.5 單數據源多出口案例(Sink組)

單Source、Channel多Sink(負載均衡),以下圖所示。


1)案例需求:使用Flume-1監控文件變更,Flume-1將變更內容傳遞給Flume-2,Flume-2負責存儲到HDFS。同時Flume-1將變更內容傳遞給Flume-3,Flume-3也負責存儲到HDFS
2)需求分析:

3)實現步驟:
0.準備工做
在/opt/module/flume/job目錄下建立group2文件夾
[atguigu@hadoop102 job]$ mkdir group2
[atguigu@hadoop102 job]$ cd group2/

1.建立flume-netcat-flume.conf
配置1個接收日誌文件的source和1個channel、2個sink,分別輸送給flume-flume-console1和flume-flume-console2。
建立配置文件並打開

[atguigu@hadoop102 group2]$ touch flume-netcat-flume.conf
[atguigu@hadoop102 group2]$ vim flume-netcat-flume.conf

添加以下內容:

# Name the components on this agent
a1.sources = r1
a1.channels = c1
a1.sinkgroups = g1
a1.sinks = k1 k2

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# 配置sink組相關信息
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin
a1.sinkgroups.g1.processor.selector.maxTimeOut=10000

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1

注:Avro是由Hadoop創始人Doug Cutting建立的一種語言無關的數據序列化和RPC框架。
注:RPC(Remote Procedure Call)—遠程過程調用,它是一種經過網絡從遠程計算機程序上請求服務,而不須要了解底層網絡技術的協議。
2.建立flume-flume-console1.conf
配置上級Flume輸出的Source,輸出是到本地控制檯。
建立配置文件並打開

[atguigu@hadoop102 group2]$ touch flume-flume-console1.conf
[atguigu@hadoop102 group2]$ vim flume-flume-console1.conf

添加以下內容:

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141

# Describe the sink
a2.sinks.k1.type = logger

# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

3.建立flume-flume-console2.conf
配置上級Flume輸出的Source,輸出是到本地控制檯。
建立配置文件並打開

[atguigu@hadoop102 group2]$ touch flume-flume-console2.conf
[atguigu@hadoop102 group2]$ vim flume-flume-console2.conf

添加以下內容:

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142

# Describe the sink
a3.sinks.k1.type = logger

# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2

4.執行配置文件
分別開啓對應配置文件:flume-flume-console2,flume-flume-console1,flume-netcat-flume。

[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group2/flume-flume-console2.conf -Dflume.root.logger=INFO,console

[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group2/flume-flume-console1.conf -Dflume.root.logger=INFO,console

[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group2/flume-netcat-flume.conf

5.使用telnet工具向本機的44444端口發送內容

$ telnet localhost 44444

6.查看Flume2及Flume3的控制檯打印日誌

3.6 多數據源彙總案例

多Source彙總數據到單Flume,以下圖所示。


1)案例需求:
hadoop103上的Flume-1監控文件/opt/module/group.log,
hadoop102上的Flume-2監控某一個端口的數據流,
Flume-1與Flume-2將數據發送給hadoop104上的Flume-3,Flume-3將最終數據打印到控制檯。
2)需求分析:

3)實現步驟:
0.準備工做
分發Flume
[atguigu@hadoop102 module]$ xsync flume

在hadoop10二、hadoop103以及hadoop104的/opt/module/flume/job目錄下建立一個group3文件夾。

[atguigu@hadoop102 job]$ mkdir group3
[atguigu@hadoop103 job]$ mkdir group3
[atguigu@hadoop104 job]$ mkdir group3

1.建立flume1-logger-flume.conf
配置Source用於監控hive.log文件,配置Sink輸出數據到下一級Flume。
在hadoop103上建立配置文件並打開

[atguigu@hadoop103 group3]$ touch flume1-logger-flume.conf
[atguigu@hadoop103 group3]$ vim flume1-logger-flume.conf

添加以下內容:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/group.log
a1.sources.r1.shell = /bin/bash -c

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop104
a1.sinks.k1.port = 4141

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

2.建立flume2-netcat-flume.conf
配置Source監控端口44444數據流,配置Sink數據到下一級Flume:
在hadoop102上建立配置文件並打開

[atguigu@hadoop102 group3]$ touch flume2-netcat-flume.conf
[atguigu@hadoop102 group3]$ vim flume2-netcat-flume.conf

添加以下內容:

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
a2.sources.r1.type = netcat
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 44444

# Describe the sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = hadoop104
a2.sinks.k1.port = 4141

# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

3.建立flume3-flume-logger.conf
配置source用於接收flume1與flume2發送過來的數據流,最終合併後sink到控制檯。
在hadoop104上建立配置文件並打開

[atguigu@hadoop104 group3]$ touch flume3-flume-logger.conf
[atguigu@hadoop104 group3]$ vim flume3-flume-logger.conf

添加以下內容:

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop104
a3.sources.r1.port = 4141

# Describe the sink
# Describe the sink
a3.sinks.k1.type = logger

# Describe the channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1

4.執行配置文件
分別開啓對應配置文件:flume3-flume-logger.conf,flume2-netcat-flume.conf,flume1-logger-flume.conf。

[atguigu@hadoop104 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group3/flume3-flume-logger.conf -Dflume.root.logger=INFO,console

[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group3/flume2-netcat-flume.conf

[atguigu@hadoop103 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group3/flume1-logger-flume.conf

5.在hadoop103上向/opt/module目錄下的group.log追加內容

[atguigu@hadoop103 module]$ echo 'hello' > group.log

6.在hadoop102上向44444端口發送數據

[atguigu@hadoop102 flume]$ telnet hadoop102 44444

7.在hadoop102上向44444端口發送數據

第4章 Flume監控之Ganglia

4.1 Ganglia的安裝與部署

1) 安裝httpd服務與php

[atguigu@hadoop102 flume]$ sudo yum -y install httpd php

2) 安裝其餘依賴

[atguigu@hadoop102 flume]$ sudo yum -y install rrdtool perl-rrdtool rrdtool-devel
[atguigu@hadoop102 flume]$ sudo yum -y install apr-devel

3) 安裝ganglia

[atguigu@hadoop102 flume]$ sudo rpm -Uvh http://dl.fedoraproject.org/pub/epel/6/x86_64/epel-release-6-8.noarch.rpm
[atguigu@hadoop102 flume]$ sudo yum -y install ganglia-gmetad
[atguigu@hadoop102 flume]$ sudo yum -y install ganglia-web
[atguigu@hadoop102 flume]$ sudo yum install -y ganglia-gmond

4) 修改配置文件/etc/httpd/conf.d/ganglia.conf

[atguigu@hadoop102 flume]$ sudo vim /etc/httpd/conf.d/ganglia.conf

修改成以下的配置:

# Ganglia monitoring system php web frontend
Alias /ganglia /usr/share/ganglia
<Location /ganglia>
  Order deny,allow
  Deny from all
  Allow from all
  # Allow from 127.0.0.1
  # Allow from ::1
  # Allow from .example.com
</Location>

5) 修改配置文件/etc/ganglia/gmetad.conf

[atguigu@hadoop102 flume]$ sudo vim /etc/ganglia/gmetad.conf

修改成:

data_source "hadoop102" 192.168.25.102

6) 修改配置文件/etc/ganglia/gmond.conf

[atguigu@hadoop102 flume]$ sudo vim /etc/ganglia/gmond.conf

修改成:

cluster {
  name = "hadoop102"
  owner = "unspecified"
  latlong = "unspecified"
  url = "unspecified"
}

udp_send_channel {
  #bind_hostname = yes # Highly recommended, soon to be default.
                       # This option tells gmond to use a source address
                       # that resolves to the machine's hostname.  Without
                       # this, the metrics may appear to come from any
                       # interface and the DNS names associated with
                       # those IPs will be used to create the RRDs.
  # mcast_join = 239.2.11.71
  host = 192.168.25.102
  port = 8649
  ttl = 1
}

udp_recv_channel {
  # mcast_join = 239.2.11.71
  port = 8649
  # bind = 239.2.11.71
  bind = 192.168.25.102
  retry_bind = true
  # Size of the UDP buffer. If you are handling lots of metrics you really
  # should bump it up to e.g. 10MB or even higher.
  # buffer = 10485760
}

7) 修改配置文件/etc/selinux/config

[atguigu@hadoop102 flume]$ sudo vim /etc/selinux/config

修改成:

# This file controls the state of SELinux on the system.
# SELINUX= can take one of these three values:
#     enforcing - SELinux security policy is enforced.
#     permissive - SELinux prints warnings instead of enforcing.
#     disabled - No SELinux policy is loaded.
# SELINUX=enforcing
SELINUX=disabled
# SELINUXTYPE= can take one of these two values:
#     targeted - Targeted processes are protected,
#     mls - Multi Level Security protection.
SELINUXTYPE=targeted

尖叫提示:selinux本次生效關閉必須重啓,若是此時不想重啓,能夠臨時生效之:

[atguigu@hadoop102 flume]$ sudo setenforce 0

5) 啓動ganglia

[atguigu@hadoop102 flume]$ sudo service httpd start
[atguigu@hadoop102 flume]$ sudo service gmetad start
[atguigu@hadoop102 flume]$ sudo service gmond start

6) 打開網頁瀏覽ganglia頁面
http://192.168.25.102/ganglia
尖叫提示:若是完成以上操做依然出現權限不足錯誤,請修改/var/lib/ganglia目錄的權限:

[atguigu@hadoop102 flume]$ sudo chmod -R 777 /var/lib/ganglia

4.2 操做Flume測試監控

1) 修改/opt/module/flume/conf目錄下的flume-env.sh配置:

JAVA_OPTS="-Dflume.monitoring.type=ganglia
-Dflume.monitoring.hosts=192.168.25.102:8649
-Xms100m
-Xmx200m"

2) 啓動Flume任務

[atguigu@hadoop102 flume]$ bin/flume-ng agent \
--conf conf/ \
--name a1 \
--conf-file job/flume-telnet-logger.conf \
-Dflume.root.logger==INFO,console \
-Dflume.monitoring.type=ganglia \
-Dflume.monitoring.hosts=192.168.25.102:8649

3) 發送數據觀察ganglia監測圖

[atguigu@hadoop102 flume]$ telnet localhost 44444

樣式如圖:


圖例說明:

第5章 Flume高級之自定義MySQLSource

5.1 自定義Source說明

  Source是負責接收數據到Flume Agent的組件。Source組件能夠處理各類類型、各類格式的日誌數據,包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy。官方提供的source類型已經不少,可是有時候並不能知足實際開發當中的需求,此時咱們就須要根據實際需求自定義某些Source。
  如:實時監控MySQL,從MySQL中獲取數據傳輸到HDFS或者其餘存儲框架,因此此時須要咱們本身實現MySQLSource。
  官方也提供了自定義source的接口:
  官網說明:https://flume.apache.org/FlumeDeveloperGuide.html#source

5.2 自定義MySQLSource組成

5.3 自定義MySQLSource步驟

  根據官方說明自定義MySqlSource須要繼承AbstractSource類並實現Configurable和PollableSource接口。
  實現相應方法:
    getBackOffSleepIncrement() // 暫不用
    getMaxBackOffSleepInterval() // 暫不用
    configure(Context context) // 初始化context
    process() // 獲取數據(從MySql獲取數據,業務處理比較複雜,因此咱們定義一個專門的類SQLSourceHelper來處理跟MySql的交互),封裝成Event並寫入Channel,這個方法被循環調用)
    stop() // 關閉相關的資源

5.4 代碼實現

5.4.1 導入pom依賴

<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.7.0</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.27</version>
    </dependency>
</dependencies>

5.4.2 添加配置信息

在ClassPath下添加jdbc.properties和log4j. properties
jdbc.properties:

dbDriver=com.mysql.jdbc.Driver
dbUrl=jdbc:mysql://hadoop102:3306/mysqlsource?useUnicode=true&characterEncoding=utf-8
dbUser=root
dbPassword=123456

log4j. properties:

#--------console-----------
log4j.rootLogger=info,myconsole,myfile
log4j.appender.myconsole=org.apache.log4j.ConsoleAppender
log4j.appender.myconsole.layout=org.apache.log4j.SimpleLayout
#log4j.appender.myconsole.layout.ConversionPattern =%d [%t] %-5p [%c] - %m%n

#log4j.rootLogger=error,myfile
log4j.appender.myfile=org.apache.log4j.DailyRollingFileAppender
log4j.appender.myfile.File=/tmp/flume.log
log4j.appender.myfile.layout=org.apache.log4j.PatternLayout
log4j.appender.myfile.layout.ConversionPattern =%d [%t] %-5p [%c] - %m%n

5.4.3 SQLSourceHelper

1)屬性說明:


2)方法說明:

3)代碼分析:

4)代碼實現:
package com.atguigu;

import org.apache.flume.Context;
import org.apache.flume.conf.ConfigurationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.sql.*;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class SQLSourceHelper {

    private static final Logger LOG = LoggerFactory.getLogger(SQLSourceHelper.class);

    private int runQueryDelay,  // 兩次查詢的時間間隔
            startFrom,          // 開始id
            currentIndex,       // 當前id
            recordSixe = 0,     // 每次查詢返回結果的條數
            maxRow;             // 每次查詢的最大條數

    private String table,       // 要操做的表
            columnsToSelect,    // 用戶傳入的查詢的列
            customQuery,        // 用戶傳入的查詢語句
            query,              // 構建的查詢語句
            defaultCharsetResultSet;    // 編碼集

    // 上下文,用來獲取配置文件
    private Context context;

    // 爲定義的變量賦值(默認值),可在flume任務的配置文件中修改
    private static final int DEFAULT_QUERY_DELAY = 10000;
    private static final int DEFAULT_START_VALUE = 0;
    private static final int DEFAULT_MAX_ROWS = 2000;
    private static final String DEFAULT_COLUMNS_SELECT = "*";
    private static final String DEFAULT_CHARSET_RESULTSET = "UTF-8";

    private static Connection conn = null;
    private static PreparedStatement ps = null;
    private static String connectionURL, connectionUserName, connectionPassword;

    // 加載靜態資源
    static {

        Properties p = new Properties();

        try {
            p.load(SQLSourceHelper.class.getClassLoader().getResourceAsStream("jdbc.properties"));
            connectionURL = p.getProperty("dbUrl");
            connectionUserName = p.getProperty("dbUser");
            connectionPassword = p.getProperty("dbPassword");
            Class.forName(p.getProperty("dbDriver"));

        } catch (IOException | ClassNotFoundException e) {
            LOG.error(e.toString());
        }
    }

    // 獲取JDBC鏈接
    private static Connection InitConnection(String url, String user, String pw) {
        try {

            Connection conn = DriverManager.getConnection(url, user, pw);

            if (conn == null)
                throw new SQLException();

            return conn;

        } catch (SQLException e) {
            e.printStackTrace();
        }

        return null;
    }

    // 構造方法
    SQLSourceHelper(Context context) throws ParseException {

        // 初始化上下文
        this.context = context;

        // 有默認值參數:獲取flume任務配置文件中的參數,讀不到的採用默認值
        this.columnsToSelect = context.getString("columns.to.select", DEFAULT_COLUMNS_SELECT);

        this.runQueryDelay = context.getInteger("run.query.delay", DEFAULT_QUERY_DELAY);

        this.startFrom = context.getInteger("start.from", DEFAULT_START_VALUE);

        this.defaultCharsetResultSet = context.getString("default.charset.resultset", DEFAULT_CHARSET_RESULTSET);

        // 無默認值參數:獲取flume任務配置文件中的參數
        this.table = context.getString("table");

        this.customQuery = context.getString("custom.query");

        connectionURL = context.getString("connection.url");

        connectionUserName = context.getString("connection.user");

        connectionPassword = context.getString("connection.password");

        conn = InitConnection(connectionURL, connectionUserName, connectionPassword);

        // 校驗相應的配置信息,若是沒有默認值的參數也沒賦值,拋出異常
        checkMandatoryProperties();

        // 獲取當前的id
        currentIndex = getStatusDBIndex(startFrom);

        // 構建查詢語句
        query = buildQuery();
    }

    // 校驗相應的配置信息(表,查詢語句以及數據庫鏈接的參數)
    private void checkMandatoryProperties() {

        if (table == null) {
            throw new ConfigurationException("property table not set");
        }

        if (connectionURL == null) {
            throw new ConfigurationException("connection.url property not set");
        }

        if (connectionUserName == null) {
            throw new ConfigurationException("connection.user property not set");
        }

        if (connectionPassword == null) {
            throw new ConfigurationException("connection.password property not set");
        }
    }

    // 構建sql語句
    private String buildQuery() {

        String sql = "";

        // 獲取當前id
        currentIndex = getStatusDBIndex(startFrom);
        LOG.info(currentIndex + "");

        if (customQuery == null) {
            sql = "SELECT " + columnsToSelect + " FROM " + table;
        } else {
            sql = customQuery;
        }

        StringBuilder execSql = new StringBuilder(sql);

        // 以id做爲offset
        if (!sql.contains("where")) {
            execSql.append(" where ");
            execSql.append("id").append(">").append(currentIndex);

            return execSql.toString();
        } else {
            int length = execSql.toString().length();

            return execSql.toString().substring(0, length - String.valueOf(currentIndex).length()) + currentIndex;
        }
    }

    // 執行查詢
    List<List<Object>> executeQuery() {

        try {
            // 每次執行查詢時都要從新生成sql,由於id不一樣
            customQuery = buildQuery();

            // 存放結果的集合
            List<List<Object>> results = new ArrayList<>();

            if (ps == null) {
                //
                ps = conn.prepareStatement(customQuery);
            }

            ResultSet result = ps.executeQuery(customQuery);

            while (result.next()) {

                // 存放一條數據的集合(多個列)
                List<Object> row = new ArrayList<>();

                // 將返回結果放入集合
                for (int i = 1; i <= result.getMetaData().getColumnCount(); i++) {
                    row.add(result.getObject(i));
                }

                results.add(row);
            }

            LOG.info("execSql:" + customQuery + "\nresultSize:" + results.size());

            return results;
        } catch (SQLException e) {
            LOG.error(e.toString());

            // 從新鏈接
            conn = InitConnection(connectionURL, connectionUserName, connectionPassword);

        }

        return null;
    }

    // 將結果集轉化爲字符串,每一條數據是一個list集合,將每個小的list集合轉化爲字符串
    List<String> getAllRows(List<List<Object>> queryResult) {

        List<String> allRows = new ArrayList<>();

        if (queryResult == null || queryResult.isEmpty())
            return allRows;

        StringBuilder row = new StringBuilder();

        for (List<Object> rawRow : queryResult) {

            Object value = null;

            for (Object aRawRow : rawRow) {

                value = aRawRow;

                if (value == null) {
                    row.append(",");
                } else {
                    row.append(aRawRow.toString()).append(",");
                }
            }

            allRows.add(row.toString());
            row = new StringBuilder();
        }

        return allRows;
    }

    // 更新offset元數據狀態,每次返回結果集後調用。必須記錄每次查詢的offset值,爲程序中斷續跑數據時使用,以id爲offset
    void updateOffset2DB(int size) {
        // 以source_tab作爲KEY,若是不存在則插入,存在則更新(每一個源表對應一條記錄)
        String sql = "insert into flume_meta(source_tab,currentIndex) VALUES('" + this.table + "','" + (recordSixe += size)
                + "') on DUPLICATE key update source_tab=values(source_tab),currentIndex=values(currentIndex)";

        LOG.info("updateStatus Sql:" + sql);

        execSql(sql);
    }

    // 執行sql語句
    private void execSql(String sql) {

        try {
            ps = conn.prepareStatement(sql);

            LOG.info("exec::" + sql);

            ps.execute();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    // 獲取當前id的offset
    private Integer getStatusDBIndex(int startFrom) {

        // 從flume_meta表中查詢出當前的id是多少
        String dbIndex = queryOne("select currentIndex from flume_meta where source_tab='" + table + "'");

        if (dbIndex != null) {
            return Integer.parseInt(dbIndex);
        }

        // 若是沒有數據,則說明是第一次查詢或者數據表中尚未存入數據,返回最初傳入的值
        return startFrom;
    }

    // 查詢一條數據的執行語句(當前id)
    private String queryOne(String sql) {

        ResultSet result = null;

        try {
            ps = conn.prepareStatement(sql);
            result = ps.executeQuery();

            while (result.next()) {
                return result.getString(1);
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }

        return null;
    }

    // 關閉相關資源
    void close() {

        try {
            ps.close();
            conn.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    int getCurrentIndex() {
        return currentIndex;
    }

    void setCurrentIndex(int newValue) {
        currentIndex = newValue;
    }

    int getRunQueryDelay() {
        return runQueryDelay;
    }

    String getQuery() {
        return query;
    }

    String getConnectionURL() {
        return connectionURL;
    }

    private boolean isCustomQuerySet() {
        return (customQuery != null);
    }

    Context getContext() {
        return context;
    }

    public String getConnectionUserName() {
        return connectionUserName;
    }

    public String getConnectionPassword() {
        return connectionPassword;
    }

    String getDefaultCharsetResultSet() {
        return defaultCharsetResultSet;
    }
}

5.4.4 MySQLSource

代碼實現:

package com.atguigu;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.ParseException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

public class SQLSource extends AbstractSource implements ConfigurablePollableSource {

    // 打印日誌
    private static final Logger LOG = LoggerFactory.getLogger(SQLSource.class);

    // 定義sqlHelper
    private SQLSourceHelper sqlSourceHelper;

    @Override
    public long getBackOffSleepIncrement() {
        return 0;
    }

    @Override
    public long getMaxBackOffSleepInterval() {
        return 0;
    }

    @Override
    public void configure(Context context) {

        try {
            // 初始化
            sqlSourceHelper = new SQLSourceHelper(context);
        } catch (ParseException e) {
            e.printStackTrace();
        }
    }

    @Override
    public Status process() throws EventDeliveryException {

        try {
            // 查詢數據表
            List<List<Object>> result = sqlSourceHelper.executeQuery();

            // 存放event的集合
            List<Event> events = new ArrayList<>();

            // 存放event頭集合
            HashMap<String, String> header = new HashMap<>();

            // 若是有返回數據,則將數據封裝爲event
            if (!result.isEmpty()) {

                List<String> allRows = sqlSourceHelper.getAllRows(result);

                Event event = null;

                for (String row : allRows) {
                    event = new SimpleEvent();
                    event.setBody(row.getBytes());
                    event.setHeaders(header);
                    events.add(event);
                }

                // 將event寫入channel
                this.getChannelProcessor().processEventBatch(events);

                // 更新數據表中的offset信息
                sqlSourceHelper.updateOffset2DB(result.size());
            }

            // 等待時長
            Thread.sleep(sqlSourceHelper.getRunQueryDelay());

            return Status.READY;
        } catch (InterruptedException e) {
            LOG.error("Error procesing row", e);

            return Status.BACKOFF;
        }
    }

    @Override
    public synchronized void stop() {

        LOG.info("Stopping sql source {} ...", getName());

        try {
            // 關閉資源
            sqlSourceHelper.close();
        } finally {
            super.stop();
        }
    }
}

5.5 測試

5.5.1 Jar包準備

1) 將MySql驅動包放入Flume的lib目錄下

[atguigu@hadoop102 flume]$ cp \
/opt/sorfware/mysql-libs/mysql-connector-java-5.1.27/mysql-connector-java-5.1.27-bin.jar \
/opt/module/flume/lib/

2) 打包項目並將Jar包放入Flume的lib目錄下

5.5.2 配置文件準備

1)建立配置文件並打開

[atguigu@hadoop102 job]$ touch mysql.conf
[atguigu@hadoop102 job]$ vim mysql.conf

2)添加以下內容

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = com.atguigu.source.SQLSource  
a1.sources.r1.connection.url = jdbc:mysql://192.168.9.102:3306/mysqlsource
a1.sources.r1.connection.user = root  
a1.sources.r1.connection.password = 000000  
a1.sources.r1.table = student  
a1.sources.r1.columns.to.select = *  
#a1.sources.r1.incremental.column.name = id  
#a1.sources.r1.incremental.value = 0 
a1.sources.r1.run.query.delay=5000

# Describe the sink
a1.sinks.k1.type = logger

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

5.5.3 MySql表準備

1) 建立MySqlSource數據庫

CREATE DATABASE mysqlsource;

2) 在MySqlSource數據庫下建立數據表Student和元數據表Flume_meta

CREATE TABLE `student` (
`id` int(11NOT NULL AUTO_INCREMENT,
`name` varchar(255NOT NULL,
PRIMARY KEY (`id`)
);
CREATE TABLE `flume_meta` (
`source_tab` varchar(255NOT NULL,
`currentIndex` varchar(255NOT NULL,
PRIMARY KEY (`source_tab`)
);

3)向數據表中添加數據

1 zhangsan
2 lisi
3 wangwu
4 zhaoliu

5.5.4測試並查看結果

1)任務執行

[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a1 \
--conf-file job/mysql.conf -Dflume.root.logger=INFO,console

2)結果展現,以下圖所示:

第6章 知識擴展

6.1 常見正則表達式語法

6.2 練習

案例需求:
  1)flume-1監控hive.log日誌,flume-1的數據傳送給flume-2,flume-2將數據追加到本地文件,同時將數據傳輸到flume-3。
  2)flume-4監控本地另外一個本身建立的文件any.txt,並將數據傳送給flume-3。
  3)flume-3將彙總數據寫入到HDFS。
請先畫出結構圖,再開始編寫任務腳本。

第7章 Flume企業真實面試題(重點)

7.1 你是如何實現Flume數據傳輸的監控的?

  使用第三方框架Ganglia實時監控Flume。

7.2 Flume的Source,Sink,Channel的做用?大家Source是什麼類型?

一、做用
(1)Source組件是專門用來收集數據的,能夠處理各類類型、各類格式的日誌數據,包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy。
(2)Channel組件對採集到的數據進行緩存,能夠存放在Memory或File中。
(3)Sink組件是用於把數據發送到目的地的組件,目的地包括Hdfs、Logger、avro、thrift、ipc、file、Hbase、solr、自定義。

二、我公司採用的Source類型爲:
(1)監控後臺日誌:exec
(2)監控後臺產生日誌的端口:netcat
    Exec    spooldir

7.3 Flume的Channel Selectors

7.4 Flume參數調優

1. Source
  增長 Source 個數(使用 Tair Dir Source 時可增長 FileGroups 個數)能夠增大 Source 的讀取數據的能力。例如:當某一個目錄產生的文件過多時須要將這個文件目錄拆分紅多個文件目錄,同時配置好多個 Source 以保證 Source 有足夠的能力獲取到新產生的數據。
  batchSize 參數決定 Source 一次批量運輸到 Channel 的 Event 條數,適當調大這個參數能夠提升 Source 搬運 Event 到 Channel 時的性能。

2. Channel
  type 選擇 memory 時 Channel 的性能最好,可是若是 Flume 進程意外掛掉可能會丟失數據。type 選擇 file 時 Channel 的容錯性更好,可是性能上會比 memory Channel 差。
  使用 file Channel 時 dataDirs 配置多個不一樣盤下的目錄能夠提升性能。
  Capacity 參數決定 Channel 可容納最大的 Event 條數。transactionCapacity 參數決定每次 Source 往 Channel 裏面寫的最大 Event 條數和每次 Sink 從 Channel 裏面讀的最大 Event 條數。transactionCapacity 須要大於 Source 和 Sink 的 batchSize 參數。

3. Sink
  增長 Sink 的個數能夠增長 Sink 消費 Event 的能力。Sink 也不是越多越好夠用就行,過多的 Sink 會佔用系統資源,形成系統資源沒必要要的浪費。
  batchSize 參數決定 Sink 一次批量從 Channel 讀取的 Event 條數,適當調大這個參數能夠提升 Sink 從 Channel 搬出 Event 的性能。

7.5 Flume的事務機制

  Flume的事務機制(相似數據庫的事務機制):Flume 使用兩個獨立的事務分別負責從 Soucrce 到 Channel,以及從 Channel 到 Sink 的事件傳遞。好比 spooling directory source 爲文件的每一行建立一個事件,一旦事務中全部的事件所有傳遞到 Channel 且提交成功,那麼 Soucrce 就將該文件標記爲完成。同理,事務以相似的方式處理從 Channel 到 Sink 的傳遞過程,若是由於某種緣由使得事件沒法記錄,那麼事務將會回滾。且全部的事件都會保持到 Channel 中,等待從新傳遞。
  以下圖所示:

7.6 Flume採集數據會丟失嗎?

  不會,Channel存儲能夠存儲在File中,數據傳輸自身有事務。  可是若是使用內存存儲的話,掉電可能會丟失數據。

相關文章
相關標籤/搜索