cloudera 公司開源的,貢獻給Apache基金會 html
http://archive.cloudera.com/c...node
只能運行在linux系統上mysql
Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streaming data flows. It is robust and fault tolerant with tunable reliability mechanisms and many failover and recovery mechanisms. It uses a simple extensible data model that allows for online analytic application.linux
flume用來高效的收集、聚合、移動大量的日誌數據 nginx
有一個基於流式的簡單的有彈性的傳輸模型 web
有一個健壯的可容錯的機制 正則表達式
使用簡單,能夠擴展的數據模型運行使用到在線實時分析應用中 redis
簡單體如今flume-agent的配置及傳輸模型簡單 算法
在線實時分析應用中
flume日誌的實時採集->sparkStreaming/storm/Flink =>mysql/redis=>實時分析的結果進行報表展現
數據(日誌)的移動傳輸工具:
日誌=>系統運行日誌、web服務器的訪問日誌、客戶端的用戶行爲日誌、軟件的運行操做日誌
能夠將數據從數據源中採集並移動到另一個目的地:
數據源=>系統本地日誌文件中的數據、jms、avro端口、kafka、系統本地目錄下... 目的地=>hdfs、hive、hbase、kafka、系統本地一個文件中...
腳本+hdfs命令 =>【週期性】上傳
#!/bin/sh HADOOP_HOME=/opt/cdh-5.14.2/hadoop-2.6.0-cdh5.14.2 $HADOOP_HOME/bin/hdfs -put /.../xx.log /hdfs
針對項目初期數據量較少時可使用 , 沒有容災性及穩定性
採用flume日誌採集框架=>【實時】採集一個日誌文件中實時追加的日誌數據並寫入到目的地
針對不一樣的應用場景定義並啓動對應的flume-agent實例/進程 source -- 定義從哪裏採集數據 exec類型的source能夠藉助Linux的shell命令實現實時讀取一個日誌文件中動態追加的日誌數據 avro類型 …… channel -- 定義了source採集的數據臨時存儲地 memory 基於內存的管道容器 file 基於磁盤 sink -- 定義將數據最終寫入的-目的地 hdfs類型的sink將數據最終寫入到hdfs上 hive類型將數據最終寫入到hive表 kafka類型將數據最終寫入到kafka分佈式消息隊列中 ……
每一個flume-agent實例至少由如下三個功能模塊組成 source模塊 用於監控數據源並進行數據的實時採集,是實時產生數據流的模塊 數據源=>系統本地的一個日誌文件中、kafka、jms、系統本地的一個目錄下、avro端口 。。。 source將採集到的數據提交到channel中 channel模塊 用於鏈接source和sink的管道容器 相似一個隊列(FIFO) sink模塊 從channel中拉取take(剪切)數據並最終將數據寫入到目的地 目的地=>hdfs、hive、hbase、kafka、avro端口... event事件: event事件是flume傳輸日誌數據時基本單元,在flume-agent內部數據都是以事件形式存在 source將採集到的數據封裝成一個個的event事件,將事件提交到channel sink從channel消費事件並將事件中封裝的數據最終寫入到目的地 event事件的數據結構:header + body header 是一個map集合類型 內部的key-value爲該事件的元數據信息,主要用來區分不一樣的事件 body 是一個字節數組類型 body爲咱們真正要傳輸的數據
flume-ng-1.6.0-cdh5.14.2 安裝 一、上次解壓flume的安裝包 $ tar zxvf /opt/softwares/flume-ng-1.6.0-cdh5.14.2.tar.gz -C /opt/cdh-5.14.2/ $ mv apache-flume-1.6.0-cdh5.14.2-bin/ flume-1.6.0-cdh5.14.2 修改目錄名稱-可選 二、修改flume配置文件 $ mv conf/flume-env.sh.template conf/flume-env.sh 修改後環境配置文件才能生效 $ vi conf/flume-env.sh export JAVA_HOME=/opt/cdh-5.14.2/jdk1.8.0_112 三、針對不一樣的場景需求配置對應的java屬性配置文件並啓動flume-agent進程 如何啓動一個flume-agent進程 $ bin/flume-ng agent \ --name或-n 當前flume-agent實例的別名 \ --conf或-c 當前flume框架的配置文件目錄 \ --conf-file,-f 與當前要啓動的flume-agent進程相匹配的java屬性配置文件的本地路徑 Usage: bin/flume-ng <command> [options]...
定義一個flume-agent去監聽讀取某臺服務器上的某個端口中的數據,並將監聽讀取到的數據最終寫入到flume框架本身的日誌文件中
# example.conf: A single-node Flume configuration # Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = netcat a1.sources.r1.bind = centos01 a1.sources.r1.port = 44444 # Describe the sink a1.sinks.k1.type = logger # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
提交測試: $ bin/flume-ng agent -n a1 -c conf/ -f conf/netcat2logger.properties & 肯定目標服務器的端口是否已經成功被flume-agent代理進程監聽 $ netstat -antp |grep 44444 --查看端口信息 $ ps -ef | grep flume -- 查看進程信息 安裝一個telnet工具並鏈接服務器端口寫入數據 $ sudo yum -y install telnet 發送消息數據 檢查flume的日誌文件中的數據 $ tail -f logs/flume.log
要求使用flume實時監控讀取系統本地一個日誌文件中動態追加的日誌數據並實時寫入到hdfs上的某個目錄下
# example.conf: A single-node Flume configuration #同一臺Linux上可開啓多個flume-agent,但agent別名要區分 a2.sources = r2 a2.sinks = k2 a2.channels = c2 # Describe/configure the source #依靠的是Linux的命令讀取本地文件,Linux的命令不中止flume就不停 a2.sources.r2.type = exec # tail -F 文件名 即便沒有這個-F後面指定的文件,命令也不會中止,容錯能力強 a2.sources.r2.command = tail -F /home/chen/Documents/nginx.log # Use a channel which buffers events in memory a2.channels.c2.type = memory a2.channels.c2.capacity = 1000 a2.channels.c2.transactionCapacity = 100 #聲明a2的sink a2.sinks.k2.type = hdfs a2.sinks.k2.hdfs.path = hdfs://centos01:8020/flume/weblog a2.sinks.k2.hdfs.filePrefix = nginxData # Bind the source and sink to the channel a2.sources.r2.channels = c2 a2.sinks.k2.channel = c2
報錯首先找logs目錄
報錯:找不到類
缺乏jar包
[beifeng@bigdata01 hadoop-2.6.0-cdh5.14.2]$ cp share/hadoop/hdfs/hadoop-hdfs-2.6.0-cdh5.14.2.jar /opt/cdh-5.14.2/flume-1.6.0-cdh5.14.2/lib/ [beifeng@bigdata01 hadoop-2.6.0-cdh5.14.2]$ cp share/hadoop/common/hadoop-common-2.6.0-cdh5.14.2.jar /opt/cdh-5.14.2/flume-1.6.0-cdh5.14.2/lib/ [beifeng@bigdata01 hadoop-2.6.0-cdh5.14.2]$ cp share/hadoop/common/lib/htrace-core4-4.0.1-incubating.jar /opt/cdh-5.14.2/flume-1.6.0-cdh5.14.2/lib/ [beifeng@bigdata01 hadoop-2.6.0-cdh5.14.2]$ cp share/hadoop/tools/lib/commons-configuration-1.6.jar /opt/cdh-5.14.2/flume-1.6.0-cdh5.14.2/lib/ [beifeng@bigdata01 hadoop-2.6.0-cdh5.14.2]$ cp share/hadoop/tools/lib/hadoop-auth-2.6.0-cdh5.14.2.jar /opt/cdh-5.14.2/flume-1.6.0-cdh5.14.2/lib/
解決生成的文件過多太小的問題(但願文件的大小=128M)
將日誌文件按照日期分目錄存儲(按照天分目錄存儲)
將生成的日誌文件的格式改成Text文本格式
修改上個例子的flume-agent屬性文件
# 聲明當前flume-agent的別名及當前的flume-agent實例包含的模塊的別名和個數 a2.sources = s2 a2.channels = c2 a2.sinks = k2 # 定義source模塊中的s2的類型及與此類型相關的延伸屬性 # exec類型的source能夠藉助執行一條linux shell命令實現讀取linux系統上某個文件中的日誌數據,其中 cat是一次性讀取,tail能夠實現實時讀取新增長的數據 # shell屬性用來聲明要執行的命令的運行環境 a2.sources.s2.type = exec a2.sources.s2.command = tail -F /opt/nginx/access.log a2.sources.s2.shell = /bin/sh -c # 定義channel模塊中的c2的類型及與此類型相關的延伸屬性 a2.channels.c2.type = memory a2.channels.c2.capacity = 1000 a2.channels.c2.transactionCapacity = 100 # 定義sink模塊中的k2的類型及與此類型相關的延伸屬性 a2.sinks.k2.type = hdfs a2.sinks.k2.hdfs.path = hdfs://192.168.134.101:8020/flume-demo2/%Y%m%d #啓用根據時間生成路徑中的轉義字符的具體的時間值 a2.sinks.k2.hdfs.round = true #表示使用本地linux系統時間戳做爲時間基準,不然會自動參考事件的header中的時間戳 a2.sinks.k2.hdfs.useLocalTimeStamp = true #設置文件的前綴 a2.sinks.k2.hdfs.filePrefix = NgnixLog #設置解決文件過多太小問題 a2.sinks.k2.hdfs.rollInterval = 0 a2.sinks.k2.hdfs.rollSize = 128000000 a2.sinks.k2.hdfs.rollCount = 0 #寫入到hdfs的最小副本數,不設置會致使上面的三個參數不生效 a2.sinks.k2.hdfs.minBlockReplicas = 1 #批量寫入到hdfs上文件中的最大event數量 #batchSize的值須要小於等於transactionCapacity的值 #從性能上考慮,最優的是batchSize=transactionCapacity a2.sinks.k2.hdfs.batchSize = 100 # fileType定義的是數據流的格式,默認的數據流的格式爲SequenceFile a2.sinks.k2.hdfs.fileType = DataStream # 寫入到hdfs上的文件的格式(序列化方法) # 格式改成text後,能夠經過cat 或 text 命令查看文件中的日誌內容 a2.sinks.k2.hdfs.writeFormat = Text # 將a2中的source及sink模塊綁定到對應的channel模塊上 # 一個source模塊能夠同時綁定多個channel模塊,可是一個sink模塊只能綁定一個惟一的channel a2.sources.s2.channels = c2 a2.sinks.k2.channel = c2
利用flume監控某個目錄下的日誌文件,當某個目錄下出現符合要求的文件名稱的文件時,則對文件中的日誌數據進行讀取,並將數據最終寫入到hdfs上
目錄 /opt/data/logs nginx-access.log.2018120309 nginx-access.log.2018120310
# example.conf: A single-node Flume configuration #同一臺Linux上可開啓多個flume-agent,但agent別名要區分 a2.sources = r2 a2.sinks = k2 a2.channels = c2 # Describe/configure the source # includePattern 用正則表達式指定要包含的文件 # ignorePattern 用正則表達式指定要忽略的文件 a2.sources.r2.type = spooldir a2.sources.r2.spoolDir = /home/chen/mylogs # 因爲每次讀完會給讀完的文件增長.COMPLETED從而造成新文件,須要忽略這些文件 a2.sources.r2.ignorePattern = ^.*\.COMPLETED$ # includePattern和ignorePattern會同時生效 a2.sources.r2.includePattern = ^.*$ # Use a channel # file類型更安全 # memory類型效率更高 a2.channels.c2.type = file a2.channels.c2.dataDirs = /opt/modules/flume-1.6.0-cdh5.14.2/data #聲明a2的sink a2.sinks.k2.type = hdfs a2.sinks.k2.hdfs.path = hdfs://centos01:8020/flume/weblog/%y%m%d #啓用根據時間生成轉義字符的具體的時間值 a2.sinks.k2.hdfs.round = true #使用本地linux系統時間戳做爲時間基準,不然會自動參考事件的header中的時間戳 a2.sinks.k2.hdfs.useLocalTimeStamp = true a2.sinks.k2.hdfs.filePrefix = nginxData #設置解決文件過多太小問題 a2.sinks.k2.hdfs.rollInterval = 0 a2.sinks.k2.hdfs.rollSize = 128000000 a2.sinks.k2.hdfs.rollCount = 0 #寫入到hdfs的最小副本數,不設置會致使上面的三個參數不生效 a2.sinks.k2.hdfs.minBlockReplicas = 1 #批量寫入到hdfs上文件中的最大event數量 #batchSize的值須要小於等於transactionCapacity的值 #從性能上考慮,最優的是batchSize=transactionCapacity a2.sinks.k2.hdfs.batchSize = 100 # fileType定義的是數據流的格式,默認的數據流的格式爲SequenceFile a2.sinks.k2.hdfs.fileType = DataStream # 寫入到hdfs上的文件的格式(序列化方法) # 格式改成text後,能夠經過cat 或 text 命令查看文件中的日誌內容 a2.sinks.k2.hdfs.writeFormat = Text # Bind the source and sink to the channel a2.sources.r2.channels = c2 a2.sinks.k2.channel = c2
需求: Nginx服務器集羣 -- 10臺 每臺服務器上都有一個access.log日誌文件 須要將每臺服務器上的日誌文件中追加的日誌數據實時讀取並寫入到hdfs上 思路1: 每臺Nginx服務器上啓動一個flume-agent source - exec channel - mem sink - hdfs 多個flume-agent同時寫入數據到hfds上不利於hdfs的穩定性 思路2: 每臺Nginx服務器上啓動一個flume-agent source - exec channel - mem sink - avro type = avro hostname = 主機名 port = 端口號 將數據統一寫入到某臺服務器某個端口中 啓動一個負責對彙總後的數據統一寫入到目的地的flum-agent source - avro type = avro bind = port = channel - mem sink - hdfs
nginxs2flume.properties
# example.conf: A single-node Flume configuration #同一臺Linux上可開啓多個flume-agent,但agent別名要區分 a2.sources = r2 a2.sinks = k2 a2.channels = c2 # Describe/configure the source #依靠的是Linux的命令讀取本地文件,Linux的命令不中止flume就不停 a2.sources.r2.type = exec # tail -F 文件名 即便沒有這個-F後面指定的文件,命令也不會中止,容錯能力強 a2.sources.r2.command = tail -F /home/chen/Documents/nginx.log # Use a channel which buffers events in memory a2.channels.c2.type = memory a2.channels.c2.capacity = 1000 a2.channels.c2.transactionCapacity = 100 #聲明a2的sink a2.sinks.k2.type = avro a2.sinks.k2.hostname = centos01 a2.sinks.k2.port = 6666 # Bind the source and sink to the channel a2.sources.r2.channels = c2 a2.sinks.k2.channel = c2
flume2hdfs.properties
# example.conf: A single-node Flume configuration #同一臺Linux上可開啓多個flume-agent,但agent別名要區分 a3.sources = r3 a3.sinks = k3 a3.channels = c3 # Describe/configure the source a3.sources.r3.type = avro a3.sources.r3.bind = centos01 a3.sources.r3.port = 6666 # Use a channel which buffers events in memory a3.channels.c3.type = memory a3.channels.c3.capacity = 1000 a3.channels.c3.transactionCapacity = 100 #聲明a3的sink a3.sinks.k3.type = hdfs a3.sinks.k3.hdfs.path = hdfs://centos01:8020/flume/weblog/test a3.sinks.k3.hdfs.writeFormat = Text # Bind the source and sink to the channel a3.sources.r3.channels = c3 a3.sinks.k3.channel = c3
做用:
當一個flume-agent中存在多個channel時,爲Source選擇下游的Channel進行配置
source將事件複製並提交給全部與其綁定的channel中 默認屬性能夠省略定義
flume-agent source s1 -- 但願採集用戶日誌複製份數爲n(n=channel的個數)分配到各個channel中 channel c1 c2 sink k1 -- hdfs k2 -- kafka的某個主題中
# 聲明agent名稱及此agent要使用的source、channel、sink名稱 a2.sources = s2 a2.channels = c1 c2 a2.sinks = k1 k2 # 定義source類型及相關屬性,須要聲明下運行該命令的shell環境 a2.sources.s2.type = exec a2.sources.s2.command = tail -F /usr/local/nginx/datalog/access.log # 默認屬性就是replicating a2.sources.s2.selector.type = replicating # 定義c1 # 定義channel類型及相關屬性 a2.channels.c1.type = memory # 管道的最大容量 a2.channels.c1.capacity = 1000 # 每次最大從source獲取的事件數量和sink每次獲取的event最大數量 a2.channels.c1.transactionCapacity = 100 # 定義c2 a2.channels.c2.type = memory a2.channels.c2.capacity = 1000 a2.channels.c2.transactionCapacity = 100 # k1 # 定義kafka的sink a2.sinks.k1.type = org.apache.flume.source.kafka.KafkaSource a2.sinks.k1.kafka.bootstrap.servers = centos01:9092 a2.sinks.k1.kafka.topics = nginxTopic # k2 # 定義sinks類型及相關屬性 a2.sinks.k2.type = hdfs #設置按天進行生成存儲目錄,天天生成一個文件夾 a2.sinks.k2.hdfs.path = hdfs://centos01:8020/flume/nginx/%Y%m%d/ #啓用根據時間生成轉義字符的值 a2.sinks.k2.hdfs.round = true #使用本地時間戳做爲基準 a2.sinks.k2.hdfs.useLocalTimeStamp = true #設置文件的前綴 a2.sinks.k2.hdfs.filePrefix = flumeLog #設置解決文件過多太小問題 a2.sinks.k2.hdfs.rollInterval = 0 a2.sinks.k2.hdfs.rollSize = 128000000 a2.sinks.k2.hdfs.rollCount = 0 #寫入到hdfs的最小副本數,不設置會致使上面的三個參數不生效 a2.sinks.k2.hdfs.minBlockReplicas = 1 #批量寫入到hdfs上文件中的最大event數量 #batchSize的值須要小於等於transactionCapacity的值 a2.sinks.k2.hdfs.batchSize = 100 # fileType定義的是數據流的格式,默認的數據流的格式爲SequenceFile a2.sinks.k2.hdfs.fileType = DataStream # 寫入到hdfs上的文件的格式(序列化方法) # 格式改成text後,能夠經過cat 或 text 命令查看文件中的日誌內容 a2.sinks.k2.hdfs.writeFormat = Text # 綁定sink和source到channel a2.sinks.k1.channel = c1 a2.sinks.k2.channel = c2 a2.sources.s2.channels = c1 c2
source根據事件的header頭信息中的不一樣值提交給對應的channel中 事件中自帶頭信息 source直接判斷便可 https://blog.csdn.net/looklook5/article/details/40430965--官方案例 事件沒有頭信息 須要自定義header頭信息,又稱爲定義攔截器 http://www.cnblogs.com/Skyar/p/5831935.html --案例 使用正則匹配選擇器類型對日誌分解並指定key名稱 (\\w+)表示任意單詞字符串,範圍包括a-zA-Z0-9_ file_roll類型本地目錄提早建立,默認目錄下每30s回滾一個文件 使用echo "192.168.134.101:bigdata01:spark" >> test.log
mul_demo.properties
#聲明代理及三種模塊的別名及數量 a1.sources = s1 a1.channels = c1 c2 c3 c4 a1.sinks = k1 k2 k3 k4 # 定義s1 a1.sources.s1.type = exec a1.sources.s1.command = tail -F /opt/flume/test.log # 聲明該source的channel selector類型 # multiplexing類型的channel selector會根據header中的某個key的值的不一樣提交給不一樣的channel a1.sources.s1.selector.type = multiplexing # 聲明依據header頭的key爲course的值來判斷提交該條數據給那些channel # header map( k1->v1,k2->v2 ,k3->v3 ) a1.sources.s1.selector.header = course # 若是該條數據中的header中的course的值爲hadoop,則將該事件提交給c1 a1.sources.s1.selector.mapping.hadoop = c1 a1.sources.s1.selector.mapping.hive = c1 c2 a1.sources.s1.selector.mapping.hbase = c3 # 若是以上都未匹配到則默認提交給c4 a1.sources.s1.selector.default = c4 # 聲明一個source的攔截器組件,別名叫 i1 a1.sources.s1.interceptors = i1 # 聲明此攔截器的類型 ,regex_extractor爲正則匹配攔截器類型 a1.sources.s1.interceptors.i1.type = regex_extractor # (\\w+) 表示能夠匹配任意的字符串 範圍包括 a-zA-Z0-9_ # abc:erdfd:gsaafdf # 將一條日誌經過':'的分割並聲明對應的key值,最終定義出一個包含三個key-value對的header頭信息 # 192.168.134.101:bigdata01:spark => # header{ip->192.168.134.101,domain->bigdata01,course->spark} body (192.168.134.101:bigdata01:spark ) a1.sources.s1.interceptors.i1.regex = (\\w+):(\\w+):(\\w+) a1.sources.s1.interceptors.i1.serializers = s1 s2 s3 a1.sources.s1.interceptors.i1.serializers.s1.name = ip a1.sources.s1.interceptors.i1.serializers.s2.name = domain a1.sources.s1.interceptors.i1.serializers.s3.name = course # 定義 c1 a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # 定義 c2 a1.channels.c2.type = memory a1.channels.c2.capacity = 1000 a1.channels.c2.transactionCapacity = 100 # 定義 c3 a1.channels.c3.type = memory a1.channels.c3.capacity = 1000 a1.channels.c3.transactionCapacity = 100 # 定義 c4 a1.channels.c4.type = memory a1.channels.c4.capacity = 1000 a1.channels.c4.transactionCapacity = 100 # 定義k1 #file_roll類型爲將事件寫入到linux本地磁盤 a1.sinks.k1.type = file_roll #定義將事件寫入到本地的目錄 a1.sinks.k1.sink.directory = /opt/flume/k1 a1.sinks.k1.sink.rollInterval = 0 # 定義k2 a1.sinks.k2.type = file_roll a1.sinks.k2.sink.directory = /opt/flume/k2 a1.sinks.k2.sink.rollInterval = 0 # 定義k3 a1.sinks.k3.type = file_roll a1.sinks.k3.sink.directory = /opt/flume/k3 a1.sinks.k3.sink.rollInterval = 0 # 定義k4 a1.sinks.k4.type = file_roll a1.sinks.k4.sink.directory = /opt/flume/k4 a1.sinks.k4.sink.rollInterval = 0 # Bind the source and sink to the channel a1.sources.s1.channels = c1 c2 c3 c4 a1.sinks.k1.channel = c1 a1.sinks.k2.channel = c2 a1.sinks.k3.channel = c3 a1.sinks.k4.channel = c4 # 最終能夠實現的需求 若是source採集的日誌爲 192.168.134.101:bigdata01:spark =》日誌發送給 c4 192.168.134.101:bigdata01:hive =》日誌發送給 c1 c2 192.168.134.106:bigdata04:hbase =》日誌發送給 c3 192.168.134.106:bigdata04:hadoop =》日誌發送給 c1 192.168.134.106:bigdata04:oozie =》日誌發送給 c4
做用: 將一個flume-agent中的多個sink定義到一個Sinkgroups組中 使組內多sink之間實現故障轉移或負載均衡 類型:
一個組內只有一個sink,不強制用戶爲Sink建立Processor 就是以前的單sink案例
經過配置維護了多個sink組成的優先級列表 須要爲全部的sink分配優先級,全部的優先級數字必須是惟一的 數字越大表示優先獲取channel數據 搭配 Replicating Channel Selector 使用
nginx2avro.properties
#以及聲明當前的flume-agent應用實例實例中三種模塊的數量及別名 a2.sources = s2 a2.channels = c1 c2 a2.sinks = k1 k2 ## Failover Sink Processor #聲明一個由k1 k2組成的組,組名稱爲g1 a2.sinkgroups = g1 a2.sinkgroups.g1.sinks = k1 k2 #經過如下定義能夠聲明k1和k2的關係時故障轉移關係(ha) a2.sinkgroups.g1.processor.type = failover #配置各個sink的優先級,只要有大小差異就行 a2.sinkgroups.g1.processor.priority.k1 = 10 a2.sinkgroups.g1.processor.priority.k2 = 5 #聲明failover前最大等待時間,默認10s a2.sinkgroups.g1.processor.maxpenalty = 10000 # 定義source模塊的類型及此類型相關的其餘屬性 a2.sources.s2.type = exec a2.sources.s2.command = tail -F /usr/local/nginx/datalog/access.log a2.sources.s2.shell = /bin/sh -c #定義c1 a2.channels.c1.type = memory a2.channels.c1.capacity = 1000 a2.channels.c1.transactionCapacity = 100 #定義c2 a2.channels.c2.type = memory a2.channels.c2.capacity = 1000 a2.channels.c2.transactionCapacity = 100 # 定義k1 # 注意:avro的歸屬服務器要在不一樣節點上,不然失去HA意義,端口號由於不在同一臺服務器上因此能夠相同 # 由於目前是僞分佈,因此服務器只能同樣而且端口號要區分開 a2.sinks.k1.type = avro a2.sinks.k1.hostname = 192.168.134.101 a2.sinks.k1.port = 4545 # 定義k2 a2.sinks.k2.type = avro a2.sinks.k2.hostname = 192.168.134.101 a2.sinks.k2.port = 4546 # 將source和sink模塊綁定到對應的channel管道上 a2.sources.s2.channels = c1 c2 a2.sinks.k1.channel = c1 a2.sinks.k2.channel = c2
avro2hdfs-active.properties
#以及聲明當前的flume-agent應用實例實例中三種模塊的數量及別名 a3.sources = s3 a3.sinks = k3 a3.channels = c3 # 定義source模塊的類型及此類型相關的其餘屬性 a3.sources.s3.type = avro a3.sources.s3.bind = 192.168.134.101 a3.sources.s3.port = 4545 #定義當前flume-agent應用實例中的channel模塊的類型及此類型相關的其餘屬性 a3.channels.c3.type = memory a3.channels.c3.capacity = 1000 a3.channels.c3.transactionCapacity = 100 # 定義sink模塊的類型及相關屬性 a3.sinks.k3.type = hdfs a3.sinks.k3.hdfs.path = hdfs://192.168.134.101:8020/flume-collector-active/%Y%m%d/ #啓用根據時間生成轉義字符的值及自動回滾生產日期目錄 a3.sinks.k3.hdfs.round = true #使用本地系統時間戳做爲基準進行日期回滾 a3.sinks.k3.hdfs.useLocalTimeStamp = true #設置文件的前綴,若是不設置則默認值爲FlumeData a3.sinks.k3.hdfs.filePrefix = HiveLog #設置解決文件過多太小問題,將每一個文件的大小控制在128M #將rollInterval和rollCount屬性的值改成0後文件的回滾將再也不由時間間隔及事件的數量爲依據 #rollSize的值須要是一個127M左右的值,由於每一個文件是一個block塊,每一個block塊都還包含元數據信息 a3.sinks.k3.hdfs.rollInterval = 0 a3.sinks.k3.hdfs.rollSize = 128000000 a3.sinks.k3.hdfs.rollCount = 0 #寫入到hdfs的最小副本數,不設置會致使上面的三個參數不生效 a3.sinks.k3.hdfs.minBlockReplicas = 1 #批量寫入到hdfs上文件中的最大event數量 #batchSize的值須要小於等於transactionCapacity的值 #hdfs類型的sink將數據寫入到hdsf上的底層源碼執行過程 #假如batchSize=200,transactionCapacity=100 #系統建立一個容量爲100個event的臨時隊列容器(transactionCapacity)-》sink會最多取出200(batchSize)個event塞到transactionCapacity容器中-》由於transactionCapacity的容量不夠會報錯channelException a3.sinks.k3.hdfs.batchSize = 100 # fileType定義的是數據流的格式,默認的數據流的格式爲SequenceFile a3.sinks.k3.hdfs.fileType = DataStream # 寫入到hdfs上的文件的格式(序列化方法) # 格式改成text後,能夠經過cat 或 text 命令查看文件中的日誌內容 a3.sinks.k3.hdfs.writeFormat = Text # 將source和sink模塊綁定到對應的channel管道上 a3.sources.s3.channels = c3 a3.sinks.k3.channel = c3
avro2hdfs-failover.properties
#以及聲明當前的flume-agent應用實例實例中三種模塊的數量及別名 a4.sources = s4 a4.sinks = k4 a4.channels = c4 # 定義source模塊的類型及此類型相關的其餘屬性 a4.sources.s4.type = avro a4.sources.s4.bind = 192.168.134.101 a4.sources.s4.port = 4546 #定義當前flume-agent應用實例中的channel模塊的類型及此類型相關的其餘屬性 a4.channels.c4.type = memory a4.channels.c4.capacity = 1000 a4.channels.c4.transactionCapacity = 100 # 定義sink模塊的類型及相關屬性 a4.sinks.k4.type = hdfs a4.sinks.k4.hdfs.path = hdfs://192.168.134.101:8020/flume-collector-failover/%Y%m%d/ #啓用根據時間生成轉義字符的值及自動回滾生產日期目錄 a4.sinks.k4.hdfs.round = true #使用本地系統時間戳做爲基準進行日期回滾 a4.sinks.k4.hdfs.useLocalTimeStamp = true #設置文件的前綴,若是不設置則默認值爲FlumeData a4.sinks.k4.hdfs.filePrefix = HiveLog #設置解決文件過多太小問題,將每一個文件的大小控制在128M #將rollInterval和rollCount屬性的值改成0後文件的回滾將再也不由時間間隔及事件的數量爲依據 #rollSize的值須要是一個127M左右的值,由於每一個文件是一個block塊,每一個block塊都還包含元數據信息 a4.sinks.k4.hdfs.rollInterval = 0 a4.sinks.k4.hdfs.rollSize = 128000000 a4.sinks.k4.hdfs.rollCount = 0 #寫入到hdfs的最小副本數,不設置會致使上面的三個參數不生效 a4.sinks.k4.hdfs.minBlockReplicas = 1 #批量寫入到hdfs上文件中的最大event數量 #batchSize的值須要小於等於transactionCapacity的值 #hdfs類型的sink將數據寫入到hdsf上的底層源碼執行過程 #假如batchSize=200,transactionCapacity=100 #系統建立一個容量爲100個event的臨時隊列容器(transactionCapacity)-》sink會最多取出200(batchSize)個event塞到transactionCapacity容器中-》由於transactionCapacity的容量不夠會報錯channelException a4.sinks.k4.hdfs.batchSize = 100 # fileType定義的是數據流的格式,默認的數據流的格式爲SequenceFile a4.sinks.k4.hdfs.fileType = DataStream # 寫入到hdfs上的文件的格式(序列化方法) # 格式改成text後,能夠經過cat 或 text 命令查看文件中的日誌內容 a4.sinks.k4.hdfs.writeFormat = Text # 將source和sink模塊綁定到對應的channel管道上 a4.sources.s4.channels = c4 a4.sinks.k4.channel = c4
能夠配置同屬一個組的多個sink之間負載平衡的能力 支持經過round_robin(輪詢)或者random(隨機)參數來實現事件的分發 默認狀況下使用round_robin,也能夠自定義分發機制 一般是多個sink綁定在同一個channel上
nginx2avro-balance.properties
#以及聲明當前的flume-agent應用實例實例中三種模塊的數量及別名 #管道留c1便可!!!!!!!!!!!!!!!!!! a2.sources = s2 a2.sinks = k1 k2 a2.channels = c1 #拷貝修改!!!!!!!!!!!!!!!!!! ## Load balancing Sink Processor #聲明一個由k1 k2組成的組,組名稱爲g1 a1.sinkgroups = g1 a1.sinkgroups.g1.sinks = k1 k2 a1.sinkgroups.g1.processor.type = load_balance #是否以剔除失敗的Sinks。 a1.sinkgroups.g1.processor.backoff = true #採用何種負載均衡算法,round_robin, random或自定義 a1.sinkgroups.g1.processor.selector = round_robin # 定義source模塊的類型及此類型相關的其餘屬性 a2.sources.s2.type = exec a2.sources.s2.command = tail -F /usr/local/nginx/datalog/access.log a2.sources.s2.shell = /bin/sh -c #定義c1 a2.channels.c1.type = memory a2.channels.c1.capacity = 1000 a2.channels.c1.transactionCapacity = 100 # 定義k1 a2.sinks.k1.type = avro a2.sinks.k1.hostname = 192.168.134.101 a2.sinks.k1.port = 4546 # 定義k2 a2.sinks.k2.type = avro a2.sinks.k2.hostname = 192.168.134.101 a2.sinks.k2.port = 4545 # 將source和sink模塊綁定到對應的channel管道上 # 修改c2爲c1!!!!!!!!!!!!!!!!!! a2.sources.s2.channels = c1 a2.sinks.k1.channel = c1 a2.sinks.k2.channel = c1
avro2hdfs-01.properties
#以及聲明當前的flume-agent應用實例實例中三種模塊的數量及別名 a3.sources = s3 a3.sinks = k3 a3.channels = c3 # 定義source模塊的類型及此類型相關的其餘屬性 a3.sources.s3.type = avro a3.sources.s3.bind = 192.168.134.101 a3.sources.s3.port = 4545 #定義當前flume-agent應用實例中的channel模塊的類型及此類型相關的其餘屬性 a3.channels.c3.type = memory a3.channels.c3.capacity = 1000 a3.channels.c3.transactionCapacity = 100 # 定義sink模塊的類型及相關屬性 #修改路徑!!!!!!!!!!!!!!!!!! a3.sinks.k3.type = hdfs a3.sinks.k3.hdfs.path = hdfs://192.168.134.101:8020/nginx-flume-01/%Y%m%d/ #啓用根據時間生成轉義字符的值及自動回滾生產日期目錄 a3.sinks.k3.hdfs.round = true #使用本地系統時間戳做爲基準進行日期回滾 a3.sinks.k3.hdfs.useLocalTimeStamp = true #設置文件的前綴,若是不設置則默認值爲FlumeData a3.sinks.k3.hdfs.filePrefix = HiveLog #設置解決文件過多太小問題,將每一個文件的大小控制在128M #將rollInterval和rollCount屬性的值改成0後文件的回滾將再也不由時間間隔及事件的數量爲依據 #rollSize的值須要是一個127M左右的值,由於每一個文件是一個block塊,每一個block塊都還包含元數據信息 a3.sinks.k3.hdfs.rollInterval = 0 a3.sinks.k3.hdfs.rollSize = 128000000 a3.sinks.k3.hdfs.rollCount = 0 #寫入到hdfs的最小副本數,不設置會致使上面的三個參數不生效 a3.sinks.k3.hdfs.minBlockReplicas = 1 #批量寫入到hdfs上文件中的最大event數量 #batchSize的值須要小於等於transactionCapacity的值 #hdfs類型的sink將數據寫入到hdsf上的底層源碼執行過程 #假如batchSize=200,transactionCapacity=100 #系統建立一個容量爲100個event的臨時隊列容器(transactionCapacity)-》sink會最多取出200(batchSize)個event塞到transactionCapacity容器中-》由於transactionCapacity的容量不夠會報錯channelException a3.sinks.k3.hdfs.batchSize = 100 # fileType定義的是數據流的格式,默認的數據流的格式爲SequenceFile a3.sinks.k3.hdfs.fileType = DataStream # 寫入到hdfs上的文件的格式(序列化方法) # 格式改成text後,能夠經過cat 或 text 命令查看文件中的日誌內容 a3.sinks.k3.hdfs.writeFormat = Text # 將source和sink模塊綁定到對應的channel管道上 a3.sources.s3.channels = c3 a3.sinks.k3.channel = c3
avro2hdfs-02.properties
#以及聲明當前的flume-agent應用實例實例中三種模塊的數量及別名 a4.sources = s4 a4.sinks = k4 a4.channels = c4 # 定義source模塊的類型及此類型相關的其餘屬性 a4.sources.s4.type = avro a4.sources.s4.bind = 192.168.134.101 a4.sources.s4.port = 4546 #定義當前flume-agent應用實例中的channel模塊的類型及此類型相關的其餘屬性 a4.channels.c4.type = memory a4.channels.c4.capacity = 1000 a4.channels.c4.transactionCapacity = 100 # 定義sink模塊的類型及相關屬性 #修改路徑!!!!!!!!!!!!!!!!!! a4.sinks.k4.type = hdfs a4.sinks.k4.hdfs.path = hdfs://192.168.134.101:8020/nginx-flume02/%Y%m%d/ #啓用根據時間生成轉義字符的值及自動回滾生產日期目錄 a4.sinks.k4.hdfs.round = true #使用本地系統時間戳做爲基準進行日期回滾 a4.sinks.k4.hdfs.useLocalTimeStamp = true #設置文件的前綴,若是不設置則默認值爲FlumeData a4.sinks.k4.hdfs.filePrefix = HiveLog #設置解決文件過多太小問題,將每一個文件的大小控制在128M #將rollInterval和rollCount屬性的值改成0後文件的回滾將再也不由時間間隔及事件的數量爲依據 #rollSize的值須要是一個127M左右的值,由於每一個文件是一個block塊,每一個block塊都還包含元數據信息 a4.sinks.k4.hdfs.rollInterval = 0 a4.sinks.k4.hdfs.rollSize = 128000000 a4.sinks.k4.hdfs.rollCount = 0 #寫入到hdfs的最小副本數,不設置會致使上面的三個參數不生效 a4.sinks.k4.hdfs.minBlockReplicas = 1 #批量寫入到hdfs上文件中的最大event數量 #batchSize的值須要小於等於transactionCapacity的值 #hdfs類型的sink將數據寫入到hdsf上的底層源碼執行過程 #假如batchSize=200,transactionCapacity=100 #系統建立一個容量爲100個event的臨時隊列容器(transactionCapacity)-》sink會最多取出200(batchSize)個event塞到transactionCapacity容器中-》由於transactionCapacity的容量不夠會報錯channelException a4.sinks.k4.hdfs.batchSize = 100 # fileType定義的是數據流的格式,默認的數據流的格式爲SequenceFile a4.sinks.k4.hdfs.fileType = DataStream # 寫入到hdfs上的文件的格式(序列化方法) # 格式改成text後,能夠經過cat 或 text 命令查看文件中的日誌內容 a4.sinks.k4.hdfs.writeFormat = Text # 將source和sink模塊綁定到對應的channel管道上 a4.sources.s4.channels = c4 a4.sinks.k4.channel = c4