Flume淺度學習指南

flume簡介

cloudera 公司開源的,貢獻給Apache基金會 html

http://flume.apache.org/ java

http://archive.cloudera.com/c...node

只能運行在linux系統上mysql

Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streaming data flows. It is robust and fault tolerant with tunable reliability mechanisms and many failover and recovery mechanisms. It uses a simple extensible data model that allows for online analytic application.linux

flume用來高效的收集、聚合、移動大量的日誌數據 nginx

有一個基於流式的簡單的有彈性的傳輸模型 web

有一個健壯的可容錯的機制 正則表達式

使用簡單,能夠擴展的數據模型運行使用到在線實時分析應用中 redis

簡單體如今flume-agent的配置及傳輸模型簡單 算法

在線實時分析應用中

flume日誌的實時採集->sparkStreaming/storm/Flink =>mysql/redis=>實時分析的結果進行報表展現

數據(日誌)的移動傳輸工具:

日誌=>系統運行日誌、web服務器的訪問日誌、客戶端的用戶行爲日誌、軟件的運行操做日誌

能夠將數據從數據源中採集並移動到另一個目的地:

數據源=>系統本地日誌文件中的數據、jms、avro端口、kafka、系統本地目錄下... 
目的地=>hdfs、hive、hbase、kafka、系統本地一個文件中...

如何將linux本地的一個日誌文件中的日誌數據採集到hdfs上

  • 腳本+hdfs命令 =>【週期性】上傳

    #!/bin/sh
    HADOOP_HOME=/opt/cdh-5.14.2/hadoop-2.6.0-cdh5.14.2    
    $HADOOP_HOME/bin/hdfs -put /.../xx.log  /hdfs

    針對項目初期數據量較少時可使用 , 沒有容災性及穩定性

  • 採用flume日誌採集框架=>【實時】採集一個日誌文件中實時追加的日誌數據並寫入到目的地

    針對不一樣的應用場景定義並啓動對應的flume-agent實例/進程 
        source  -- 定義從哪裏採集數據  
            exec類型的source能夠藉助Linux的shell命令實現實時讀取一個日誌文件中動態追加的日誌數據 
            avro類型 
            ……
        channel  -- 定義了source採集的數據臨時存儲地   
            memory 基於內存的管道容器 
            file 基於磁盤 
        sink  -- 定義將數據最終寫入的-目的地  
            hdfs類型的sink將數據最終寫入到hdfs上  
            hive類型將數據最終寫入到hive表 
            kafka類型將數據最終寫入到kafka分佈式消息隊列中  
            ……

flume-agent實例的模型

每一個flume-agent實例至少由如下三個功能模塊組成 
    source模塊  
        用於監控數據源並進行數據的實時採集,是實時產生數據流的模塊
        數據源=>系統本地的一個日誌文件中、kafka、jms、系統本地的一個目錄下、avro端口  。。。 
        source將採集到的數據提交到channel中
    channel模塊  
        用於鏈接source和sink的管道容器  
        相似一個隊列(FIFO)
    sink模塊  
        從channel中拉取take(剪切)數據並最終將數據寫入到目的地
        目的地=>hdfs、hive、hbase、kafka、avro端口...  
            
event事件: 
    event事件是flume傳輸日誌數據時基本單元,在flume-agent內部數據都是以事件形式存在 
        source將採集到的數據封裝成一個個的event事件,將事件提交到channel
        sink從channel消費事件並將事件中封裝的數據最終寫入到目的地  
    event事件的數據結構:header + body      
        header 
            是一個map集合類型 
            內部的key-value爲該事件的元數據信息,主要用來區分不一樣的事件 
        body 
            是一個字節數組類型
            body爲咱們真正要傳輸的數據

flume的安裝使用

flume-ng-1.6.0-cdh5.14.2

安裝 
    一、上次解壓flume的安裝包 
        $ tar zxvf  /opt/softwares/flume-ng-1.6.0-cdh5.14.2.tar.gz -C /opt/cdh-5.14.2/
        $ mv apache-flume-1.6.0-cdh5.14.2-bin/ flume-1.6.0-cdh5.14.2  修改目錄名稱-可選  
    二、修改flume配置文件 
        $ mv conf/flume-env.sh.template conf/flume-env.sh  修改後環境配置文件才能生效
        $ vi conf/flume-env.sh 
            export JAVA_HOME=/opt/cdh-5.14.2/jdk1.8.0_112
    三、針對不一樣的場景需求配置對應的java屬性配置文件並啓動flume-agent進程  
        
        如何啓動一個flume-agent進程  
        $ bin/flume-ng agent  \
        --name或-n 當前flume-agent實例的別名  \
        --conf或-c 當前flume框架的配置文件目錄 \
        --conf-file,-f 與當前要啓動的flume-agent進程相匹配的java屬性配置文件的本地路徑  
            
        Usage: bin/flume-ng <command> [options]...

案例

flume官方簡單案例

定義一個flume-agent去監聽讀取某臺服務器上的某個端口中的數據,並將監聽讀取到的數據最終寫入到flume框架本身的日誌文件中

# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = centos01
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
提交測試: 
    $ bin/flume-ng agent -n a1 -c conf/ -f conf/netcat2logger.properties &  
肯定目標服務器的端口是否已經成功被flume-agent代理進程監聽  
    $ netstat -antp |grep 44444     --查看端口信息 
    $ ps -ef | grep flume  -- 查看進程信息  
    
安裝一個telnet工具並鏈接服務器端口寫入數據  
    $ sudo yum -y install telnet
    
    發送消息數據  
    
檢查flume的日誌文件中的數據 
    $ tail -f logs/flume.log

sources = exec

要求使用flume實時監控讀取系統本地一個日誌文件中動態追加的日誌數據並實時寫入到hdfs上的某個目錄下

# example.conf: A single-node Flume configuration
#同一臺Linux上可開啓多個flume-agent,但agent別名要區分
a2.sources = r2
a2.sinks = k2
a2.channels = c2

# Describe/configure the source
#依靠的是Linux的命令讀取本地文件,Linux的命令不中止flume就不停
a2.sources.r2.type = exec
# tail -F 文件名  即便沒有這個-F後面指定的文件,命令也不會中止,容錯能力強
a2.sources.r2.command = tail -F /home/chen/Documents/nginx.log




# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100




#聲明a2的sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://centos01:8020/flume/weblog
a2.sinks.k2.hdfs.filePrefix = nginxData





# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

報錯首先找logs目錄

報錯:找不到類

缺乏jar包

[beifeng@bigdata01 hadoop-2.6.0-cdh5.14.2]$ cp share/hadoop/hdfs/hadoop-hdfs-2.6.0-cdh5.14.2.jar /opt/cdh-5.14.2/flume-1.6.0-cdh5.14.2/lib/
[beifeng@bigdata01 hadoop-2.6.0-cdh5.14.2]$ cp share/hadoop/common/hadoop-common-2.6.0-cdh5.14.2.jar  /opt/cdh-5.14.2/flume-1.6.0-cdh5.14.2/lib/
[beifeng@bigdata01 hadoop-2.6.0-cdh5.14.2]$ cp share/hadoop/common/lib/htrace-core4-4.0.1-incubating.jar  /opt/cdh-5.14.2/flume-1.6.0-cdh5.14.2/lib/
[beifeng@bigdata01 hadoop-2.6.0-cdh5.14.2]$ cp share/hadoop/tools/lib/commons-configuration-1.6.jar  /opt/cdh-5.14.2/flume-1.6.0-cdh5.14.2/lib/     
[beifeng@bigdata01 hadoop-2.6.0-cdh5.14.2]$ cp share/hadoop/tools/lib/hadoop-auth-2.6.0-cdh5.14.2.jar  /opt/cdh-5.14.2/flume-1.6.0-cdh5.14.2/lib/

sinks = hdfs 優化

解決生成的文件過多太小的問題(但願文件的大小=128M)

將日誌文件按照日期分目錄存儲(按照天分目錄存儲)

將生成的日誌文件的格式改成Text文本格式

修改上個例子的flume-agent屬性文件

# 聲明當前flume-agent的別名及當前的flume-agent實例包含的模塊的別名和個數
a2.sources = s2
a2.channels = c2
a2.sinks = k2

# 定義source模塊中的s2的類型及與此類型相關的延伸屬性 
# exec類型的source能夠藉助執行一條linux shell命令實現讀取linux系統上某個文件中的日誌數據,其中 cat是一次性讀取,tail能夠實現實時讀取新增長的數據  
# shell屬性用來聲明要執行的命令的運行環境
a2.sources.s2.type = exec
a2.sources.s2.command = tail -F /opt/nginx/access.log 
a2.sources.s2.shell = /bin/sh -c


# 定義channel模塊中的c2的類型及與此類型相關的延伸屬性  
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100


# 定義sink模塊中的k2的類型及與此類型相關的延伸屬性 
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://192.168.134.101:8020/flume-demo2/%Y%m%d
#啓用根據時間生成路徑中的轉義字符的具體的時間值
a2.sinks.k2.hdfs.round = true
#表示使用本地linux系統時間戳做爲時間基準,不然會自動參考事件的header中的時間戳
a2.sinks.k2.hdfs.useLocalTimeStamp = true

#設置文件的前綴
a2.sinks.k2.hdfs.filePrefix = NgnixLog


#設置解決文件過多太小問題
a2.sinks.k2.hdfs.rollInterval = 0
a2.sinks.k2.hdfs.rollSize = 128000000
a2.sinks.k2.hdfs.rollCount = 0
#寫入到hdfs的最小副本數,不設置會致使上面的三個參數不生效
a2.sinks.k2.hdfs.minBlockReplicas = 1


#批量寫入到hdfs上文件中的最大event數量
#batchSize的值須要小於等於transactionCapacity的值 
#從性能上考慮,最優的是batchSize=transactionCapacity 
a2.sinks.k2.hdfs.batchSize = 100

# fileType定義的是數據流的格式,默認的數據流的格式爲SequenceFile
a2.sinks.k2.hdfs.fileType = DataStream
# 寫入到hdfs上的文件的格式(序列化方法) 
# 格式改成text後,能夠經過cat 或 text 命令查看文件中的日誌內容 
a2.sinks.k2.hdfs.writeFormat = Text 


# 將a2中的source及sink模塊綁定到對應的channel模塊上 
# 一個source模塊能夠同時綁定多個channel模塊,可是一個sink模塊只能綁定一個惟一的channel
a2.sources.s2.channels = c2
a2.sinks.k2.channel = c2

sources = spooldir

利用flume監控某個目錄下的日誌文件,當某個目錄下出現符合要求的文件名稱的文件時,則對文件中的日誌數據進行讀取,並將數據最終寫入到hdfs上

目錄
    /opt/data/logs
        nginx-access.log.2018120309 
        nginx-access.log.2018120310
# example.conf: A single-node Flume configuration
#同一臺Linux上可開啓多個flume-agent,但agent別名要區分
a2.sources = r2
a2.sinks = k2
a2.channels = c2

# Describe/configure the source
# includePattern 用正則表達式指定要包含的文件
# ignorePattern  用正則表達式指定要忽略的文件
a2.sources.r2.type = spooldir
a2.sources.r2.spoolDir = /home/chen/mylogs
# 因爲每次讀完會給讀完的文件增長.COMPLETED從而造成新文件,須要忽略這些文件
a2.sources.r2.ignorePattern = ^.*\.COMPLETED$
# includePattern和ignorePattern會同時生效
a2.sources.r2.includePattern =     ^.*$




# Use a channel
# file類型更安全
# memory類型效率更高
a2.channels.c2.type = file
a2.channels.c2.dataDirs = /opt/modules/flume-1.6.0-cdh5.14.2/data




#聲明a2的sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://centos01:8020/flume/weblog/%y%m%d
#啓用根據時間生成轉義字符的具體的時間值
a2.sinks.k2.hdfs.round = true
#使用本地linux系統時間戳做爲時間基準,不然會自動參考事件的header中的時間戳
a2.sinks.k2.hdfs.useLocalTimeStamp = true

a2.sinks.k2.hdfs.filePrefix = nginxData


#設置解決文件過多太小問題
a2.sinks.k2.hdfs.rollInterval = 0
a2.sinks.k2.hdfs.rollSize = 128000000
a2.sinks.k2.hdfs.rollCount = 0
#寫入到hdfs的最小副本數,不設置會致使上面的三個參數不生效
a2.sinks.k2.hdfs.minBlockReplicas = 1

#批量寫入到hdfs上文件中的最大event數量
#batchSize的值須要小於等於transactionCapacity的值 
#從性能上考慮,最優的是batchSize=transactionCapacity 
a2.sinks.k2.hdfs.batchSize = 100


# fileType定義的是數據流的格式,默認的數據流的格式爲SequenceFile
a2.sinks.k2.hdfs.fileType = DataStream
# 寫入到hdfs上的文件的格式(序列化方法) 
# 格式改成text後,能夠經過cat 或 text 命令查看文件中的日誌內容 
a2.sinks.k2.hdfs.writeFormat = Text


# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

一臺彙總服務器

需求:            
Nginx服務器集羣 -- 10臺  
    每臺服務器上都有一個access.log日誌文件  
    須要將每臺服務器上的日誌文件中追加的日誌數據實時讀取並寫入到hdfs上 
    
    
    思路1: 
        每臺Nginx服務器上啓動一個flume-agent 
            source - exec  
            channel - mem 
            sink - hdfs  
        多個flume-agent同時寫入數據到hfds上不利於hdfs的穩定性 

    思路2: 
        每臺Nginx服務器上啓動一個flume-agent 
            source - exec  
            channel - mem 
            sink - avro   
                type = avro 
                hostname = 主機名
                port =  端口號 
            將數據統一寫入到某臺服務器某個端口中 
            
        啓動一個負責對彙總後的數據統一寫入到目的地的flum-agent 
            source - avro   
                type = avro
                bind = 
                port = 
            channel - mem 
            sink - hdfs

nginxs2flume.properties

# example.conf: A single-node Flume configuration
#同一臺Linux上可開啓多個flume-agent,但agent別名要區分
a2.sources = r2
a2.sinks = k2
a2.channels = c2

# Describe/configure the source
#依靠的是Linux的命令讀取本地文件,Linux的命令不中止flume就不停
a2.sources.r2.type = exec
# tail -F 文件名  即便沒有這個-F後面指定的文件,命令也不會中止,容錯能力強
a2.sources.r2.command = tail -F /home/chen/Documents/nginx.log




# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100




#聲明a2的sink
a2.sinks.k2.type = avro
a2.sinks.k2.hostname = centos01
a2.sinks.k2.port = 6666


# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

flume2hdfs.properties

# example.conf: A single-node Flume configuration
#同一臺Linux上可開啓多個flume-agent,但agent別名要區分
a3.sources = r3
a3.sinks = k3
a3.channels = c3

# Describe/configure the source
a3.sources.r3.type = avro
a3.sources.r3.bind = centos01
a3.sources.r3.port = 6666




# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100




#聲明a3的sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://centos01:8020/flume/weblog/test
a3.sinks.k3.hdfs.writeFormat = Text 

# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3

Flume Channel Selectors

做用:

當一個flume-agent中存在多個channel時,爲Source選擇下游的Channel進行配置

類型一:Replicating Channel Selector (default) 複製模式

source將事件複製並提交給全部與其綁定的channel中
    默認屬性能夠省略定義
flume-agent
        source
            s1 -- 但願採集用戶日誌複製份數爲n(n=channel的個數)分配到各個channel中
        channel
            c1
            c2
        sink
            k1 -- hdfs
            k2 -- kafka的某個主題中
# 聲明agent名稱及此agent要使用的source、channel、sink名稱 
a2.sources = s2
a2.channels = c1 c2 
a2.sinks = k1 k2

# 定義source類型及相關屬性,須要聲明下運行該命令的shell環境
a2.sources.s2.type = exec
a2.sources.s2.command = tail -F /usr/local/nginx/datalog/access.log
# 默認屬性就是replicating
a2.sources.s2.selector.type = replicating





# 定義c1
# 定義channel類型及相關屬性
a2.channels.c1.type = memory
# 管道的最大容量
a2.channels.c1.capacity = 1000
# 每次最大從source獲取的事件數量和sink每次獲取的event最大數量
a2.channels.c1.transactionCapacity = 100

# 定義c2
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100




# k1
# 定義kafka的sink
a2.sinks.k1.type = org.apache.flume.source.kafka.KafkaSource
a2.sinks.k1.kafka.bootstrap.servers = centos01:9092
a2.sinks.k1.kafka.topics = nginxTopic


# k2
# 定義sinks類型及相關屬性
a2.sinks.k2.type = hdfs
#設置按天進行生成存儲目錄,天天生成一個文件夾
a2.sinks.k2.hdfs.path = hdfs://centos01:8020/flume/nginx/%Y%m%d/

#啓用根據時間生成轉義字符的值
a2.sinks.k2.hdfs.round = true
#使用本地時間戳做爲基準  
a2.sinks.k2.hdfs.useLocalTimeStamp = true

#設置文件的前綴
a2.sinks.k2.hdfs.filePrefix = flumeLog

#設置解決文件過多太小問題
a2.sinks.k2.hdfs.rollInterval = 0
a2.sinks.k2.hdfs.rollSize = 128000000
a2.sinks.k2.hdfs.rollCount = 0
#寫入到hdfs的最小副本數,不設置會致使上面的三個參數不生效
a2.sinks.k2.hdfs.minBlockReplicas = 1

#批量寫入到hdfs上文件中的最大event數量
#batchSize的值須要小於等於transactionCapacity的值 
a2.sinks.k2.hdfs.batchSize = 100
# fileType定義的是數據流的格式,默認的數據流的格式爲SequenceFile
a2.sinks.k2.hdfs.fileType = DataStream
# 寫入到hdfs上的文件的格式(序列化方法) 
# 格式改成text後,能夠經過cat 或 text 命令查看文件中的日誌內容 
a2.sinks.k2.hdfs.writeFormat = Text

# 綁定sink和source到channel
a2.sinks.k1.channel = c1
a2.sinks.k2.channel = c2
a2.sources.s2.channels = c1 c2

類型二:Multiplexing Channel Selector(網址案例)多路複用模式

source根據事件的header頭信息中的不一樣值提交給對應的channel中
事件中自帶頭信息
    source直接判斷便可
    https://blog.csdn.net/looklook5/article/details/40430965--官方案例
事件沒有頭信息
    須要自定義header頭信息,又稱爲定義攔截器
    http://www.cnblogs.com/Skyar/p/5831935.html    --案例                
        使用正則匹配選擇器類型對日誌分解並指定key名稱
        (\\w+)表示任意單詞字符串,範圍包括a-zA-Z0-9_ 
        file_roll類型本地目錄提早建立,默認目錄下每30s回滾一個文件
        使用echo "192.168.134.101:bigdata01:spark" >> test.log

mul_demo.properties

#聲明代理及三種模塊的別名及數量
a1.sources = s1
a1.channels = c1 c2 c3 c4 
a1.sinks = k1 k2 k3 k4


# 定義s1
a1.sources.s1.type = exec
a1.sources.s1.command = tail -F /opt/flume/test.log

# 聲明該source的channel selector類型 
# multiplexing類型的channel selector會根據header中的某個key的值的不一樣提交給不一樣的channel
a1.sources.s1.selector.type = multiplexing
# 聲明依據header頭的key爲course的值來判斷提交該條數據給那些channel 
# header map( k1->v1,k2->v2 ,k3->v3 )
a1.sources.s1.selector.header = course 
# 若是該條數據中的header中的course的值爲hadoop,則將該事件提交給c1 
a1.sources.s1.selector.mapping.hadoop = c1
a1.sources.s1.selector.mapping.hive = c1 c2
a1.sources.s1.selector.mapping.hbase = c3
# 若是以上都未匹配到則默認提交給c4 
a1.sources.s1.selector.default = c4

# 聲明一個source的攔截器組件,別名叫 i1
a1.sources.s1.interceptors = i1

# 聲明此攔截器的類型 ,regex_extractor爲正則匹配攔截器類型 
a1.sources.s1.interceptors.i1.type = regex_extractor

# (\\w+) 表示能夠匹配任意的字符串 範圍包括 a-zA-Z0-9_
#  abc:erdfd:gsaafdf
# 將一條日誌經過':'的分割並聲明對應的key值,最終定義出一個包含三個key-value對的header頭信息 
# 192.168.134.101:bigdata01:spark  =>
# header{ip->192.168.134.101,domain->bigdata01,course->spark} body (192.168.134.101:bigdata01:spark )
a1.sources.s1.interceptors.i1.regex = (\\w+):(\\w+):(\\w+)
a1.sources.s1.interceptors.i1.serializers = s1 s2 s3
a1.sources.s1.interceptors.i1.serializers.s1.name = ip
a1.sources.s1.interceptors.i1.serializers.s2.name = domain
a1.sources.s1.interceptors.i1.serializers.s3.name = course


# 定義 c1 
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 定義 c2
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# 定義 c3
a1.channels.c3.type = memory
a1.channels.c3.capacity = 1000
a1.channels.c3.transactionCapacity = 100
# 定義 c4
a1.channels.c4.type = memory
a1.channels.c4.capacity = 1000
a1.channels.c4.transactionCapacity = 100


# 定義k1 
#file_roll類型爲將事件寫入到linux本地磁盤 
a1.sinks.k1.type = file_roll
#定義將事件寫入到本地的目錄
a1.sinks.k1.sink.directory = /opt/flume/k1
a1.sinks.k1.sink.rollInterval = 0

# 定義k2
a1.sinks.k2.type = file_roll
a1.sinks.k2.sink.directory = /opt/flume/k2
a1.sinks.k2.sink.rollInterval = 0
# 定義k3
a1.sinks.k3.type = file_roll
a1.sinks.k3.sink.directory = /opt/flume/k3
a1.sinks.k3.sink.rollInterval = 0
# 定義k4
a1.sinks.k4.type = file_roll
a1.sinks.k4.sink.directory = /opt/flume/k4
a1.sinks.k4.sink.rollInterval = 0


# Bind the source and sink to the channel
a1.sources.s1.channels = c1 c2 c3 c4  
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
a1.sinks.k3.channel = c3
a1.sinks.k4.channel = c4



# 最終能夠實現的需求  

若是source採集的日誌爲 
    192.168.134.101:bigdata01:spark  =》日誌發送給 c4
    192.168.134.101:bigdata01:hive  =》日誌發送給 c1 c2 
    192.168.134.106:bigdata04:hbase  =》日誌發送給 c3
    192.168.134.106:bigdata04:hadoop  =》日誌發送給 c1 
    192.168.134.106:bigdata04:oozie  =》日誌發送給 c4

Flume Sink Processors

做用: 
    將一個flume-agent中的多個sink定義到一個Sinkgroups組中
    使組內多sink之間實現故障轉移或負載均衡
類型:

Default Sink Processor

一個組內只有一個sink,不強制用戶爲Sink建立Processor
        就是以前的單sink案例

Failover Sink Processor

經過配置維護了多個sink組成的優先級列表
        須要爲全部的sink分配優先級,全部的優先級數字必須是惟一的
        數字越大表示優先獲取channel數據 
        搭配 Replicating Channel Selector 使用

nginx2avro.properties

#以及聲明當前的flume-agent應用實例實例中三種模塊的數量及別名
a2.sources = s2
a2.channels = c1 c2
a2.sinks = k1 k2


## Failover Sink Processor
#聲明一個由k1 k2組成的組,組名稱爲g1
a2.sinkgroups = g1
a2.sinkgroups.g1.sinks = k1 k2 
#經過如下定義能夠聲明k1和k2的關係時故障轉移關係(ha)
a2.sinkgroups.g1.processor.type = failover
#配置各個sink的優先級,只要有大小差異就行
a2.sinkgroups.g1.processor.priority.k1 = 10
a2.sinkgroups.g1.processor.priority.k2 = 5
#聲明failover前最大等待時間,默認10s
a2.sinkgroups.g1.processor.maxpenalty = 10000


# 定義source模塊的類型及此類型相關的其餘屬性
a2.sources.s2.type = exec
a2.sources.s2.command = tail -F /usr/local/nginx/datalog/access.log  
a2.sources.s2.shell = /bin/sh -c

#定義c1
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

#定義c2
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

# 定義k1
# 注意:avro的歸屬服務器要在不一樣節點上,不然失去HA意義,端口號由於不在同一臺服務器上因此能夠相同
# 由於目前是僞分佈,因此服務器只能同樣而且端口號要區分開
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = 192.168.134.101
a2.sinks.k1.port = 4545


# 定義k2
a2.sinks.k2.type = avro
a2.sinks.k2.hostname = 192.168.134.101
a2.sinks.k2.port = 4546


# 將source和sink模塊綁定到對應的channel管道上 
a2.sources.s2.channels = c1 c2
a2.sinks.k1.channel = c1
a2.sinks.k2.channel = c2

avro2hdfs-active.properties

#以及聲明當前的flume-agent應用實例實例中三種模塊的數量及別名
a3.sources = s3
a3.sinks = k3
a3.channels = c3


# 定義source模塊的類型及此類型相關的其餘屬性
a3.sources.s3.type = avro
a3.sources.s3.bind = 192.168.134.101
a3.sources.s3.port = 4545


#定義當前flume-agent應用實例中的channel模塊的類型及此類型相關的其餘屬性 
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100


# 定義sink模塊的類型及相關屬性
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://192.168.134.101:8020/flume-collector-active/%Y%m%d/

#啓用根據時間生成轉義字符的值及自動回滾生產日期目錄
a3.sinks.k3.hdfs.round = true
#使用本地系統時間戳做爲基準進行日期回滾  
a3.sinks.k3.hdfs.useLocalTimeStamp = true

#設置文件的前綴,若是不設置則默認值爲FlumeData
a3.sinks.k3.hdfs.filePrefix = HiveLog


#設置解決文件過多太小問題,將每一個文件的大小控制在128M
#將rollInterval和rollCount屬性的值改成0後文件的回滾將再也不由時間間隔及事件的數量爲依據
#rollSize的值須要是一個127M左右的值,由於每一個文件是一個block塊,每一個block塊都還包含元數據信息
a3.sinks.k3.hdfs.rollInterval = 0
a3.sinks.k3.hdfs.rollSize = 128000000
a3.sinks.k3.hdfs.rollCount = 0
#寫入到hdfs的最小副本數,不設置會致使上面的三個參數不生效
a3.sinks.k3.hdfs.minBlockReplicas = 1


#批量寫入到hdfs上文件中的最大event數量
#batchSize的值須要小於等於transactionCapacity的值 
#hdfs類型的sink將數據寫入到hdsf上的底層源碼執行過程
#假如batchSize=200,transactionCapacity=100
#系統建立一個容量爲100個event的臨時隊列容器(transactionCapacity)-》sink會最多取出200(batchSize)個event塞到transactionCapacity容器中-》由於transactionCapacity的容量不夠會報錯channelException 
a3.sinks.k3.hdfs.batchSize = 100

# fileType定義的是數據流的格式,默認的數據流的格式爲SequenceFile
a3.sinks.k3.hdfs.fileType = DataStream
# 寫入到hdfs上的文件的格式(序列化方法) 
# 格式改成text後,能夠經過cat 或 text 命令查看文件中的日誌內容 
a3.sinks.k3.hdfs.writeFormat = Text


# 將source和sink模塊綁定到對應的channel管道上 
a3.sources.s3.channels = c3
a3.sinks.k3.channel = c3

avro2hdfs-failover.properties

#以及聲明當前的flume-agent應用實例實例中三種模塊的數量及別名
a4.sources = s4
a4.sinks = k4
a4.channels = c4

# 定義source模塊的類型及此類型相關的其餘屬性
a4.sources.s4.type = avro
a4.sources.s4.bind = 192.168.134.101
a4.sources.s4.port = 4546

#定義當前flume-agent應用實例中的channel模塊的類型及此類型相關的其餘屬性 
a4.channels.c4.type = memory
a4.channels.c4.capacity = 1000
a4.channels.c4.transactionCapacity = 100

# 定義sink模塊的類型及相關屬性
a4.sinks.k4.type = hdfs
a4.sinks.k4.hdfs.path = hdfs://192.168.134.101:8020/flume-collector-failover/%Y%m%d/

#啓用根據時間生成轉義字符的值及自動回滾生產日期目錄
a4.sinks.k4.hdfs.round = true
#使用本地系統時間戳做爲基準進行日期回滾  
a4.sinks.k4.hdfs.useLocalTimeStamp = true

#設置文件的前綴,若是不設置則默認值爲FlumeData
a4.sinks.k4.hdfs.filePrefix = HiveLog


#設置解決文件過多太小問題,將每一個文件的大小控制在128M
#將rollInterval和rollCount屬性的值改成0後文件的回滾將再也不由時間間隔及事件的數量爲依據
#rollSize的值須要是一個127M左右的值,由於每一個文件是一個block塊,每一個block塊都還包含元數據信息
a4.sinks.k4.hdfs.rollInterval = 0
a4.sinks.k4.hdfs.rollSize = 128000000
a4.sinks.k4.hdfs.rollCount = 0
#寫入到hdfs的最小副本數,不設置會致使上面的三個參數不生效
a4.sinks.k4.hdfs.minBlockReplicas = 1


#批量寫入到hdfs上文件中的最大event數量
#batchSize的值須要小於等於transactionCapacity的值 
#hdfs類型的sink將數據寫入到hdsf上的底層源碼執行過程
#假如batchSize=200,transactionCapacity=100
#系統建立一個容量爲100個event的臨時隊列容器(transactionCapacity)-》sink會最多取出200(batchSize)個event塞到transactionCapacity容器中-》由於transactionCapacity的容量不夠會報錯channelException 

a4.sinks.k4.hdfs.batchSize = 100

# fileType定義的是數據流的格式,默認的數據流的格式爲SequenceFile
a4.sinks.k4.hdfs.fileType = DataStream
# 寫入到hdfs上的文件的格式(序列化方法) 
# 格式改成text後,能夠經過cat 或 text 命令查看文件中的日誌內容 
a4.sinks.k4.hdfs.writeFormat = Text


# 將source和sink模塊綁定到對應的channel管道上 
a4.sources.s4.channels = c4
a4.sinks.k4.channel = c4

Load balancing Sink Processor

能夠配置同屬一個組的多個sink之間負載平衡的能力
支持經過round_robin(輪詢)或者random(隨機)參數來實現事件的分發
默認狀況下使用round_robin,也能夠自定義分發機制 
一般是多個sink綁定在同一個channel上

nginx2avro-balance.properties

#以及聲明當前的flume-agent應用實例實例中三種模塊的數量及別名
#管道留c1便可!!!!!!!!!!!!!!!!!!
a2.sources = s2
a2.sinks = k1 k2
a2.channels = c1

#拷貝修改!!!!!!!!!!!!!!!!!!
## Load balancing Sink Processor
#聲明一個由k1 k2組成的組,組名稱爲g1
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
#是否以剔除失敗的Sinks。
a1.sinkgroups.g1.processor.backoff = true
#採用何種負載均衡算法,round_robin, random或自定義
a1.sinkgroups.g1.processor.selector = round_robin

# 定義source模塊的類型及此類型相關的其餘屬性
a2.sources.s2.type = exec
a2.sources.s2.command = tail -F /usr/local/nginx/datalog/access.log  
a2.sources.s2.shell = /bin/sh -c

#定義c1
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# 定義k1
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = 192.168.134.101
a2.sinks.k1.port = 4546

# 定義k2
a2.sinks.k2.type = avro
a2.sinks.k2.hostname = 192.168.134.101
a2.sinks.k2.port = 4545

# 將source和sink模塊綁定到對應的channel管道上 
# 修改c2爲c1!!!!!!!!!!!!!!!!!!
a2.sources.s2.channels = c1
a2.sinks.k1.channel = c1
a2.sinks.k2.channel = c1

avro2hdfs-01.properties

#以及聲明當前的flume-agent應用實例實例中三種模塊的數量及別名
a3.sources = s3
a3.sinks = k3
a3.channels = c3


# 定義source模塊的類型及此類型相關的其餘屬性
a3.sources.s3.type = avro
a3.sources.s3.bind = 192.168.134.101
a3.sources.s3.port = 4545


#定義當前flume-agent應用實例中的channel模塊的類型及此類型相關的其餘屬性 
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100


# 定義sink模塊的類型及相關屬性
#修改路徑!!!!!!!!!!!!!!!!!!
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://192.168.134.101:8020/nginx-flume-01/%Y%m%d/

#啓用根據時間生成轉義字符的值及自動回滾生產日期目錄
a3.sinks.k3.hdfs.round = true
#使用本地系統時間戳做爲基準進行日期回滾  
a3.sinks.k3.hdfs.useLocalTimeStamp = true

#設置文件的前綴,若是不設置則默認值爲FlumeData
a3.sinks.k3.hdfs.filePrefix = HiveLog


#設置解決文件過多太小問題,將每一個文件的大小控制在128M
#將rollInterval和rollCount屬性的值改成0後文件的回滾將再也不由時間間隔及事件的數量爲依據
#rollSize的值須要是一個127M左右的值,由於每一個文件是一個block塊,每一個block塊都還包含元數據信息
a3.sinks.k3.hdfs.rollInterval = 0
a3.sinks.k3.hdfs.rollSize = 128000000
a3.sinks.k3.hdfs.rollCount = 0
#寫入到hdfs的最小副本數,不設置會致使上面的三個參數不生效
a3.sinks.k3.hdfs.minBlockReplicas = 1


#批量寫入到hdfs上文件中的最大event數量
#batchSize的值須要小於等於transactionCapacity的值 
#hdfs類型的sink將數據寫入到hdsf上的底層源碼執行過程
#假如batchSize=200,transactionCapacity=100
#系統建立一個容量爲100個event的臨時隊列容器(transactionCapacity)-》sink會最多取出200(batchSize)個event塞到transactionCapacity容器中-》由於transactionCapacity的容量不夠會報錯channelException 

a3.sinks.k3.hdfs.batchSize = 100

# fileType定義的是數據流的格式,默認的數據流的格式爲SequenceFile
a3.sinks.k3.hdfs.fileType = DataStream
# 寫入到hdfs上的文件的格式(序列化方法) 
# 格式改成text後,能夠經過cat 或 text 命令查看文件中的日誌內容 
a3.sinks.k3.hdfs.writeFormat = Text


# 將source和sink模塊綁定到對應的channel管道上 
a3.sources.s3.channels = c3
a3.sinks.k3.channel = c3

avro2hdfs-02.properties

#以及聲明當前的flume-agent應用實例實例中三種模塊的數量及別名
a4.sources = s4
a4.sinks = k4
a4.channels = c4

# 定義source模塊的類型及此類型相關的其餘屬性
a4.sources.s4.type = avro
a4.sources.s4.bind = 192.168.134.101
a4.sources.s4.port = 4546

#定義當前flume-agent應用實例中的channel模塊的類型及此類型相關的其餘屬性 
a4.channels.c4.type = memory
a4.channels.c4.capacity = 1000
a4.channels.c4.transactionCapacity = 100

# 定義sink模塊的類型及相關屬性
#修改路徑!!!!!!!!!!!!!!!!!!
a4.sinks.k4.type = hdfs
a4.sinks.k4.hdfs.path = hdfs://192.168.134.101:8020/nginx-flume02/%Y%m%d/

#啓用根據時間生成轉義字符的值及自動回滾生產日期目錄
a4.sinks.k4.hdfs.round = true
#使用本地系統時間戳做爲基準進行日期回滾  
a4.sinks.k4.hdfs.useLocalTimeStamp = true

#設置文件的前綴,若是不設置則默認值爲FlumeData
a4.sinks.k4.hdfs.filePrefix = HiveLog


#設置解決文件過多太小問題,將每一個文件的大小控制在128M
#將rollInterval和rollCount屬性的值改成0後文件的回滾將再也不由時間間隔及事件的數量爲依據
#rollSize的值須要是一個127M左右的值,由於每一個文件是一個block塊,每一個block塊都還包含元數據信息
a4.sinks.k4.hdfs.rollInterval = 0
a4.sinks.k4.hdfs.rollSize = 128000000
a4.sinks.k4.hdfs.rollCount = 0
#寫入到hdfs的最小副本數,不設置會致使上面的三個參數不生效
a4.sinks.k4.hdfs.minBlockReplicas = 1


#批量寫入到hdfs上文件中的最大event數量
#batchSize的值須要小於等於transactionCapacity的值 
#hdfs類型的sink將數據寫入到hdsf上的底層源碼執行過程
#假如batchSize=200,transactionCapacity=100
#系統建立一個容量爲100個event的臨時隊列容器(transactionCapacity)-》sink會最多取出200(batchSize)個event塞到transactionCapacity容器中-》由於transactionCapacity的容量不夠會報錯channelException 

a4.sinks.k4.hdfs.batchSize = 100

# fileType定義的是數據流的格式,默認的數據流的格式爲SequenceFile
a4.sinks.k4.hdfs.fileType = DataStream
# 寫入到hdfs上的文件的格式(序列化方法) 
# 格式改成text後,能夠經過cat 或 text 命令查看文件中的日誌內容 
a4.sinks.k4.hdfs.writeFormat = Text


# 將source和sink模塊綁定到對應的channel管道上 
a4.sources.s4.channels = c4
a4.sinks.k4.channel = c4
相關文章
相關標籤/搜索