Flume

 

Overview

Flume is a highly available, highly reliable, distributed system provided by Cloudera for collecting, aggregating, and transporting massive volumes of log data. Flume is based on a streaming architecture and is flexible and simple.

Main role: read data from a server's local disk in real time and write it to HDFS.

Advantages:

  1. It can integrate with virtually any storage process.
  2. If the incoming data rate exceeds the rate at which data can be written to the destination store (read and write rates out of sync), Flume buffers the data, reducing the pressure on HDFS.
  3. Transactions in Flume are based on the channel, using two transaction models (sender + receiver) to make sure messages are delivered reliably.

Flume uses two independent transactions to handle event delivery from source to channel and from channel to sink. Only once all the data in a transaction has been successfully committed to the channel does the source consider the data read complete. Likewise, only data that has been successfully written out by the sink is removed from the channel; on failure it is re-committed.

Composition: an Agent consists of source + channel + sink.

A source is an abstraction of where data comes from; a sink is an abstraction of where data goes.

Source
The Source is the component responsible for receiving data into the Flume Agent. It can handle log data of all kinds of types and formats.
Common input types on the ingest side: spooling directory (spooldir), which keeps picking up files rolled into a folder; exec, which collects the output of a command;
syslog, for system logs; avro, for an upstream Flume; and netcat, for data sent over the network.


Channel
The Channel is a buffer sitting between the Source and the Sink, so it allows the Source and the Sink to run at different rates. The Channel is thread-safe and can handle writes from several Sources and reads from several Sinks at the same time.
Flume ships with two Channels: the Memory Channel and the File Channel.
The Memory Channel is an in-memory queue. It is suitable when you do not need to worry about losing data. If data loss is a concern, the Memory Channel should not be used, because a process crash, a machine failure, or a restart will lose data.
The File Channel writes all events to disk, so it loses no data when the process stops or the machine goes down.
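As a minimal sketch of how the two differ in configuration (the agent name a1, channel name c1, and the checkpoint/data paths below are illustrative assumptions, not taken from the cases later in this post):

# File Channel: events are persisted to disk, so they survive a crash or restart
a1.channels = c1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint
a1.channels.c1.dataDirs = /opt/module/flume/data

# Memory Channel equivalent, as used in the cases below: fast, but events are lost on failure
# a1.channels.c1.type = memory
# a1.channels.c1.capacity = 1000
# a1.channels.c1.transactionCapacity = 100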

The Channel selector is the component that decides which Channels a particular event received by a Source gets written to; it tells the Channel processor, which then writes the event to each of those Channels.

There are two built-in types of Channel Selector: the Replicating Channel Selector (the default, which sends every event to all Channels) and the Multiplexing Channel Selector (which picks which Channel each event goes to), plus custom selectors.

Events sent by the Source pass through the Channel selector, which decides how they are written into Channels. Flume provides three kinds of Channel selectors: replicating, multiplexing, and custom.

  1. Replicating selector: a Source writes each Event to several Channels at once, so different Sinks can pick up the same Event from different Channels. For example, to write one log stream to both Kafka and HDFS, an Event is written into two Channels, and Sinks of different types then deliver it to the different external stores.

     This selector replicates every event to all of the Channels named in the Source's channels parameter. It also has an optional parameter, optional, a space-separated list of channel names. Every channel listed there is treated as optional, so if writing an event to one of them fails, the failure is ignored; a failed write to any other channel throws an exception.

  2. Multiplexing selector: must be used together with an interceptor; it looks at a key in the Event's header to decide which Channel the Event should be written to (see the configuration sketch after this list).

There is also the Kafka channel, which needs no sink.

   3. Custom selector
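A minimal sketch of a multiplexing selector, assuming an upstream interceptor has set a header named state on each event; the header name, its values (CZ, US), and the channel names are illustrative:

a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2
a1.sources.r1.selector.default = c3

Events whose state header is CZ go to c1, US goes to c2, and everything else falls back to the default channel c3.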

Sink

Common destinations include HDFS, Kafka, logger (logs events at INFO level), avro (a downstream Flume), File, HBase, Solr, IPC, Thrift, custom sinks, and so on.
The Sink keeps polling the Channel for events, removes them in batches, and writes those batches to a storage or indexing system, or sends them on to another Flume Agent.
The Sink is fully transactional. Before removing a batch of data from the Channel, each Sink opens a transaction with the Channel. Once the batch of events has been written out successfully to the storage system or the next Flume Agent, the Sink commits the transaction through the Channel. Only after the transaction is committed does the Channel delete those events from its own internal buffer.

Sink groups allow multiple sinks to be organized into one entity. Sink processors provide load balancing across all the Sinks in the group, and can also fail over from one Sink to another when one fails. Put simply, one source maps to one sink group, i.e. multiple sinks; this looks much like the replicating/multiplexing setups above, except that here the concern is reliability and performance, i.e. configuring failover and load balancing.

The Default Sink Processor accepts a single Sink and does not force the user to create a processor for it.
The Failover Sink Processor maintains a prioritized list of sinks through its configuration, guaranteeing that every event that can be processed will be processed (a configuration sketch follows below).
It works by relegating sinks that keep failing to a pool where each is given a cool-down period during which it does nothing; once a sink successfully sends an event, it is restored to the live pool.
The Load Balancing Sink Processor spreads the load across multiple Sinks. Distribution can be done via either ① round_robin or ② random selection; round_robin is used by default.
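A minimal failover sketch, assuming a sink group g1 containing two sinks k1 and k2 that are already defined elsewhere in the agent; the group name, priorities, and maxpenalty value are illustrative:

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000

The higher-priority sink (k2 here) receives events first; if it fails it is penalized for up to maxpenalty milliseconds and traffic falls over to k1. The load_balance variant of the same sinkgroups block appears in case 5 below.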

 

Transactions

Put transaction flow:

doPut first writes the batch of data into the temporary buffer putList; doCommit checks whether the channel's in-memory queue has enough room to merge the batch in; doRollback rolls the data back when the channel's in-memory queue does not have enough space.

The put attempt first puts the data into putList and then tries to commit; if the transaction commits into the channel successfully, the events are removed from putList; if it fails, the data is rolled back into putList and the commit is retried.

Take transaction:

doTake first takes the data into the temporary buffer takeList; doCommit clears takeList once all of the data has been sent successfully; doRollback, if an exception occurs while sending, returns the data held in takeList to the channel's in-memory queue.

Events are pulled into takeList and a commit is attempted; if the commit succeeds, the data in takeList is cleared; if it fails, the events are returned to the channel and the commit is retried.

With this transaction model, Flume can end up delivering duplicate data.

Event

The transfer unit: the basic unit of data transfer in Flume; data travels from the source to the destination in the form of events. An Event consists of an optional header plus a byte array carrying the payload. The header is a HashMap of key-value string pairs.

Interceptors
An interceptor is a simple plugin-style component that sits between a Source and the Channel the Source writes to. Each interceptor instance only processes events received by that one Source.
Because interceptors must finish transforming an event before it is written to the channel, the channel (and any other source that might time out) only acknowledges the client or sink that sent the event once the interceptors have transformed it successfully.

Flume ships with a number of commonly used interceptors, and you can also write a custom interceptor to process your logs (a wiring sketch follows below). A custom interceptor only takes a few steps:

  •     The Flume version used here is apache-flume-1.6.0

Implement the org.apache.flume.interceptor.Interceptor interface
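As a sketch of how interceptors are wired to a source (the built-in timestamp and host interceptors are used here for illustration; the agent and interceptor names are assumptions):

a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = timestamp
a1.sources.r1.interceptors.i2.type = host
# a custom interceptor is referenced through its Builder class instead, e.g.
# a1.sources.r1.interceptors.i1.type = com.example.MyInterceptor$Builder

The timestamp interceptor adds a timestamp header (which an HDFS sink can use instead of useLocalTimeStamp), and the host interceptor adds the agent's host; headers added this way are what a multiplexing channel selector keys on.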

Flume topologies

 

① Chaining: more channels, but the number of flume tiers should not be too large. In this mode multiple flumes are connected in sequence, from the very first source through to the storage system the final sink delivers to. Bridging too many flumes is not recommended: too many flumes not only hurts throughput, but if any flume node along the way goes down, the whole pipeline is affected.

 

② Single source with multiple channels and sinks: either one channel feeding multiple sinks, or multiple channels each feeding its own sink:

  single source ---> channel ---> sink1
                              \-> sink2        (one channel, multiple sinks)
                              \-> sink3

  single source ---> channel1 ---> sink1
                 \-> channel2 ---> sink2       (multiple channels, multiple sinks)

Flume supports sending the event flow to one or more destinations. In this mode the data source is replicated into multiple channels; every channel holds the same data, and each sink can deliver to a different destination.

 

③ Load balancing: Flume supports logically grouping multiple sinks into one sink group and spreading the data across the different sinks, which mainly addresses load balancing and failover.

Load balancing: the parallel channels are polled in turn; the benefit is higher throughput plus data safety (if one path goes down, they don't all go down; the buffer is longer, so if HDFS has problems, the two tiers of channels and the several flumes in parallel keep the data safe and enlarge the buffer).

 

④ Flume agent aggregation: everyday web applications usually run on hundreds of servers, and large ones on thousands or even tens of thousands, so the logs they produce are painful to handle. This flume composition handles it well: deploy a flume on every server to collect its logs and send them to one flume that collects logs centrally, which then uploads to HDFS, Hive, HBase, JMS, and so on for log analysis.

 

 

Installation

Upload apache-flume-1.7.0-bin.tar.gz to /opt/software on the Linux machine
Extract apache-flume-1.7.0-bin.tar.gz into the /opt/module/ directory
[kris@hadoop101 software]$ tar -zxf apache-flume-1.7.0-bin.tar.gz -C /opt/module/
[kris@hadoop101 module]$ mv apache-flume-1.7.0-bin/ flume
[kris@hadoop101 conf]$ mv flume-env.sh.template flume-env.sh
[kris@hadoop101 conf]$ vim flume-env.sh 
export JAVA_HOME=/opt/module/jdk1.8.0_144

Flume exception handling

1) Problem: starting the consuming Flume throws the following exception

ERROR hdfs.HDFSEventSink: process failed

java.lang.OutOfMemoryError: GC overhead limit exceeded

2) Solution steps:

(1) Add the following configuration to the /opt/module/flume/conf/flume-env.sh file on server hadoop101

export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"

Sync the configuration to servers hadoop102 and hadoop103

[kris@hadoop101 conf]$ xsync flume-env.sh

1. Monitoring port data -- netcat

Monitor data on a port:
port (netcat) ---> flume ---> Sink (logger) to the console

 

[kris@hadoop101 flume]$ mkdir job
[kris@hadoop101 flume]$ cd job/
[kris@hadoop101 job]$ touch flume-netcat-logger.conf
[kris@hadoop101 job]$ vim flume-netcat-logger.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
#a1's source type is the netcat port type
a1.sources.r1.type = netcat
#the host a1 binds to
a1.sources.r1.bind = localhost
#the port a1 listens on
a1.sources.r1.port = 44444

# Describe the sink
#a1's destination is the console (logger type)
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
#a1's channel type is memory
a1.channels.c1.type = memory
#the total capacity of a1's channel is 1000 events
a1.channels.c1.capacity = 1000
#a1's channel commits a transaction once 100 events have been collected
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
#connect r1 to c1
a1.sources.r1.channels = c1
#connect k1 to c1
a1.sinks.k1.channel = c1

 

Install the nc tool
[kris@hadoop101 software]$ sudo yum install -y nc
Check whether port 44444 is already in use
[kris@hadoop101 flume]$ sudo netstat -tunlp | grep 44444
Start the flume listener first
[kris@hadoop101 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console
    --conf conf/ means the configuration files live in the conf/ directory; --name a1 names the agent a1; --conf-file job/... means this run reads the flume-netcat-logger.conf file in the job folder;
    -D overrides the flume.root.logger property at runtime, setting the console log level to INFO
[kris@hadoop101 ~]$ cd /opt/module/flume/
Send content to port 44444 on the local machine
[kris@hadoop101 flume]$ nc localhost 44444
hello
OK
kris
OK
Watch the data arrive on the Flume listening side
2019-02-20 10:01:41,151 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F                                  hello }
2019-02-20 10:01:45,153 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 6B 72 69 73                                     kris }
netstat -nltp
[kris@hadoop101 ~]$ netstat -nltp    #this port showing up as LISTEN means it worked
tcp        0      0 ::ffff:127.0.0.1:44444      :::*                        LISTEN      4841/java

With nc hadoop102 44444, flume receives nothing (the source is bound to localhost on hadoop101).

The netstat command is a very useful tool for monitoring TCP/IP networking; it can display the routing table, the actual network connections, and status information for every network interface.

-t or --tcp: show TCP connections;

-u or --udp: show UDP connections;

       -n or --numeric: show IP addresses directly instead of resolving names through DNS;

       -l or --listening: show listening server sockets;

       -p or --programs: show the PID and name of the program using each socket;

2. Reading a local file into HDFS in real time

Read a local file into HDFS in real time:
hive.log (exec) ---> flume ---> Sink (HDFS)

To read a file on a Linux system you have to follow the rules of Linux commands. Since the Hive log lives on a Linux system, the source type chosen for reading the file is exec, short for execute: it runs a Linux command to read the file.

1. For Flume to write data to HDFS, it must have the relevant Hadoop jars

Copy commons-configuration-1.6.jar,
hadoop-auth-2.7.2.jar,
hadoop-common-2.7.2.jar,
hadoop-hdfs-2.7.2.jar,
commons-io-2.4.jar, and
htrace-core-3.1.0-incubating.jar
into the /opt/module/flume/lib folder

2. Create the flume-file-hdfs.conf file

[kris@hadoop101 job]$ vim flume-file-hdfs.conf
# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2

# Describe/configure the source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/module/hive/logs/hive.log
a2.sources.r2.shell = /bin/bash -c

# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop102:9000/flume/%Y%m%d/%H
#prefix for uploaded files
a2.sinks.k2.hdfs.filePrefix = logs-
#whether to roll folders by time
a2.sinks.k2.hdfs.round = true
#how many time units before creating a new folder
a2.sinks.k2.hdfs.roundValue = 1
#redefine the time unit
a2.sinks.k2.hdfs.roundUnit = hour
#whether to use the local timestamp
a2.sinks.k2.hdfs.useLocalTimeStamp = true
#how many Events to accumulate before flushing to HDFS
a2.sinks.k2.hdfs.batchSize = 1000
#file type; compression is supported
a2.sinks.k2.hdfs.fileType = DataStream
#how long before rolling a new file
a2.sinks.k2.hdfs.rollInterval = 60
#roll size of each file
a2.sinks.k2.hdfs.rollSize = 134217700
#file rolling is independent of the number of Events
a2.sinks.k2.hdfs.rollCount = 0

# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

tail -F /opt/module/hive/logs/hive.log    (-F keeps following the file in real time)

[kris@hadoop101 flume]$ bin/flume-ng agent -c conf/ -n a2 -f job/flume-file-hdfs.conf 


Start Hadoop and Hive, then run Hive operations to generate log entries:  sbin/start-dfs.sh  sbin/start-yarn.sh  bin/hive

View the files on HDFS.

3. Reading directory files into HDFS in real time

 

 

Read files from a directory into HDFS in real time:
directory (spooldir) ---> flume ---> Sink (HDFS)

[kris@hadoop101 job]$ vim flume-dir-hdfs.conf
a3.sources = r3
a3.sinks = k3
a3.channels = c3

# Describe/configure the source
#source type is a spooling directory
a3.sources.r3.type = spooldir
#directory to monitor
a3.sources.r3.spoolDir = /opt/module/flume/upload
#suffix added once a file has been uploaded
a3.sources.r3.fileSuffix = .COMPLETED
#whether to add a file header
a3.sources.r3.fileHeader = true
#ignore all files ending in .tmp; do not upload them
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)

# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop101:9000/flume/upload/%Y%m%d/%H
#prefix for uploaded files
a3.sinks.k3.hdfs.filePrefix = upload-
#whether to roll folders by time
a3.sinks.k3.hdfs.round = true
#how many time units before creating a new folder
a3.sinks.k3.hdfs.roundValue = 1
#redefine the time unit
a3.sinks.k3.hdfs.roundUnit = hour
#whether to use the local timestamp
a3.sinks.k3.hdfs.useLocalTimeStamp = true
#how many Events to accumulate before flushing to HDFS
a3.sinks.k3.hdfs.batchSize = 100
#file type; compression is supported
a3.sinks.k3.hdfs.fileType = DataStream
#how long before rolling a new file
a3.sinks.k3.hdfs.rollInterval = 60
#roll size of each file, roughly 128M
a3.sinks.k3.hdfs.rollSize = 134217700
#file rolling is independent of the number of Events
a3.sinks.k3.hdfs.rollCount = 0

# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3

 

[kris@hadoop101 flume]$ bin/flume-ng agent -c conf/ -n a3 -f job/flume-dir-hdfs.conf     
[kris@hadoop101 flume]$ mkdir upload
[kris@hadoop101 flume]$ cd upload/
[kris@hadoop101 upload]$ touch kris.txt
[kris@hadoop101 upload]$ touch kris.tmp
[kris@hadoop101 upload]$ touch kris.log
[kris@hadoop101 upload]$ ll  ##creating a file makes a /flume/upload/20190224/11/upload-155... file appear on hdfs; with no content added it stays empty; writing into kri.log.COMPLETED with vim still leaves hdfs empty, since only files newly created in the monitored folder are picked up;
總用量 0
-rw-rw-r--. 1 kris kris 0 2月  20 11:09 kris.log.COMPLETED
-rw-rw-r--. 1 kris kris 0 2月  20 11:08 kris.tmp
-rw-rw-r--. 1 kris kris 0 2月  20 11:08 kris.txt.COMPLETED
[kris@hadoop101 flume]$ cp README.md upload/
[kris@hadoop101 flume]$ cp LICENSE upload/
[kris@hadoop101 upload]$ ll
總用量 32
-rw-rw-r--. 1 kris kris     0 2月  20 11:09 kris.log.COMPLETED
-rw-rw-r--. 1 kris kris     0 2月  20 11:08 kris.tmp
-rw-rw-r--. 1 kris kris     0 2月  20 11:08 kris.txt.COMPLETED
-rw-r--r--. 1 kris kris 27625 2月  20 11:14 LICENSE.COMPLETED
-rw-r--r--. 1 kris kris  2520 2月  20 11:13 README.md.COMPLETED
Creating a file in upload creates a file on hdfs;
you can also append data to the files

 

4. Single data source, multiple outputs (selector)

Single Source, multiple Channels and Sinks

Single data source, multiple outputs (selector): single Source, multiple Channels and Sinks
hive.log (exec) ---> flume1 --Sink1 (avro)--> flume2 ---> Sink (HDFS)
                            --Sink2 (avro)--> flume3 ---> Sink (file_roll, local directory files)

Preparation

Create a group1 folder under the /opt/module/flume/job directory

[kris@hadoop101 job]$ cd group1
Create a flume3 folder under the /opt/module/datas/ directory

[kris@hadoop101 datas]$ mkdir flume3

1. Create flume-file-flume.conf

Configure one source that receives the log file plus two channels and two sinks, feeding flume-flume-hdfs and flume-flume-dir respectively.

[kris@hadoop101 group1]$ vim flume-file-flume.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# replicate the data flow to all channels
a1.sources.r1.selector.type = replicating

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/hive/logs/hive.log
a1.sources.r1.shell = /bin/bash -c

# Describe the sink
# an avro sink is a data sender
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop101
a1.sinks.k1.port = 4141

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop101
a1.sinks.k2.port = 4142

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

Avro is a language-neutral data serialization and RPC framework created by Hadoop founder Doug Cutting.

Note: RPC (Remote Procedure Call) is a protocol for requesting a service from a program on a remote computer over the network, without needing to understand the underlying network technology.

[kris@hadoop101 group1]$ vim flume-flume-hdfs.conf
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
# an avro source is a data receiving service
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop101
a2.sources.r1.port = 4141

# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://hadoop101:9000/flume2/%Y%m%d/%H
#prefix for uploaded files
a2.sinks.k1.hdfs.filePrefix = flume2-
#whether to roll folders by time
a2.sinks.k1.hdfs.round = true
#how many time units before creating a new folder
a2.sinks.k1.hdfs.roundValue = 1
#redefine the time unit
a2.sinks.k1.hdfs.roundUnit = hour
#whether to use the local timestamp
a2.sinks.k1.hdfs.useLocalTimeStamp = true
#how many Events to accumulate before flushing to HDFS
a2.sinks.k1.hdfs.batchSize = 100
#file type; compression is supported
a2.sinks.k1.hdfs.fileType = DataStream
#how long before rolling a new file
a2.sinks.k1.hdfs.rollInterval = 600
#roll size of each file, roughly 128M
a2.sinks.k1.hdfs.rollSize = 134217700
#file rolling is independent of the number of Events
a2.sinks.k1.hdfs.rollCount = 0

# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

 

[kris@hadoop101 group1]$ vim flume-flume-dir.conf
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop101
a3.sources.r1.port = 4142

# Describe the sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/data/flume3

# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2

 

Run the configuration files
Start the corresponding configurations in order: flume-flume-dir, flume-flume-hdfs, flume-file-flume. //start from the sink end and work toward the source end

[kris@hadoop101 flume]$ bin/flume-ng agent -c conf/ -n a3 -f job/group1/flume-flume-dir.conf 
[kris@hadoop101 flume]$ bin/flume-ng agent -c conf/ -n a2 -f job/group1/flume-flume-hdfs.conf 
[kris@hadoop101 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/group1/flume-file-flume.conf 

Start Hadoop and Hive
start-dfs.sh
start-yarn.sh
bin/hive

Check the data on HDFS

Check the data in the /opt/module/datas/flume3 directory

[kris@hadoop101 ~]$ cd /opt/module/datas/flume3/
[kris@hadoop101 flume3]$ ll
總用量 4
-rw-rw-r--. 1 kris kris    0 2月  20 11:49 1550634573721-1
-rw-rw-r--. 1 kris kris    0 2月  20 11:54 1550634573721-10
-rw-rw-r--. 1 kris kris    0 2月  20 11:54 1550634573721-11
-rw-rw-r--. 1 kris kris    0 2月  20 11:50 1550634573721-2
-rw-rw-r--. 1 kris kris    0 2月  20 11:50 1550634573721-3
-rw-rw-r--. 1 kris kris    0 2月  20 11:51 1550634573721-4
-rw-rw-r--. 1 kris kris    0 2月  20 11:51 1550634573721-5
-rw-rw-r--. 1 kris kris    0 2月  20 11:52 1550634573721-6
-rw-rw-r--. 1 kris kris    0 2月  20 11:52 1550634573721-7
-rw-rw-r--. 1 kris kris    0 2月  20 11:53 1550634573721-8
-rw-rw-r--. 1 kris kris 1738 2月  20 11:53 1550634573721-9
[kris@hadoop101 flume3]$ cat 1550634573721-9
2019-02-20 11:00:42,459 INFO  [main]: metastore.hivemetastoressimpl (HiveMetaStoreFsImpl.java:deleteDir(53)) - Deleted the diretory hdfs://hadoop101:9000/user/hive/warehouse/student22
2019-02-20 11:00:42,460 INFO  [main]: log.PerfLogger (PerfLogger.java:PerfLogEnd(148)) - </PERFLOG method=runTasks start=1550631641861 end=1550631642460 duration=599 from=org.apache.hadoop.hive.ql.Driver>
2019-02-20 11:00:42,461 INFO  [main]: log.PerfLogger (PerfLogger.java:PerfLogEnd(148)) - </PERFLOG method=Driver.execute start=1550631641860 end=1550631642461 duration=601 from=org.apache.hadoop.hive.ql.Driver>
2019-02-20 11:00:42,461 INFO  [main]: ql.Driver (SessionState.java:printInfo(951)) - OK
2019-02-20 11:00:42,461 INFO  [main]: log.PerfLogger (PerfLogger.java:PerfLogBegin(121)) - <PERFLOG method=releaseLocks from=org.apache.hadoop.hive.ql.Driver>
2019-02-20 11:00:42,461 INFO  [main]: log.PerfLogger (PerfLogger.java:PerfLogEnd(148)) - </PERFLOG method=releaseLocks start=1550631642461 end=1550631642461 duration=0 from=org.apache.hadoop.hive.ql.Driver>
2019-02-20 11:00:42,461 INFO  [main]: log.PerfLogger (PerfLogger.java:PerfLogEnd(148)) - </PERFLOG method=Driver.run start=1550631641638 end=1550631642461 duration=823 from=org.apache.hadoop.hive.ql.Driver>
2019-02-20 11:00:42,461 INFO  [main]: CliDriver (SessionState.java:printInfo(951)) - Time taken: 0.824 seconds
2019-02-20 11:00:42,461 INFO  [main]: log.PerfLogger (PerfLogger.java:PerfLogBegin(121)) - <PERFLOG method=releaseLocks from=org.apache.hadoop.hive.ql.Driver>
2019-02-20 11:00:42,462 INFO  [main]: log.PerfLogger (PerfLogger.java:PerfLogEnd(148)) - </PERFLOG method=releaseLocks start=1550631642461 end=1550631642462 duration=1 from=org.apache.hadoop.hive.ql.Driver>

5. Single data source, multiple outputs (Sink group)

Single Source and Channel, multiple Sinks (load balancing)

 

Flume load balancing and failover

 

The goal is to improve the fault tolerance and stability of the whole system. A simple configuration is all it takes: first define a Sink group; one Sink group contains several child Sinks, and the Sinks in it can be configured for either load balancing or failover.

 

Single data source, multiple outputs (Sink group): flume1 with load_balance
port (netcat) ---> flume1 ---Sink1 (avro)--> flume2 ---> Sink (logger, console)
                          ---Sink2 (avro)--> flume3 ---> Sink (logger, console)

flume1 is configured to distribute its output across the sinks with load balancing; see below:

[kris@hadoop101 group2]$ cat flume-netcat-flume.conf
# Name the components on this agent
a1.sources = r1
a1.channels = c1
a1.sinkgroups = g1
a1.sinks = k1 k2

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin
a1.sinkgroups.g1.processor.selector.maxTimeOut=10000

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop101
a1.sinks.k1.port = 4141

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop101
a1.sinks.k2.port = 4142

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
[kris@hadoop101 group2]$ cat flume-flume-console1.conf  
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop101
a2.sources.r1.port = 4141

# Describe the sink
a2.sinks.k1.type = logger

# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
[kris@hadoop101 group2]$ cat flume-flume-console2.conf 
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop101
a3.sources.r1.port = 4142

# Describe the sink
a3.sinks.k1.type = logger

# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2

 

[kris@hadoop101 flume]$ bin/flume-ng agent -c conf/ -n a3 -f  job/group2/flume-flume-console2.conf  -Dflume.root.logger=INFO,console
[kris@hadoop101 flume]$ bin/flume-ng agent -c conf/ -n a2 -f job/group2/flume-flume-console1.conf -Dflume.root.logger=INFO,console
[kris@hadoop101 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/group2/flume-netcat-flume.conf 

 

[kris@hadoop101 group2]$ nc localhost 44444
1
OK
1
OK
2
OK
3
OK
4

oggerSink.java:95)] Event: { headers:{} body: 31                                              1 }
2019-02-20 15:26:37,828 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 31                                              1 }
2019-02-20 15:26:37,828 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 32                                              2 }
2019-02-20 15:26:37,829 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 33                                              3 }
2019-02-20 15:26:37,829 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 34                                              4 }
2019-02-20 15:26:37,830 (SinkRunne

2019-02-20 15:27:06,706 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 61                                              a }
2019-02-20 15:27:06,706 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 62                                              b }
2019-02-20 15:27:06,707 

6. Aggregating multiple data sources

Multiple Sources aggregating data into a single Flume

7. Multi-source aggregation:

group.log (exec) ---> flume1 --Sink (avro; hadoop103:4141)--> flume3 ---> Sink (logger, console)
  port (netcat)  ---> flume2 --Sink (avro; hadoop103:4141)--> flume3 ---> Sink (logger, console)

Distribute Flume

[kris@hadoop101 module]$ xsync flume
Create a group3 folder under /opt/module/flume/job on hadoop101, hadoop102, and hadoop103.
[kris@hadoop101 job]$ mkdir group3
[kris@hadoop102 job]$ mkdir group3
[kris@hadoop103 job]$ mkdir group3

1. Create flume1-logger-flume.conf

Configure a Source that monitors the group.log file and a Sink that sends the data on to the next Flume tier.

Create the configuration file on hadoop102 and open it

[kris@hadoop102 group3]$ vim flume1-logger-flume.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/group.log
a1.sources.r1.shell = /bin/bash -c

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop103
a1.sinks.k1.port = 4141

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

2. Create flume2-netcat-flume.conf

Configure a Source that monitors the data stream on port 44444 and a Sink that sends the data on to the next Flume tier:

Create the configuration file on hadoop101 and open it

[kris@hadoop101 group3]$ vim flume2-netcat-flume.conf
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
a2.sources.r1.type = netcat
a2.sources.r1.bind = hadoop101
a2.sources.r1.port = 44444

# Describe the sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = hadoop103
a2.sinks.k1.port = 4141

# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

3. Create flume3-flume-logger.conf

Configure a source to receive the data streams sent by flume1 and flume2, and sink the merged result to the console.

Create the configuration file on hadoop103 and open it; because the two upstream avro sinks both target hadoop103:4141 (same IP and port), only one avro source needs to be configured

[kris@hadoop103 group3]$ vim flume3-flume-logger.conf
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop103
a3.sources.r1.port = 4141

# Describe the sink
a3.sinks.k1.type = logger

# Describe the channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1

4. Run the configuration files

Start the corresponding configurations in order: flume3-flume-logger.conf, flume2-netcat-flume.conf, flume1-logger-flume.conf.

[kris@hadoop103 flume]$ bin/flume-ng agent -c conf/ -n a3 -f job/group3/flume3-flume-logger.conf -Dflume.root.logger=INFO,console
[kris@hadoop101 flume]$ bin/flume-ng agent -c conf/ -n a2 -f job/group3/flume2-netcat-flume.conf 
[kris@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/group3/flume1-logger-flume.conf 

 

Append content to group.log in the /opt/module directory on hadoop102
[kris@hadoop102 module]$ echo "Hello World" > group.log
[kris@hadoop102 module]$ ll
總用量 24
drwxrwxr-x. 10 kris kris 4096 2月  20 11:07 flume
-rw-rw-r--.  1 kris kris   12 2月  20 16:13 group.log
Send data to port 44444 on hadoop101
[kris@hadoop101 flume]$ nc hadoop101 44444
1
OK
2
OK
3
OK
4

Check the data on hadoop103
2019-02-20 16:13:20,748 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 48 65 6C 6C 6F 20 57 6F 72 6C 64                Hello World }
2019-02-20 16:14:46,774 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 31                                              1 }
2019-02-20 16:14:46,775 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 32                                              2 }

8. Exercise

Requirements:

1) flume-1 monitors hive.log; flume-1 sends its data to flume-2; flume-2 appends the data to a local file and at the same time forwards it to flume-3.

2) flume-4 monitors another locally created file, any.txt, and sends its data to flume-3.

3) flume-3 writes the aggregated data to HDFS.

Draw the structure diagram first, then start writing the job scripts.

hive.log (exec) ---> flume-1 --Sink1 (avro; hadoop101:4141)--> flume-2 --Sink1 (logger / local output)
                                                                       --Sink2 (avro; hadoop101:4142)--> flume-3 --Sink (HDFS)
local any.txt (exec) ---> flume-4 --Sink (avro; hadoop101:4142)--> flume-3 to HDFS
Start them in the order 3, 2, 1, 4

flume-1:
vim flume1-file-flume.conf

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/hive/logs/hive.log
a1.sources.r1.shell = /bin/bash -c

# Describe the sink
# an avro sink is a data sender
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop101 
a1.sinks.k1.port = 4141

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

flume2:
vim flume2-flume-dir.conf

# Name the components on this agent
a2.sources = r1
a2.sinks = k1 k2
a2.channels = c1 c2
# replicate the data flow to all channels
a2.sources.r1.selector.type = replicating

# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop101
a2.sources.r1.port = 4141

# Describe the sink
a2.sinks.k1.type = logger
a2.sinks.k2.type = avro
a2.sinks.k2.hostname = hadoop101
a2.sinks.k2.port = 4142

# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Describe the channel
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels =c1 c2
a2.sinks.k1.channel = c1
a2.sinks.k2.channel = c2

flume3:
vim flume3-flume-hdfs.conf

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1

# Describe/configure the source
# an avro source is a data receiving service
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop101
a3.sources.r1.port = 4142

# Describe the sink
a3.sinks.k1.type = hdfs
a3.sinks.k1.hdfs.path = hdfs://hadoop101:9000/flume3/%Y%m%d/%H
#prefix for uploaded files
a3.sinks.k1.hdfs.filePrefix = flume3-
#whether to roll folders by time
a3.sinks.k1.hdfs.round = true
#how many time units before creating a new folder
a3.sinks.k1.hdfs.roundValue = 1
#redefine the time unit
a3.sinks.k1.hdfs.roundUnit = hour
#whether to use the local timestamp
a3.sinks.k1.hdfs.useLocalTimeStamp = true
#how many Events to accumulate before flushing to HDFS
a3.sinks.k1.hdfs.batchSize = 100
#file type; compression is supported
a3.sinks.k1.hdfs.fileType = DataStream
#how long before rolling a new file
a3.sinks.k1.hdfs.rollInterval = 600
#roll size of each file, roughly 128M
a3.sinks.k1.hdfs.rollSize = 134217700
#file rolling is independent of the number of Events
a3.sinks.k1.hdfs.rollCount = 0

# Describe the channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1

flume4:
vim flume4-file-flume.conf

# Name the components on this agent
a4.sources = r1
a4.sinks = k1
a4.channels = c1

# Describe/configure the source
a4.sources.r1.type = exec
a4.sources.r1.command = tail -F /opt/module/datas/any.txt
a4.sources.r1.shell = /bin/bash -c

# Describe the sink
# an avro sink is a data sender
a4.sinks.k1.type = avro
a4.sinks.k1.hostname = hadoop101
a4.sinks.k1.port = 4142

# Describe the channel
a4.channels.c1.type = memory
a4.channels.c1.capacity = 1000
a4.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a4.sources.r1.channels = c1
a4.sinks.k1.channel = c1

Start

[kris@hadoop101 flume]$ bin/flume-ng agent -c conf/ -n a3 -f job/group4/flume3-flume-hdfs.conf 
[kris@hadoop101 flume]$ bin/flume-ng agent -c conf/ -n a2 -f job/group4/flume2-flume-dir.conf -Dflume.root.logger=INFO,console
[kris@hadoop101 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/group4/flume1-file-flume.conf 
[kris@hadoop101 flume]$ bin/flume-ng agent -c conf/ -n a4 -f job/group4/flume4-file-flume.conf 
Data sources: flume-1 | hive.log and flume-4 | any.txt

[kris@hadoop101 datas]$ cat any.txt  ##when the file changes, hdfs is updated in real time
1
2
3
4
5
《疑犯追蹤》    懸疑,動做,科幻,劇情
《Lie to me》   懸疑,警匪,動做,心理,劇情
《戰狼2》       戰爭,動做,災難
II
Love
You
[kris@hadoop101 datas]$ pwd
/opt/module/datas

9. Custom Source

The Source is the component responsible for receiving data into the Flume Agent. It can handle log data of all kinds of types and formats, including avro, thrift, exec, jms, spooling directory, netcat, sequence generator, syslog, http, and legacy. The source types provided out of the box already cover a lot, but sometimes they cannot meet the needs of real development, and then we have to define our own source for the requirement at hand.

The official documentation also provides the interface for custom sources:

https://flume.apache.org/FlumeDeveloperGuide.html#source — according to the official guide, a custom MySource must extend the AbstractSource class and implement the Configurable and PollableSource interfaces.

Implement the following methods:

getBackOffSleepIncrement() //not needed for now

getMaxBackOffSleepInterval() //not needed for now

configure(Context context) //initialize the context (read the configuration file)

process() //fetch the data, wrap it into an event, and write it to the channel; this method is called in a loop.

Use case: reading data from MySQL or another file system.

Requirement: have flume receive data, add a prefix to each record, and print to the console. The prefix can be configured in the flume configuration file.

import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;

import java.util.HashMap;
import java.util.Map;

public class MySource extends AbstractSource implements Configurable, PollableSource {
    private String prefix;
    private Long delay;
    /**
     * Data-processing method, called by flume in a loop
     * @return the read status
     * @throws EventDeliveryException on exception we roll back
     */
    public Status process() throws EventDeliveryException {
        Status status = null;  //Status is an enum type: success (READY) or failure (BACKOFF)
        //create the event
        SimpleEvent event = new SimpleEvent(); //an Event consists of an optional header and a byte array carrying the data
        Map<String, String> headerMap = new HashMap<String, String>(); //the header is a HashMap of key-value string pairs
        for (int i = 0; i < 5; i++){
            try {
                event.setHeaders(headerMap); //attach the headers to the event
                event.setBody((prefix + "LLL" + i).getBytes());
                getChannelProcessor().processEvent(event);//write the event to the channel
                status = Status.READY;
                Thread.sleep(delay);
            } catch (InterruptedException e) {
                e.printStackTrace();
                return Status.BACKOFF;
            }

        }
        return status;
    }
    public long getBackOffSleepIncrement() {
        return 0;
    }

    public long getMaxBackOffSleepInterval() {
        return 0;
    }

    /**
     * Configure the custom Source
     * @param context
     */
    public void configure(Context context) {
        prefix = context.getString("prefix", "Hello");
        delay = context.getLong("delay", 1000L);

    }
}

Test

1) Package

Package the code and put the jar into flume's lib directory (under /opt/module/flume).

2) Configuration file

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = com.atguigu.source.MySource
a1.sources.r1.delay = 1000
#a1.sources.r1.field = HelloWorld

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
[kris@hadoop101 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/mysource-flume-logger.conf -Dflume.root.logger=INFO,console

 

10. Custom Sink

The Sink keeps polling the Channel for events, removes them in batches, and writes those batches to a storage or indexing system, or sends them on to another Flume Agent.

The Sink is fully transactional. Before removing a batch of data from the Channel, each Sink opens a transaction with the Channel. Once the batch of events has been written out successfully to the storage system or the next Flume Agent, the Sink commits the transaction through the Channel. Only after the transaction is committed does the Channel delete those events from its own internal buffer.

Sink destinations include hdfs, logger, avro, thrift, ipc, file, null, HBase, solr, and custom sinks. The Sink types provided out of the box already cover a lot, but sometimes they cannot meet the needs of real development, and then we have to define our own Sink.

The official documentation also provides the interface for custom sinks:

https://flume.apache.org/FlumeDeveloperGuide.html#sink — according to the official guide, a custom MySink must extend the AbstractSink class and implement the Configurable interface.

Implement the following methods:

configure(Context context) //initialize the context (read the configuration file)

process() //read data (events) from the Channel; this method is called in a loop.

Use case: reading Channel data and writing it to MySQL or another file system.

Requirement: have flume receive data and, on the Sink side, add a prefix and a suffix to each record, then print to the console. The prefix and suffix can be configured in the flume job configuration file.

import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MySink extends AbstractSink implements Configurable {

    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSink.class); //create the logger object
    private String prefix;
    private String suffix;

    /**
     * The Sink pulls data from the channel and processes it
     * @return
     * @throws EventDeliveryException
     */
    public Status process() throws EventDeliveryException {

        Status status = null; //declare the return status
        Event event;//declare the event
        Channel channel = getChannel();//get the channel bound to this sink
        Transaction transaction = channel.getTransaction();//get a transaction

        transaction.begin(); //begin the transaction
        try {
            while ((event = channel.take()) == null) {
                Thread.sleep(500);
            }
                LOGGER.info(prefix + new String(event.getBody()) + suffix);
                status = Status.READY;
                transaction.commit(); //commit the transaction

        } catch (Exception e) {
            e.printStackTrace();
            status = Status.BACKOFF;

            transaction.rollback(); //roll back the transaction
        } finally {
            transaction.close(); //close the transaction
        }
            return status;
    }

    /**
     * Configure the Sink
     * @param context the context
     */
    public void configure(Context context) {
        prefix = context.getString("prefix", "Hello");
        suffix = context.getString("suffix", "kris");
    }
}

Test

1) Package

Package the code and put the jar into flume's lib directory (under /opt/module/flume).

2) Configuration file

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = com.atguigu.source.MySink
#a1.sinks.k1.prefix = kris:
a1.sinks.k1.suffix = :kris

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

 

[kris@hadoop101 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/mysource-flume-netcat.conf -Dflume.root.logger=INFO,console 

[kris@hadoop101 job]$ nc localhost 44444
1
OK
2
2019-02-24 16:27:25,078 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - com.atguigu.source.MySink.process(MySink.java:32)] kris:1:kris
2019-02-24 16:27:25,777 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - com.atguigu.source.MySink.process(MySink.java:32)] kris:2:kris

11. Monitoring Flume with Ganglia

Installing and deploying Ganglia

Install ganglia, the httpd service, php, and the other dependencies

sudo rpm -Uvh http://dl.fedoraproject.org/pub/epel/6/x86_64/epel-release-6-8.noarch.rpm
sudo yum -y install httpd php rrdtool perl-rrdtool rrdtool-devel apr-devel ganglia-gmetad ganglia-web ganglia-gmond

Ganglia consists of three parts: gmond, gmetad, and gweb.

gmond (Ganglia Monitoring Daemon) is a lightweight service installed on every node host whose metrics you want to collect. With gmond you can easily collect many system metrics, such as CPU, memory, disk, network, and active process data.

gmetad (Ganglia Meta Daemon) is the service that aggregates all of the information and stores it on disk in RRD format.

gweb (Ganglia Web) is Ganglia's visualization tool, a PHP front end that uses the browser to display the data stored by gmetad, showing a variety of metrics about how the cluster is running in chart form.

Configuration

1) Modify the configuration file /etc/httpd/conf.d/ganglia.conf
  [kris@hadoop101 flume]$ sudo vim /etc/httpd/conf.d/ganglia.conf
  Change it to:
# Ganglia monitoring system php web frontend
Alias /ganglia /usr/share/ganglia
<Location /ganglia>
  Order deny,allow
  #Deny from all
  Allow from all
  # Allow from 127.0.0.1
  # Allow from ::1
  # Allow from .example.com
</Location>

2) Modify the configuration file /etc/ganglia/gmetad.conf
  [kris@hadoop101 flume]$ sudo vim /etc/ganglia/gmetad.conf
  Change it to:
  data_source "hadoop101" 192.168.1.101

3) Modify the configuration file /etc/ganglia/gmond.conf
  [kris@hadoop101 flume]$ sudo vim /etc/ganglia/gmond.conf
  Change it to:
cluster {
  name = "hadoop101"
  owner = "unspecified"
  latlong = "unspecified"
  url = "unspecified"
}
udp_send_channel {
  #bind_hostname = yes # Highly recommended, soon to be default.
                       # This option tells gmond to use a source address
                       # that resolves to the machine's hostname.  Without
                       # this, the metrics may appear to come from any
                       # interface and the DNS names associated with
                       # those IPs will be used to create the RRDs.
  # mcast_join = 239.2.11.71
  host = 192.168.1.101
  port = 8649
  ttl = 1
}
udp_recv_channel {
  # mcast_join = 239.2.11.71
  port = 8649
  bind = 192.168.1.101
  retry_bind = true
  # Size of the UDP buffer. If you are handling lots of metrics you really
  # should bump it up to e.g. 10MB or even higher.
  # buffer = 10485760
}

4) Modify the configuration file /etc/selinux/config
  [kris@hadoop101 flume]$ sudo vim /etc/selinux/config
  Change it to:
# This file controls the state of SELinux on the system.
# SELINUX= can take one of these three values:
#     enforcing - SELinux security policy is enforced.
#     permissive - SELinux prints warnings instead of enforcing.
#     disabled - No SELinux policy is loaded.
SELINUX=disabled
# SELINUXTYPE= can take one of these two values:
#     targeted - Targeted processes are protected,
#     mls - Multi Level Security protection.
SELINUXTYPE=targeted

Heads-up: disabling selinux only takes effect after a reboot; if you do not want to reboot right now, you can make it take effect temporarily:
[kris@hadoop101 flume]$ sudo setenforce 0

 

1. Start

1) Start ganglia
[kris@hadoop101 flume]$ sudo service httpd start
[kris@hadoop101 flume]$ sudo service gmetad start
[kris@hadoop101 flume]$ sudo service gmond start
2) Open the ganglia page in a browser
http://192.168.1.101/ganglia
Heads-up: if you still get a permission error after the steps above, change the permissions on the /var/lib/ganglia directory:
[kris@hadoop101 flume]$ sudo chmod -R 777 /var/lib/ganglia

2. Run Flume and test the monitoring

1) Modify the flume-env.sh configuration in /opt/module/flume/conf:
JAVA_OPTS="-Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=192.168.1.101:8649 -Xms100m -Xmx200m"
2) Start a Flume job
[kris@hadoop101 flume]$ bin/flume-ng agent \
--conf conf/ \
--name a1 \
--conf-file job/flume-netcat-logger.conf \
-Dflume.root.logger=INFO,console \
-Dflume.monitoring.type=ganglia \
-Dflume.monitoring.hosts=192.168.1.101:8649
Or, in short form:
[kris@hadoop101 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=192.168.1.101:8649
3) Send data and watch the ganglia monitoring graphs
[kris@hadoop101 flume]$ nc localhost 44444

 

flume.CHANNEL.c1.EventPutSuccessCount   the unit flume sends is called an event; "put success" counts the data successfully received, i.e. the data put into the channel

flume.CHANNEL.c1.EventTakeSuccessCount  this counts the data taken out of the channel; compare it against the log data to check whether anything was lost

flume.CHANNEL.c1.ChannelFillPercentage  as long as this number is not full, no data is being lost; 1.0 means the channel is completely full
