Chapter 1 Flume Overview
1.1 Flume Definition
1.2 Flume Architecture
1.2.1 Agent
1.2.2 Source
1.2.3 Channel
1.2.4 Sink
1.2.5 Event
1.3 Flume Topologies
1.4 How a Flume Agent Works Internally
1.5 The Three Major Hadoop Distributions
Chapter 2 Flume Quick Start
2.1 Flume Download Links
2.2 Installation and Deployment
Chapter 3 Flume Enterprise Development Cases
3.1 Official Example: Monitoring Port Data
3.2 Case: Streaming a Local File to HDFS in Real Time
3.3 Case: Streaming a Directory to HDFS in Real Time
3.4 Case: Single Source, Multiple Outputs (Selector)
3.5 Case: Single Source, Multiple Outputs (Sink Group)
3.6 Case: Aggregating Multiple Sources
Chapter 4 Monitoring Flume with Ganglia
4.1 Installing and Deploying Ganglia
4.2 Testing Flume Monitoring
Chapter 5 Advanced Flume: Custom MySQLSource
5.1 Why a Custom Source
5.2 Components of the Custom MySQLSource
5.3 Steps to Implement the Custom MySQLSource
5.4 Code
5.4.1 Adding the pom Dependencies
5.4.2 Adding the Configuration Files
5.4.3 SQLSourceHelper
5.4.4 MySQLSource
5.5 Testing
5.5.1 Preparing the Jar Packages
5.5.2 Preparing the Configuration File
5.5.3 Preparing the MySQL Tables
5.5.4 Running the Test and Checking the Results
Chapter 6 Extra Knowledge
6.1 Common Regular Expression Syntax
6.2 Exercise
Chapter 7 Real Flume Interview Questions (Important)
7.1 How do you monitor Flume data transfer?
7.2 What do Flume's Source, Sink, and Channel do? What Source types do you use?
7.3 Flume Channel Selectors
7.4 Flume Parameter Tuning
7.5 Flume's Transaction Mechanism
7.6 Can Flume lose data during collection?
Flume is a highly available, highly reliable, distributed system provided by Cloudera for collecting, aggregating, and transporting massive volumes of log data. Flume is based on a streaming architecture, which makes it flexible and simple.
In 2009 Flume was donated to the Apache Software Foundation and became one of the Hadoop-related components. In recent years, as Flume has kept improving and new versions have been released (in particular Flume NG), and as its built-in components have grown richer, it has become much more convenient to use during development, and it is now an Apache top-level project.
The Flume architecture is shown in the figure below.
Let's look at the components of the Flume architecture in detail.
An Agent is a JVM process that moves data from a source to a destination in the form of events; it is the basic unit of Flume data transfer.
An Agent consists of three main parts: a Source, a Channel, and a Sink.
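As a minimal sketch (the agent name a1 and the component names r1, c1, and k1 are placeholders), an agent's components are declared and wired together in its properties file like this:
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# a source can write to one or more channels; a sink reads from exactly one channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1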
A Source is the component responsible for receiving data into the Flume Agent. The Source component can handle log data of many types and formats, including avro, thrift, exec (Linux commands), jms, spooling directory, netcat, sequence generator, syslog, http, and legacy.
A Channel is a buffer that sits between the Source and the Sink, so it allows the Source and the Sink to operate at different rates. A Channel is thread-safe and can handle writes from several Sources and reads from several Sinks at the same time.
Flume ships with two Channels: the Memory Channel and the File Channel.
The Memory Channel is an in-memory queue. It is suitable when data loss is not a concern; if data loss matters, it should not be used, because a process crash, machine failure, or restart will lose whatever data it holds.
The File Channel writes all events to disk, so no data is lost if the process shuts down or the machine goes down.
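As a hedged sketch (the directory paths are placeholders), switching a channel to the File Channel only requires changing its type and pointing it at a checkpoint directory and one or more data directories:
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint
a1.channels.c1.dataDirs = /opt/module/flume/data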
A Sink continuously polls the Channel for events, removes them in batches, and writes each batch to a storage or indexing system or sends it to another Flume Agent.
A Sink is fully transactional. Before removing a batch of events from the Channel, each Sink starts a transaction with the Channel. Once the batch has been successfully written to the storage system or to the next Flume Agent, the Sink commits the transaction through the Channel; once the transaction is committed, the Channel removes those events from its internal buffer.
Sink destinations include hdfs, logger, avro, thrift, ipc, file, null, HBase, solr, and custom sinks.
An Event is the basic unit of Flume data transfer; data travels from the source to the destination in the form of events. An Event consists of an optional set of headers (key-value pairs) and a byte-array body that carries the payload.
Flume topologies are shown in the figures below.
Figure: Flume Agents chained together.
The three major Hadoop distributions: Apache, Cloudera, and Hortonworks.
Apache is the original (most basic) distribution and is best for getting started.
Cloudera is widely used in large Internet companies. (Its distribution is CDH; commercial.)
Hortonworks has the best documentation.
1) Apache Hadoop
Official site: http://hadoop.apache.org/releases.html
Downloads: https://archive.apache.org/dist/hadoop/common/
2) Cloudera Hadoop
Official site: https://www.cloudera.com/downloads/cdh/5-10-0.html
Downloads: http://archive-primary.cloudera.com/cdh5/cdh/5/
In 2009 Doug Cutting, the creator of Hadoop, joined Cloudera. Cloudera's main products are CDH, Cloudera Manager, and Cloudera Support.
3) Hortonworks Hadoop
Official site: https://hortonworks.com/products/data-center/hdp/
Downloads: https://hortonworks.com/downloads/#data-platform
When Hortonworks was founded it brought in roughly 25 to 30 Yahoo engineers who specialized in Hadoop; these engineers had been helping Yahoo develop Hadoop since 2005 and had contributed about 80% of its code.
1) Flume official site
http://flume.apache.org/
2) Documentation
http://flume.apache.org/FlumeUserGuide.html
3) Downloads
http://archive.apache.org/dist/flume/
1) Upload apache-flume-1.7.0-bin.tar.gz to the /opt/software directory on the Linux host.
2) Extract apache-flume-1.7.0-bin.tar.gz into the /opt/module/ directory:
[atguigu@hadoop102 software]$ tar -zxf apache-flume-1.7.0-bin.tar.gz -C /opt/module/
3) Rename apache-flume-1.7.0-bin to flume:
[atguigu@hadoop102 module]$ mv apache-flume-1.7.0-bin flume
4) Rename the flume-env.sh.template file under flume/conf to flume-env.sh and configure it:
[atguigu@hadoop102 conf]$ mv flume-env.sh.template flume-env.sh
[atguigu@hadoop102 conf]$ vim flume-env.sh
export JAVA_HOME=/opt/module/jdk1.8.0_144
1) Requirement: Flume monitors port 44444 on the local machine; we then use the telnet tool to send messages to port 44444, and Flume prints the received data to the console in real time.
2) Analysis:
1. Install the telnet tool:
[atguigu@hadoop102 software]$ sudo rpm -ivh xinetd-2.3.14-40.el6.x86_64.rpm
[atguigu@hadoop102 software]$ sudo rpm -ivh telnet-0.17-48.el6.x86_64.rpm
[atguigu@hadoop102 software]$ sudo rpm -ivh telnet-server-0.17-48.el6.x86_64.rpm
2. Check whether port 44444 is already in use:
[atguigu@hadoop102 flume-telnet]$ sudo netstat -tunlp | grep 44444
Description: netstat is a very useful tool for monitoring TCP/IP networks; it can display the routing table, the actual network connections, and status information for every network interface.
Basic syntax: netstat [options]
Options:
-t or --tcp: show TCP connections;
-u or --udp: show UDP connections;
-n or --numeric: show numeric IP addresses instead of resolving names through DNS;
-l or --listening: show only listening sockets;
-p or --programs: show the PID and name of the program using each socket;
3. Create the Flume Agent configuration file flume-telnet-logger.conf.
Create a job directory under the flume directory and enter it:
[atguigu@hadoop102 flume]$ pwd
/opt/module/flume
[atguigu@hadoop102 flume]$ mkdir job
[atguigu@hadoop102 flume]$ cd job/
Create the file flume-telnet-logger.conf in the job directory:
[atguigu@hadoop102 job]$ touch flume-telnet-logger.conf
Open flume-telnet-logger.conf:
[atguigu@hadoop102 job]$ vim flume-telnet-logger.conf
Add the following content:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Note: the configuration above follows the official user guide: http://flume.apache.org/FlumeUserGuide.html
4. Start the Flume agent and begin listening on the port:
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/flume-telnet-logger.conf -Dflume.root.logger=INFO,console
Parameter explanations:
--conf conf/ : the configuration files are stored in the conf/ directory
--name a1 : the agent is named a1 (this must match the name used in the configuration file)
--conf-file job/flume-telnet-logger.conf : the configuration file Flume reads for this run is flume-telnet-logger.conf in the job directory
-Dflume.root.logger=INFO,console : -D overrides the flume.root.logger property at runtime and sets the console log level to INFO. Log levels include debug, info, warn, and error.
5. Use the telnet tool to send data to port 44444 on the local machine:
[atguigu@hadoop102 ~]$ telnet localhost 44444
The result is shown in the figure below.
1) Requirement: monitor the Hive log in real time and upload it to HDFS. (In real projects this would be the logs produced by Tomcat: order logs, clickstream logs, and so on.)
2) Analysis:
1. To write data to HDFS, Flume must have the relevant Hadoop jars on its classpath. Copy
commons-configuration-1.6.jar
hadoop-auth-2.7.2.jar
hadoop-common-2.7.2.jar
hadoop-hdfs-2.7.2.jar
commons-io-2.4.jar
htrace-core-3.1.0-incubating.jar
into the /opt/module/flume/lib directory.
2. Create the file flume-file-hdfs.conf.
Create the file:
[atguigu@hadoop102 job]$ touch flume-file-hdfs.conf
Note: to read a file on a Linux system, you have to follow the rules of Linux commands. Because the Hive log lives on the Linux filesystem, the source type to use is exec (as in "execute"), which reads the file by running a Linux command.
[atguigu@hadoop102 job]$ vim flume-file-hdfs.conf
Add the following content:
# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2
# Describe/configure the source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/module/hive/logs/hive.log
a2.sources.r2.shell = /bin/bash -c
# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop102:9000/flume/%Y%m%d/%H
#prefix of the uploaded files
a2.sinks.k2.hdfs.filePrefix = logs-
#whether to round down the event timestamp (roll directories by time)
a2.sinks.k2.hdfs.round = true
#how many time units before a new directory is created
a2.sinks.k2.hdfs.roundValue = 1
#the unit of time used for rounding
a2.sinks.k2.hdfs.roundUnit = hour
#use the local timestamp
a2.sinks.k2.hdfs.useLocalTimeStamp = true
#number of events to accumulate before flushing to HDFS
a2.sinks.k2.hdfs.batchSize = 1000
#file type; compression is supported
a2.sinks.k2.hdfs.fileType = DataStream
#how often (seconds) to roll a new file
a2.sinks.k2.hdfs.rollInterval = 600
#roll the file when it reaches this size (bytes)
a2.sinks.k2.hdfs.rollSize = 134217700
#never roll based on the number of events
a2.sinks.k2.hdfs.rollCount = 0
#minimum number of HDFS block replicas
a2.sinks.k2.hdfs.minBlockReplicas = 1
# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
A detailed breakdown of this configuration is shown in the figure.
3. Run the monitoring configuration:
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/flume-file-hdfs.conf
4. Start Hadoop and Hive, then run some Hive operations to generate log output:
[atguigu@hadoop102 hadoop-2.7.2]$ sbin/start-dfs.sh
[atguigu@hadoop103 hadoop-2.7.2]$ sbin/start-yarn.sh
[atguigu@hadoop102 hive]$ bin/hive
hive (default)>
5. Check the files on HDFS.
1) Requirement: use Flume to monitor an entire directory for new files.
2) Analysis:
Create the configuration file flume-dir-hdfs.conf:
[atguigu@hadoop102 job]$ touch flume-dir-hdfs.conf
Open the file:
[atguigu@hadoop102 job]$ vim flume-dir-hdfs.conf
Add the following content:
# Name the components on this agent
a3.sources = r3
a3.sinks = k3
a3.channels = c3
# Describe/configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /opt/module/flume/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
#ignore (do not upload) any file ending in .tmp
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)
# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop102:9000/flume/upload/%Y%m%d/%H
#prefix of the uploaded files
a3.sinks.k3.hdfs.filePrefix = upload-
#whether to round down the event timestamp (roll directories by time)
a3.sinks.k3.hdfs.round = true
#how many time units before a new directory is created
a3.sinks.k3.hdfs.roundValue = 1
#the unit of time used for rounding
a3.sinks.k3.hdfs.roundUnit = hour
#use the local timestamp
a3.sinks.k3.hdfs.useLocalTimeStamp = true
#number of events to accumulate before flushing to HDFS
a3.sinks.k3.hdfs.batchSize = 100
#file type; compression is supported
a3.sinks.k3.hdfs.fileType = DataStream
#how often (seconds) to roll a new file
a3.sinks.k3.hdfs.rollInterval = 600
#roll the file when it reaches roughly 128 MB
a3.sinks.k3.hdfs.rollSize = 134217700
#never roll based on the number of events
a3.sinks.k3.hdfs.rollCount = 0
#minimum number of HDFS block replicas
a3.sinks.k3.hdfs.minBlockReplicas = 1
# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
A detailed breakdown of this configuration is shown in the figure.
Start the directory-monitoring Flume agent:
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-dir-hdfs.conf
Notes on using the Spooling Directory Source:
1) Do not create files inside the monitored directory and keep writing to them.
2) Files that have been fully ingested are renamed with the .COMPLETED suffix.
3) The monitored directory is scanned for changes every 500 milliseconds.
Create an upload directory under /opt/module/flume:
[atguigu@hadoop102 flume]$ mkdir upload
Add files to the upload directory:
[atguigu@hadoop102 upload]$ touch atguigu.txt
[atguigu@hadoop102 upload]$ touch atguigu.tmp
[atguigu@hadoop102 upload]$ touch atguigu.log
Check the results:
[atguigu@hadoop102 upload]$ pwd
/opt/module/flume/upload
[atguigu@hadoop102 upload]$ ll
total 0
-rw-rw-r--. 1 atguigu atguigu 0 Mar 4 00:09 atguigu.log.COMPLETED
-rw-rw-r--. 1 atguigu atguigu 0 Mar 4 00:09 atguigu.tmp
-rw-rw-r--. 1 atguigu atguigu 0 Mar 4 00:09 atguigu.txt.COMPLETED
Single Source with multiple Channels and Sinks, as shown in the figure below.
Create a group1 directory under /opt/module/flume/job and enter it:
[atguigu@hadoop102 job]$ mkdir group1
[atguigu@hadoop102 job]$ cd group1/
Create a flume3 directory under /opt/module/datas/:
[atguigu@hadoop102 datas]$ mkdir flume3
1. Create flume-file-flume.conf.
Configure one source that reads the log file and two channels and two sinks, which forward the data to flume-flume-hdfs and flume-flume-dir respectively.
Create the configuration file and open it:
[atguigu@hadoop102 group1]$ touch flume-file-flume.conf
[atguigu@hadoop102 group1]$ vim flume-file-flume.conf
Add the following content:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# replicate the data flow to all channels
a1.sources.r1.selector.type = replicating
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/hive/logs/hive.log
a1.sources.r1.shell = /bin/bash -c
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
Note: Avro is a language-neutral data serialization and RPC framework created by Doug Cutting, the creator of Hadoop.
Note: RPC (Remote Procedure Call) is a way of requesting a service from a program on a remote computer over the network without having to understand the underlying network technology.
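The case above uses the default replicating selector, which copies every event to all configured channels. As a hedged sketch (the header name state and its values are placeholders), a multiplexing selector would instead route each event to a channel based on a header value:
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2
a1.sources.r1.selector.default = c1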
2. Create flume-flume-hdfs.conf.
Configure an Avro source to receive the output of the upstream Flume agent, with an HDFS sink as the output.
Create the configuration file and open it:
[atguigu@hadoop102 group1]$ touch flume-flume-hdfs.conf
[atguigu@hadoop102 group1]$ vim flume-flume-hdfs.conf
Add the following content:
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141
# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://hadoop102:9000/flume2/%Y%m%d/%H
#prefix of the uploaded files
a2.sinks.k1.hdfs.filePrefix = flume2-
#whether to round down the event timestamp (roll directories by time)
a2.sinks.k1.hdfs.round = true
#how many time units before a new directory is created
a2.sinks.k1.hdfs.roundValue = 1
#the unit of time used for rounding
a2.sinks.k1.hdfs.roundUnit = hour
#use the local timestamp
a2.sinks.k1.hdfs.useLocalTimeStamp = true
#number of events to accumulate before flushing to HDFS
a2.sinks.k1.hdfs.batchSize = 100
#file type; compression is supported
a2.sinks.k1.hdfs.fileType = DataStream
#how often (seconds) to roll a new file
a2.sinks.k1.hdfs.rollInterval = 600
#roll the file when it reaches roughly 128 MB
a2.sinks.k1.hdfs.rollSize = 134217700
#never roll based on the number of events
a2.sinks.k1.hdfs.rollCount = 0
#minimum number of HDFS block replicas
a2.sinks.k1.hdfs.minBlockReplicas = 1
# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
3. Create flume-flume-dir.conf.
Configure an Avro source to receive the output of the upstream Flume agent, with a sink that writes to a local directory.
Create the configuration file and open it:
[atguigu@hadoop102 group1]$ touch flume-flume-dir.conf
[atguigu@hadoop102 group1]$ vim flume-flume-dir.conf
Add the following content:
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142
# Describe the sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/datas/flume3
# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2
Note: the local output directory must already exist; if it does not, the file_roll sink will not create it.
4. Run the configuration files.
Start the agents for flume-flume-dir, flume-flume-hdfs, and flume-file-flume, in that order:
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group1/flume-flume-dir.conf
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group1/flume-flume-hdfs.conf
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group1/flume-file-flume.conf
5. Start Hadoop and Hive:
[atguigu@hadoop102 hadoop-2.7.2]$ sbin/start-dfs.sh
[atguigu@hadoop103 hadoop-2.7.2]$ sbin/start-yarn.sh
[atguigu@hadoop102 hive]$ bin/hive
hive (default)>
6. Check the data on HDFS.
7. Check the data in the /opt/module/datas/flume3 directory:
[atguigu@hadoop102 flume3]$ pwd
/opt/module/datas/flume3
[atguigu@hadoop102 flume3]$ ll
total 4
-rw-rw-r--. 1 atguigu atguigu 0 Mar 4 01:01 1551632490229-1
-rw-rw-r--. 1 atguigu atguigu 1594 Mar 4 01:02 1551632490229-2
[atguigu@hadoop102 flume3]$ ll
total 4
-rw-rw-r--. 1 atguigu atguigu 0 Mar 4 01:01 1551632490229-1
-rw-rw-r--. 1 atguigu atguigu 3808 Mar 4 01:02 1551632490229-2
-rw-rw-r--. 1 atguigu atguigu 0 Mar 4 01:02 1551632490229-3
[atguigu@hadoop102 flume3]$ ll
total 8
-rw-rw-r--. 1 atguigu atguigu 0 Mar 4 01:01 1551632490229-1
-rw-rw-r--. 1 atguigu atguigu 3808 Mar 4 01:02 1551632490229-2
-rw-rw-r--. 1 atguigu atguigu 538 Mar 4 01:02 1551632490229-3
-rw-rw-r--. 1 atguigu atguigu 0 Mar 4 01:03 1551632490229-4
-rw-rw-r--. 1 atguigu atguigu 0 Mar 4 01:03 1551632490229-5
Single Source and Channel with multiple Sinks (load balancing), as shown in the figure below.
Create a group2 directory under /opt/module/flume/job and enter it:
[atguigu@hadoop102 job]$ mkdir group2
[atguigu@hadoop102 job]$ cd group2/
1. Create flume-netcat-flume.conf.
Configure one netcat source listening on port 44444, one channel, and a sink group with two sinks, which forward the data to flume-flume-console1 and flume-flume-console2 respectively.
Create the configuration file and open it:
[atguigu@hadoop102 group2]$ touch flume-netcat-flume.conf
[atguigu@hadoop102 group2]$ vim flume-netcat-flume.conf
Add the following content:
# Name the components on this agent
a1.sources = r1
a1.channels = c1
a1.sinkgroups = g1
a1.sinks = k1 k2
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# configure the sink group (load balancing)
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin
a1.sinkgroups.g1.processor.selector.maxTimeOut=10000
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
Note: Avro is a language-neutral data serialization and RPC framework created by Doug Cutting, the creator of Hadoop.
Note: RPC (Remote Procedure Call) is a way of requesting a service from a program on a remote computer over the network without having to understand the underlying network technology.
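This case uses the load_balance sink processor. As a hedged alternative sketch (the priority values are placeholders), a failover sink processor would send all events to the highest-priority sink and switch to the other sink only when that one fails:
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000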
2. Create flume-flume-console1.conf.
Configure an Avro source to receive the output of the upstream Flume agent, with a logger sink that prints to the local console.
Create the configuration file and open it:
[atguigu@hadoop102 group2]$ touch flume-flume-console1.conf
[atguigu@hadoop102 group2]$ vim flume-flume-console1.conf
Add the following content:
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141
# Describe the sink
a2.sinks.k1.type = logger
# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
3. Create flume-flume-console2.conf.
Configure an Avro source to receive the output of the upstream Flume agent, with a logger sink that prints to the local console.
Create the configuration file and open it:
[atguigu@hadoop102 group2]$ touch flume-flume-console2.conf
[atguigu@hadoop102 group2]$ vim flume-flume-console2.conf
Add the following content:
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142
# Describe the sink
a3.sinks.k1.type = logger
# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2
4. Run the configuration files.
Start the agents for flume-flume-console2, flume-flume-console1, and flume-netcat-flume, in that order:
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group2/flume-flume-console2.conf -Dflume.root.logger=INFO,console
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group2/flume-flume-console1.conf -Dflume.root.logger=INFO,console
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group2/flume-netcat-flume.conf
5. Use the telnet tool to send data to port 44444 on the local machine:
$ telnet localhost 44444
6. Watch the logs printed on the Flume2 and Flume3 consoles.
Multiple Sources aggregating data into a single Flume agent, as shown in the figure below.
Distribute the Flume installation to the other machines:
[atguigu@hadoop102 module]$ xsync flume
Create a group3 directory under /opt/module/flume/job on hadoop102, hadoop103, and hadoop104:
[atguigu@hadoop102 job]$ mkdir group3
[atguigu@hadoop103 job]$ mkdir group3
[atguigu@hadoop104 job]$ mkdir group3
1. Create flume1-logger-flume.conf.
Configure a source that monitors the /opt/module/group.log file, with a sink that sends the data to the next Flume agent.
Create the configuration file on hadoop103 and open it:
[atguigu@hadoop103 group3]$ touch flume1-logger-flume.conf
[atguigu@hadoop103 group3]$ vim flume1-logger-flume.conf
Add the following content:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/group.log
a1.sources.r1.shell = /bin/bash -c
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop104
a1.sinks.k1.port = 4141
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2. Create flume2-netcat-flume.conf.
Configure a source that monitors port 44444, with a sink that sends the data to the next Flume agent.
Create the configuration file on hadoop102 and open it:
[atguigu@hadoop102 group3]$ touch flume2-netcat-flume.conf
[atguigu@hadoop102 group3]$ vim flume2-netcat-flume.conf
Add the following content:
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
a2.sources.r1.type = netcat
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 44444
# Describe the sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = hadoop104
a2.sinks.k1.port = 4141
# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
3. Create flume3-flume-logger.conf.
Configure a source that receives the data streams sent by flume1 and flume2, with a sink that prints the merged data to the console.
Create the configuration file on hadoop104 and open it:
[atguigu@hadoop104 group3]$ touch flume3-flume-logger.conf
[atguigu@hadoop104 group3]$ vim flume3-flume-logger.conf
Add the following content:
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop104
a3.sources.r1.port = 4141
# Describe the sink
a3.sinks.k1.type = logger
# Describe the channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
4. Run the configuration files.
Start the agents for flume3-flume-logger.conf, flume2-netcat-flume.conf, and flume1-logger-flume.conf, in that order:
[atguigu@hadoop104 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group3/flume3-flume-logger.conf -Dflume.root.logger=INFO,console
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group3/flume2-netcat-flume.conf
[atguigu@hadoop103 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group3/flume1-logger-flume.conf
5. On hadoop103, append content to the group.log file in /opt/module:
[atguigu@hadoop103 module]$ echo 'hello' > group.log
6. On hadoop102, send data to port 44444:
[atguigu@hadoop102 flume]$ telnet hadoop102 44444
7. Check the data printed on the hadoop104 console.
1) Install the httpd service and PHP:
[atguigu@hadoop102 flume]$ sudo yum -y install httpd php
2) Install the other dependencies:
[atguigu@hadoop102 flume]$ sudo yum -y install rrdtool perl-rrdtool rrdtool-devel
[atguigu@hadoop102 flume]$ sudo yum -y install apr-devel
3) Install Ganglia:
[atguigu@hadoop102 flume]$ sudo rpm -Uvh http://dl.fedoraproject.org/pub/epel/6/x86_64/epel-release-6-8.noarch.rpm
[atguigu@hadoop102 flume]$ sudo yum -y install ganglia-gmetad
[atguigu@hadoop102 flume]$ sudo yum -y install ganglia-web
[atguigu@hadoop102 flume]$ sudo yum install -y ganglia-gmond
4) Modify the configuration file /etc/httpd/conf.d/ganglia.conf:
[atguigu@hadoop102 flume]$ sudo vim /etc/httpd/conf.d/ganglia.conf
Change it to the following:
# Ganglia monitoring system php web frontend
Alias /ganglia /usr/share/ganglia
<Location /ganglia>
Order deny,allow
Deny from all
Allow from all
# Allow from 127.0.0.1
# Allow from ::1
# Allow from .example.com
</Location>
5) Modify the configuration file /etc/ganglia/gmetad.conf:
[atguigu@hadoop102 flume]$ sudo vim /etc/ganglia/gmetad.conf
Change it to:
data_source "hadoop102" 192.168.25.102
6) Modify the configuration file /etc/ganglia/gmond.conf:
[atguigu@hadoop102 flume]$ sudo vim /etc/ganglia/gmond.conf
Change it to:
cluster {
name = "hadoop102"
owner = "unspecified"
latlong = "unspecified"
url = "unspecified"
}
udp_send_channel {
#bind_hostname = yes # Highly recommended, soon to be default.
# This option tells gmond to use a source address
# that resolves to the machine's hostname. Without
# this, the metrics may appear to come from any
# interface and the DNS names associated with
# those IPs will be used to create the RRDs.
# mcast_join = 239.2.11.71
host = 192.168.25.102
port = 8649
ttl = 1
}
udp_recv_channel {
# mcast_join = 239.2.11.71
port = 8649
# bind = 239.2.11.71
bind = 192.168.25.102
retry_bind = true
# Size of the UDP buffer. If you are handling lots of metrics you really
# should bump it up to e.g. 10MB or even higher.
# buffer = 10485760
}
7) Modify the configuration file /etc/selinux/config:
[atguigu@hadoop102 flume]$ sudo vim /etc/selinux/config
Change it to:
# This file controls the state of SELinux on the system.
# SELINUX= can take one of these three values:
# enforcing - SELinux security policy is enforced.
# permissive - SELinux prints warnings instead of enforcing.
# disabled - No SELinux policy is loaded.
# SELINUX=enforcing
SELINUX=disabled
# SELINUXTYPE= can take one of these two values:
# targeted - Targeted processes are protected,
# mls - Multi Level Security protection.
SELINUXTYPE=targeted
Important: disabling SELinux only takes effect after a reboot. To turn it off temporarily without rebooting:
[atguigu@hadoop102 flume]$ sudo setenforce 0
8) Start Ganglia:
[atguigu@hadoop102 flume]$ sudo service httpd start
[atguigu@hadoop102 flume]$ sudo service gmetad start
[atguigu@hadoop102 flume]$ sudo service gmond start
9) Open the Ganglia page in a browser:
http://192.168.25.102/ganglia
Important: if you still get a permission error after the steps above, change the permissions of the /var/lib/ganglia directory:
[atguigu@hadoop102 flume]$ sudo chmod -R 777 /var/lib/ganglia
1) Modify the flume-env.sh configuration in the /opt/module/flume/conf directory:
JAVA_OPTS="-Dflume.monitoring.type=ganglia
-Dflume.monitoring.hosts=192.168.25.102:8649
-Xms100m
-Xmx200m"
2) Start the Flume job:
[atguigu@hadoop102 flume]$ bin/flume-ng agent \
--conf conf/ \
--name a1 \
--conf-file job/flume-telnet-logger.conf \
-Dflume.root.logger=INFO,console \
-Dflume.monitoring.type=ganglia \
-Dflume.monitoring.hosts=192.168.25.102:8649
3) Send data and watch the Ganglia monitoring graphs:
[atguigu@hadoop102 flume]$ telnet localhost 44444
The graphs look like the figure below.
A Source is the component responsible for receiving data into the Flume Agent. The Source component can handle log data of many types and formats, including avro, thrift, exec, jms, spooling directory, netcat, sequence generator, syslog, http, and legacy. The source types provided out of the box cover a lot of ground, but sometimes they cannot meet the needs of a real project, and then we have to implement a custom Source ourselves.
For example: to monitor MySQL in real time and ship data from MySQL to HDFS or another storage framework, we need to implement our own MySQLSource.
The official documentation describes the interface for custom sources:
https://flume.apache.org/FlumeDeveloperGuide.html#source
According to the documentation, a custom MySQLSource must extend the AbstractSource class and implement the Configurable and PollableSource interfaces.
The methods to implement:
getBackOffSleepIncrement() // not used for now
getMaxBackOffSleepInterval() // not used for now
configure(Context context) // initialize the context
process() // fetch the data (fetching from MySQL involves fairly complex logic, so we put the MySQL interaction in a dedicated helper class, SQLSourceHelper), wrap it into Events, and write them to the Channel; this method is called in a loop
stop() // release resources
<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.7.0</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.27</version>
    </dependency>
</dependencies>
Add jdbc.properties and log4j.properties to the classpath.
jdbc.properties:
dbDriver=com.mysql.jdbc.Driver
dbUrl=jdbc:mysql://hadoop102:3306/mysqlsource?useUnicode=true&characterEncoding=utf-8
dbUser=root
dbPassword=123456
log4j.properties:
#--------console-----------
log4j.rootLogger=info,myconsole,myfile
log4j.appender.myconsole=org.apache.log4j.ConsoleAppender
log4j.appender.myconsole.layout=org.apache.log4j.SimpleLayout
#log4j.appender.myconsole.layout.ConversionPattern =%d [%t] %-5p [%c] - %m%n
#log4j.rootLogger=error,myfile
log4j.appender.myfile=org.apache.log4j.DailyRollingFileAppender
log4j.appender.myfile.File=/tmp/flume.log
log4j.appender.myfile.layout=org.apache.log4j.PatternLayout
log4j.appender.myfile.layout.ConversionPattern =%d [%t] %-5p [%c] - %m%n
1) Properties: the helper reads its settings from the agent configuration: table (the source table, required), custom.query (an optional custom query), columns.to.select (default *), run.query.delay (the query interval in milliseconds, default 10000), start.from (the initial id offset, default 0), default.charset.resultset (default UTF-8), and the connection settings connection.url, connection.user, and connection.password.
2) Code:
package com.atguigu;
import org.apache.flume.Context;
import org.apache.flume.conf.ConfigurationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.sql.*;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
public class SQLSourceHelper {
private static final Logger LOG = LoggerFactory.getLogger(SQLSourceHelper.class);
private int runQueryDelay, // interval between two queries
startFrom, // starting id
currentIndex, // current id
recordSixe = 0, // number of rows returned so far
maxRow; // maximum number of rows per query
private String table, // the table to read from
columnsToSelect, // the columns to select, supplied by the user
customQuery, // a custom query supplied by the user
query, // the query that is actually built
defaultCharsetResultSet; // result-set character set
// the context, used to read the configuration file
private Context context;
// default values for these settings; they can be overridden in the flume job configuration file
private static final int DEFAULT_QUERY_DELAY = 10000;
private static final int DEFAULT_START_VALUE = 0;
private static final int DEFAULT_MAX_ROWS = 2000;
private static final String DEFAULT_COLUMNS_SELECT = "*";
private static final String DEFAULT_CHARSET_RESULTSET = "UTF-8";
private static Connection conn = null;
private static PreparedStatement ps = null;
private static String connectionURL, connectionUserName, connectionPassword;
// static initializer: load the JDBC properties and driver
static {
Properties p = new Properties();
try {
p.load(SQLSourceHelper.class.getClassLoader().getResourceAsStream("jdbc.properties"));
connectionURL = p.getProperty("dbUrl");
connectionUserName = p.getProperty("dbUser");
connectionPassword = p.getProperty("dbPassword");
Class.forName(p.getProperty("dbDriver"));
} catch (IOException | ClassNotFoundException e) {
LOG.error(e.toString());
}
}
// obtain a JDBC connection
private static Connection InitConnection(String url, String user, String pw) {
try {
Connection conn = DriverManager.getConnection(url, user, pw);
if (conn == null)
throw new SQLException();
return conn;
} catch (SQLException e) {
e.printStackTrace();
}
return null;
}
// constructor
SQLSourceHelper(Context context) throws ParseException {
// initialize the context
this.context = context;
// parameters with defaults: read them from the flume job configuration, falling back to the defaults
this.columnsToSelect = context.getString("columns.to.select", DEFAULT_COLUMNS_SELECT);
this.runQueryDelay = context.getInteger("run.query.delay", DEFAULT_QUERY_DELAY);
this.startFrom = context.getInteger("start.from", DEFAULT_START_VALUE);
this.defaultCharsetResultSet = context.getString("default.charset.resultset", DEFAULT_CHARSET_RESULTSET);
// parameters without defaults: read them from the flume job configuration
this.table = context.getString("table");
this.customQuery = context.getString("custom.query");
connectionURL = context.getString("connection.url");
connectionUserName = context.getString("connection.user");
connectionPassword = context.getString("connection.password");
conn = InitConnection(connectionURL, connectionUserName, connectionPassword);
// validate the configuration; if a parameter without a default has not been set, throw an exception
checkMandatoryProperties();
// get the current id
currentIndex = getStatusDBIndex(startFrom);
// build the query
query = buildQuery();
}
// validate the configuration (table, query, and database connection parameters)
private void checkMandatoryProperties() {
if (table == null) {
throw new ConfigurationException("property table not set");
}
if (connectionURL == null) {
throw new ConfigurationException("connection.url property not set");
}
if (connectionUserName == null) {
throw new ConfigurationException("connection.user property not set");
}
if (connectionPassword == null) {
throw new ConfigurationException("connection.password property not set");
}
}
// build the sql statement
private String buildQuery() {
String sql = "";
// get the current id
currentIndex = getStatusDBIndex(startFrom);
LOG.info(currentIndex + "");
if (customQuery == null) {
sql = "SELECT " + columnsToSelect + " FROM " + table;
} else {
sql = customQuery;
}
StringBuilder execSql = new StringBuilder(sql);
// use the id as the offset
if (!sql.contains("where")) {
execSql.append(" where ");
execSql.append("id").append(">").append(currentIndex);
return execSql.toString();
} else {
int length = execSql.toString().length();
return execSql.toString().substring(0, length - String.valueOf(currentIndex).length()) + currentIndex;
}
}
// run the query
List<List<Object>> executeQuery() {
try {
// the sql must be rebuilt for every query because the id changes
customQuery = buildQuery();
// collection that holds the results
List<List<Object>> results = new ArrayList<>();
if (ps == null) {
// create the prepared statement lazily
ps = conn.prepareStatement(customQuery);
}
ResultSet result = ps.executeQuery(customQuery);
while (result.next()) {
// collection that holds one row (multiple columns)
List<Object> row = new ArrayList<>();
// copy the returned values into the collection
for (int i = 1; i <= result.getMetaData().getColumnCount(); i++) {
row.add(result.getObject(i));
}
results.add(row);
}
LOG.info("execSql:" + customQuery + "\nresultSize:" + results.size());
return results;
} catch (SQLException e) {
LOG.error(e.toString());
// reconnect
conn = InitConnection(connectionURL, connectionUserName, connectionPassword);
}
return null;
}
// convert the result set to strings: each row is a list, and each list is turned into one comma-separated string
List<String> getAllRows(List<List<Object>> queryResult) {
List<String> allRows = new ArrayList<>();
if (queryResult == null || queryResult.isEmpty())
return allRows;
StringBuilder row = new StringBuilder();
for (List<Object> rawRow : queryResult) {
Object value = null;
for (Object aRawRow : rawRow) {
value = aRawRow;
if (value == null) {
row.append(",");
} else {
row.append(aRawRow.toString()).append(",");
}
}
allRows.add(row.toString());
row = new StringBuilder();
}
return allRows;
}
// update the offset metadata; called after each result set is returned. The offset of every query must be recorded so that processing can resume after an interruption; the id is used as the offset
void updateOffset2DB(int size) {
// source_tab is the key: insert if it does not exist, otherwise update (one record per source table)
String sql = "insert into flume_meta(source_tab,currentIndex) VALUES('" + this.table + "','" + (recordSixe += size)
+ "') on DUPLICATE key update source_tab=values(source_tab),currentIndex=values(currentIndex)";
LOG.info("updateStatus Sql:" + sql);
execSql(sql);
}
// execute a sql statement
private void execSql(String sql) {
try {
ps = conn.prepareStatement(sql);
LOG.info("exec::" + sql);
ps.execute();
} catch (SQLException e) {
e.printStackTrace();
}
}
// get the offset of the current id
private Integer getStatusDBIndex(int startFrom) {
// query the flume_meta table for the current id
String dbIndex = queryOne("select currentIndex from flume_meta where source_tab='" + table + "'");
if (dbIndex != null) {
return Integer.parseInt(dbIndex);
}
// if there is no record, this is the first query or no data has been stored yet, so return the initial value
return startFrom;
}
// run a query that returns a single value (the current id)
private String queryOne(String sql) {
ResultSet result = null;
try {
ps = conn.prepareStatement(sql);
result = ps.executeQuery();
while (result.next()) {
return result.getString(1);
}
} catch (SQLException e) {
e.printStackTrace();
}
return null;
}
// close resources
void close() {
try {
ps.close();
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
int getCurrentIndex() {
return currentIndex;
}
void setCurrentIndex(int newValue) {
currentIndex = newValue;
}
int getRunQueryDelay() {
return runQueryDelay;
}
String getQuery() {
return query;
}
String getConnectionURL() {
return connectionURL;
}
private boolean isCustomQuerySet() {
return (customQuery != null);
}
Context getContext() {
return context;
}
public String getConnectionUserName() {
return connectionUserName;
}
public String getConnectionPassword() {
return connectionPassword;
}
String getDefaultCharsetResultSet() {
return defaultCharsetResultSet;
}
}
Implementation:
package com.atguigu;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
public class SQLSource extends AbstractSource implements Configurable, PollableSource {
// logger
private static final Logger LOG = LoggerFactory.getLogger(SQLSource.class);
// the helper that handles the interaction with MySQL
private SQLSourceHelper sqlSourceHelper;
@Override
public long getBackOffSleepIncrement() {
return 0;
}
@Override
public long getMaxBackOffSleepInterval() {
return 0;
}
@Override
public void configure(Context context) {
try {
// initialize the helper
sqlSourceHelper = new SQLSourceHelper(context);
} catch (ParseException e) {
e.printStackTrace();
}
}
@Override
public Status process() throws EventDeliveryException {
try {
// query the data table
List<List<Object>> result = sqlSourceHelper.executeQuery();
// list that will hold the events
List<Event> events = new ArrayList<>();
// map that will hold the event headers
HashMap<String, String> header = new HashMap<>();
// if any rows were returned, wrap each of them in an event
if (!result.isEmpty()) {
List<String> allRows = sqlSourceHelper.getAllRows(result);
Event event = null;
for (String row : allRows) {
event = new SimpleEvent();
event.setBody(row.getBytes());
event.setHeaders(header);
events.add(event);
}
// write the events to the channel
this.getChannelProcessor().processEventBatch(events);
// update the offset stored in the metadata table
sqlSourceHelper.updateOffset2DB(result.size());
}
// wait before the next query
Thread.sleep(sqlSourceHelper.getRunQueryDelay());
return Status.READY;
} catch (InterruptedException e) {
LOG.error("Error processing row", e);
return Status.BACKOFF;
}
}
@Override
public synchronized void stop() {
LOG.info("Stopping sql source {} ...", getName());
try {
// release resources
sqlSourceHelper.close();
} finally {
super.stop();
}
}
}
1) Put the MySQL driver jar into Flume's lib directory:
[atguigu@hadoop102 flume]$ cp \
/opt/sorfware/mysql-libs/mysql-connector-java-5.1.27/mysql-connector-java-5.1.27-bin.jar \
/opt/module/flume/lib/
2) Build the project and put the resulting jar into Flume's lib directory as well.
1) Create the configuration file and open it:
[atguigu@hadoop102 job]$ touch mysql.conf
[atguigu@hadoop102 job]$ vim mysql.conf
2) Add the following content:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = com.atguigu.source.SQLSource
a1.sources.r1.connection.url = jdbc:mysql://192.168.9.102:3306/mysqlsource
a1.sources.r1.connection.user = root
a1.sources.r1.connection.password = 000000
a1.sources.r1.table = student
a1.sources.r1.columns.to.select = *
#a1.sources.r1.incremental.column.name = id
#a1.sources.r1.incremental.value = 0
a1.sources.r1.run.query.delay=5000
# Describe the sink
a1.sinks.k1.type = logger
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
1) Create the mysqlsource database:
CREATE DATABASE mysqlsource;
2) In the mysqlsource database, create the data table student and the metadata table flume_meta:
CREATE TABLE `student` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`name` varchar(255) NOT NULL,
PRIMARY KEY (`id`)
);
CREATE TABLE `flume_meta` (
`source_tab` varchar(255) NOT NULL,
`currentIndex` varchar(255) NOT NULL,
PRIMARY KEY (`source_tab`)
);
3) Insert the following data into the student table:
1 zhangsan
2 lisi
3 wangwu
4 zhaoliu
1) Run the job:
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a1 \
--conf-file job/mysql.conf -Dflume.root.logger=INFO,console
2) The results are shown in the figure below.
Requirements:
1) flume-1 monitors hive.log and sends its data to flume-2; flume-2 appends the data to a local file and also forwards it to flume-3.
2) flume-4 monitors another local file that you create yourself, any.txt, and sends its data to flume-3.
3) flume-3 writes the aggregated data to HDFS.
Draw the topology diagram first, then write the job configurations.
Use the third-party framework Ganglia to monitor Flume in real time.
1. Roles
(1) The Source component collects data; it can handle log data of many types and formats, including avro, thrift, exec, jms, spooling directory, netcat, sequence generator, syslog, http, and legacy.
(2) The Channel component buffers the collected data; the data can be held in memory or in files.
(3) The Sink component sends the data on to its destination; destinations include HDFS, Logger, avro, thrift, ipc, file, HBase, solr, and custom sinks.
2. The Source types our company uses:
(1) exec, for monitoring back-end log files
(2) netcat, for monitoring the ports that back-end services send logs to
Exec, spooldir
1. Source
Increasing the number of Sources (and, when using the Taildir Source, the number of FileGroups) increases the Source's capacity to read data. For example, when a single directory produces too many files, split it into several directories and configure a Source for each, so the Sources can keep up with newly produced data.
The batchSize parameter determines how many Events the Source ships to the Channel in one batch; increasing it appropriately improves the throughput of moving Events from the Source to the Channel.
2. Channel
A memory Channel gives the best performance, but data may be lost if the Flume process dies unexpectedly. A file Channel is more fault tolerant, but its performance is worse than a memory Channel's.
When using a file Channel, configuring dataDirs with directories on several different disks improves performance.
The capacity parameter determines the maximum number of Events the Channel can hold. The transactionCapacity parameter determines the maximum number of Events the Source writes to the Channel per transaction and the maximum number the Sink reads from the Channel per transaction; transactionCapacity needs to be larger than the batchSize of the Source and of the Sink.
3. Sink
Increasing the number of Sinks increases the capacity to consume Events, but more Sinks is not always better; enough is enough, since extra Sinks occupy system resources and waste them unnecessarily.
The batchSize parameter determines how many Events the Sink reads from the Channel in one batch; increasing it appropriately improves the throughput of moving Events out of the Channel. A combined sketch of these parameters follows below.
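As a hedged sketch of how these parameters relate (the values are placeholders, not recommendations), an exec source, memory channel, and HDFS sink might be tuned like this, keeping the channel's transactionCapacity above both batch sizes:
a1.sources.r1.batchSize = 500
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000
a1.sinks.k1.hdfs.batchSize = 500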
Flume's transaction mechanism (similar to a database's transactions): Flume uses two separate transactions to handle event delivery from the Source to the Channel and from the Channel to the Sink. For example, the spooling directory source creates one event per line of a file; only once every event in the transaction has been delivered to the Channel and committed successfully does the Source mark the file as completed. Delivery from the Channel to the Sink is handled transactionally in the same way: if for some reason an event cannot be recorded, the transaction is rolled back, and all of its events remain in the Channel waiting to be redelivered.
This is illustrated in the figure below.
No: the Channel's storage can be file-based (File Channel), and the transfers themselves are transactional. However, if a memory Channel is used, data may be lost if the machine loses power.