Flume
Overview
Flume is a highly available, highly reliable, distributed system provided by Cloudera for collecting, aggregating, and transporting massive volumes of log data. Flume is built on a streaming architecture and is flexible and simple.
It can integrate with virtually any storage process.
When the incoming data rate exceeds the rate at which data can be written to the destination store, Flume buffers the data, reducing the pressure on HDFS.
Transactions in Flume are based on the Channel, using two transaction models (sender + receiver) to ensure that messages are delivered reliably.
Flume uses two independent transactions to handle event delivery from Source to Channel and from Channel to Sink. Only once all the data in a transaction has been successfully committed to the Channel does the Source consider that data fully read; likewise, only data that has been successfully written out by the Sink is removed from the Channel.
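As a minimal, self-contained sketch of this put/take transaction cycle (illustrative only: in a real agent the ChannelProcessor and the sink runner drive these transactions for you; the MemoryChannel here just stands in for any Channel):
import org.apache.flume.Channel;
import org.apache.flume.ChannelException;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.Transaction;
import org.apache.flume.channel.MemoryChannel;
import org.apache.flume.conf.Configurables;
import org.apache.flume.event.EventBuilder;

public class ChannelTransactionDemo {
    public static void main(String[] args) {
        // A memory channel configured with defaults, standing in for any Channel.
        Channel channel = new MemoryChannel();
        Configurables.configure(channel, new Context());
        channel.start();

        Event event = EventBuilder.withBody("hello".getBytes());

        // Put side: what a Source's ChannelProcessor does for each batch.
        Transaction putTx = channel.getTransaction();
        putTx.begin();
        try {
            channel.put(event);   // staged inside the transaction
            putTx.commit();       // only now is the event visible to the take side
        } catch (ChannelException e) {
            putTx.rollback();     // e.g. channel full: nothing is written
        } finally {
            putTx.close();
        }

        // Take side: what a Sink does before writing a batch downstream.
        Transaction takeTx = channel.getTransaction();
        takeTx.begin();
        try {
            Event taken = channel.take();
            System.out.println(new String(taken.getBody()));
            takeTx.commit();      // the event leaves the channel only on commit
        } catch (Exception e) {
            takeTx.rollback();    // on failure the event stays in the channel
        } finally {
            takeTx.close();
        }

        channel.stop();
    }
}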
Agent
An Agent is a JVM process that moves data from a source to a destination in the form of events.
An Agent consists mainly of a Source, a Channel, and a Sink.
Source
The Source is the component that receives data into the Agent. It can handle many kinds of input, including avro, thrift, exec, jms, spooling directory, netcat, sequence generator, syslog, http, and legacy.
Channel
The Channel is a buffer sitting between the Source and the Sink, which allows the Source and the Sink to operate at different rates. The Channel is thread-safe and can handle writes from several Sources and reads from several Sinks concurrently.
Flume ships with two Channels:
Memory Channel: an in-memory queue; fast, and suitable when data loss is acceptable.
File Channel: writes all events to disk, so no data is lost if the process exits or the machine goes down.
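As a rough configuration sketch of the two channel types (property names as documented for Flume 1.7; the paths are placeholders):
# Memory channel: fast, but events are lost if the agent dies
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000

# File channel: durable, events survive an agent restart or machine crash
a1.channels.c2.type = file
a1.channels.c2.checkpointDir = /opt/module/flume/checkpoint
a1.channels.c2.dataDirs = /opt/module/flume/data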
Sink
The Sink continuously polls the Channel for events, removes them in batches, and writes the batches to a storage or indexing system, or forwards them to another Flume Agent.
The Sink is fully transactional: before removing a batch of events from the Channel, each Sink opens a transaction with the Channel. Once the batch has been successfully written out to the storage system or to the next Flume Agent, the Sink commits the transaction through the Channel, and only after the transaction is committed does the Channel delete those events from its internal buffer.
Sink destinations include hdfs, logger, avro, thrift, ipc, file, null, HBase, solr, and custom sinks.
Event
The transfer unit: the basic unit of data transfer in Flume. Data travels from source to destination in the form of events.
An Event consists of an optional header and a byte array carrying the data. The header is a HashMap holding key-value string pairs.
Typically one record corresponds to one Event, with an Event cut roughly every 2048 bytes.
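A minimal sketch of building an Event programmatically with Flume's EventBuilder; the header key and body text are made up for illustration:
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class EventDemo {
    public static void main(String[] args) {
        // Optional headers: a map of key-value string pairs.
        Map<String, String> headers = new HashMap<>();
        headers.put("host", "hadoop102"); // illustrative header, not required

        // The body is just a byte array.
        Event event = EventBuilder.withBody(
                "hello flume".getBytes(StandardCharsets.UTF_8), headers);

        System.out.println(new String(event.getBody(), StandardCharsets.UTF_8));
        System.out.println(event.getHeaders());
    }
}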
In this mode multiple Flume agents are chained in sequence, from the initial Source through to the storage system that the final Sink delivers to. It is not advisable to chain too many agents: too many agents not only lowers the transfer rate, but if any one agent in the chain goes down, the whole pipeline is affected.
Flume supports fanning an event stream out to one or more destinations. In this mode the source data is replicated into multiple Channels; every Channel holds the same data, and each Sink can deliver it to a different destination.
Flume supports logically grouping multiple Sinks into a single Sink group and distributing data across the Sinks in the group, which mainly addresses load balancing and failover.
This mode is the most common and very practical. A typical web application is spread across hundreds of servers, sometimes thousands or tens of thousands, and the logs they produce are painful to handle. This combined topology solves the problem nicely: each server runs a Flume agent that collects its logs and forwards them to a central log-collecting Flume agent, which then uploads them to hdfs, hive, hbase, jms, and so on for log analysis.
Agent internals
Flume deployment
1. Extract apache-flume-1.7.0-bin.tar.gz into the /opt/module directory
2. Rename apache-flume-1.7.0-bin to flume
3. Rename flume-env.sh.template under flume/conf to flume-env.sh, and set JAVA_HOME in flume-env.sh
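On the command line the three steps look roughly like this (the tarball location and the JDK path are assumptions; adjust them to your environment):
[djm@hadoop102 software]$ tar -zxf apache-flume-1.7.0-bin.tar.gz -C /opt/module/
[djm@hadoop102 module]$ mv apache-flume-1.7.0-bin flume
[djm@hadoop102 conf]$ mv flume-env.sh.template flume-env.sh
[djm@hadoop102 conf]$ vim flume-env.sh
# in flume-env.sh, e.g.: export JAVA_HOME=/opt/module/jdk1.8.0_144   (assumed path)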
Monitoring port data (netcat to logger)
Requirement analysis:
The server listens on local port 44444
The netcat tool is used to send messages to port 44444
The data is finally displayed on the console
Implementation steps:
1. Create the Agent configuration file flume-netcat-logger.conf in the job folder
[djm@hadoop102 job]$ vim flume-netcat-logger.conf
2. Add the following content:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
3. Start the task
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console
Parameter notes:
--conf conf/ : the configuration files are stored in the conf/ directory
--name a1 : names this Agent a1
--conf-file job/flume-netcat-logger.conf : the configuration file Flume reads for this run is flume-netcat-logger.conf under the job folder
-Dflume.root.logger=INFO,console : -D lets Flume override the flume.root.logger property at run time, setting the console log level to INFO
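To exercise the example, open another terminal and send a few lines to port 44444 with the nc (netcat) client; the message text below is arbitrary:
[djm@hadoop102 ~]$ nc localhost 44444
hello
djm
Each line sent should show up as an Event in the agent's console output.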
Tailing a local file to HDFS
Requirement analysis:
Monitor the Hive log in real time and upload it to HDFS
Implementation steps:
1. Create the Agent configuration file flume-file-hdfs.conf in the job folder
[djm@hadoop102 job]$ vim flume-file-hdfs.conf
2. Add the following content:
# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2

# Describe/configure the source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/module/hive/logs/hive.log
a2.sources.r2.shell = /bin/bash -c

# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop102:9000/flume/%Y%m%d/%H
# Prefix for uploaded files
a2.sinks.k2.hdfs.filePrefix = logs-
# Whether to roll folders based on time
a2.sinks.k2.hdfs.round = true
# How many time units before creating a new folder
a2.sinks.k2.hdfs.roundValue = 1
# Redefine the time unit
a2.sinks.k2.hdfs.roundUnit = hour
# Whether to use the local timestamp
a2.sinks.k2.hdfs.useLocalTimeStamp = true
# How many Events to accumulate before flushing to HDFS
a2.sinks.k2.hdfs.batchSize = 1000
# File type; compression is supported
a2.sinks.k2.hdfs.fileType = DataStream
# How often to roll to a new file (seconds)
a2.sinks.k2.hdfs.rollInterval = 60
# Roll size for each file (bytes)
a2.sinks.k2.hdfs.rollSize = 134217700
# File rolling is independent of the number of Events
a2.sinks.k2.hdfs.rollCount = 0

# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
3. Start the task
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a2 -f job/flume-file-hdfs.conf
Note:
To read a file on a Linux system, the read has to follow the rules of Linux commands. Since the Hive log lives on a Linux filesystem, the source type chosen is exec, short for execute: a Linux command is executed to read the file.
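Once the agent is running and the Hive log is being written to, the result can be spot-checked on HDFS (assuming the HDFS client points at the hadoop102:9000 namenode; the date path depends on when you run it):
[djm@hadoop102 flume]$ hdfs dfs -ls /flume/$(date +%Y%m%d)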
Monitoring a directory and uploading to HDFS
Requirement analysis:
Use Flume to watch the files in an entire directory
Implementation steps:
1. Create the Agent configuration file flume-dir-hdfs.conf in the job folder
[djm@hadoop102 job]$ vim flume-dir-hdfs.conf
2. Add the following content:
a3.sources = r3
a3.sinks = k3
a3.channels = c3

# Describe/configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /opt/module/flume/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
# Ignore all files ending in .tmp; do not upload them
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)

# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop102:9000/flume/upload/%Y%m%d/%H
# Prefix for uploaded files
a3.sinks.k3.hdfs.filePrefix = upload-
# Whether to roll folders based on time
a3.sinks.k3.hdfs.round = true
# How many time units before creating a new folder
a3.sinks.k3.hdfs.roundValue = 1
# Redefine the time unit
a3.sinks.k3.hdfs.roundUnit = hour
# Whether to use the local timestamp
a3.sinks.k3.hdfs.useLocalTimeStamp = true
# How many Events to accumulate before flushing to HDFS
a3.sinks.k3.hdfs.batchSize = 100
# File type; compression is supported
a3.sinks.k3.hdfs.fileType = DataStream
# How often to roll to a new file (seconds)
a3.sinks.k3.hdfs.rollInterval = 60
# Roll size for each file, roughly 128 MB
a3.sinks.k3.hdfs.rollSize = 134217700
# File rolling is independent of the number of Events
a3.sinks.k3.hdfs.rollCount = 0

# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
3. Start the task
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a3 -f job/flume-dir-hdfs.conf
Note:
Do not create and then keep modifying files inside the monitored directory.
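A quick way to test the setup is to drop a file into the spooled directory; the file name below is arbitrary:
[djm@hadoop102 flume]$ mkdir -p /opt/module/flume/upload
[djm@hadoop102 flume]$ cp NOTICE /opt/module/flume/upload/
[djm@hadoop102 flume]$ ls /opt/module/flume/upload/
Once the source has processed the file it should be renamed with the .COMPLETED suffix, and its contents should appear under /flume/upload on HDFS.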
Replication (single source, multiple channels and sinks)
Requirement analysis:
Flume-1 monitors file changes and passes the new content to Flume-2
Flume-2 stores it in HDFS
Flume-1 also passes the new content to Flume-3
Flume-3 writes it to the local filesystem
1. Create the Agent configuration file flume-file-flume.conf in the group1 folder
[djm@hadoop102 group1]$ vim flume-file-flume.conf
2. Add the following content:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# Replicate the data flow to all channels
a1.sources.r1.selector.type = replicating

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/hive/logs/hive.log
a1.sources.r1.shell = /bin/bash -c

# Describe the sink
# An avro sink is a data sender
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
3. Create the Agent configuration file flume-flume-hdfs.conf in the group1 folder
[djm@hadoop102 group1]$ vim flume-flume-hdfs.conf
4. Add the following content:
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
# An avro source is a data-receiving service
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141

# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://hadoop102:9000/flume2/%Y%m%d/%H
# Prefix for uploaded files
a2.sinks.k1.hdfs.filePrefix = flume2-
# Whether to roll folders based on time
a2.sinks.k1.hdfs.round = true
# How many time units before creating a new folder
a2.sinks.k1.hdfs.roundValue = 1
# Redefine the time unit
a2.sinks.k1.hdfs.roundUnit = hour
# Whether to use the local timestamp
a2.sinks.k1.hdfs.useLocalTimeStamp = true
# How many Events to accumulate before flushing to HDFS
a2.sinks.k1.hdfs.batchSize = 100
# File type; compression is supported
a2.sinks.k1.hdfs.fileType = DataStream
# How often to roll to a new file (seconds)
a2.sinks.k1.hdfs.rollInterval = 600
# Roll size for each file, roughly 128 MB
a2.sinks.k1.hdfs.rollSize = 134217700
# File rolling is independent of the number of Events
a2.sinks.k1.hdfs.rollCount = 0

# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
5. Create the Agent configuration file flume-flume-dir.conf in the group1 folder
[djm@hadoop102 group1]$ vim flume-flume-dir.conf
6. Add the following content:
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142

# Describe the sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/data/flume3

# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2
7. Start the tasks
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a3 -f job/group1/flume-flume-dir.conf
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a2 -f job/group1/flume-flume-hdfs.conf
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/group1/flume-file-flume.conf
Notes:
Avro is a language-neutral data serialization and RPC framework
The local output directory must already exist; if it does not, it will not be created automatically (see the command below)
The downstream jobs that the avro Sinks point to must be started first
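Consistent with the note about the output directory, create it on hadoop102 before starting the agents:
[djm@hadoop102 flume]$ mkdir -p /opt/module/data/flume3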
Load balancing (Sink group)
Requirement analysis:
Flume-1 monitors port data and passes what it receives to Flume-2
Flume-2 displays the data on the console
Flume-1 also passes what it receives to Flume-3
Flume-3 likewise displays the data on the console
Implementation steps:
1. Create the Agent configuration file flume-netcat-flume.conf in the group2 folder
2. Add the following content:
# Name the components on this agent
a1.sources = r1
a1.channels = c1
a1.sinkgroups = g1
a1.sinks = k1 k2

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin
a1.sinkgroups.g1.processor.selector.maxTimeOut = 10000

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
3. Create the Agent configuration file flume-flume-console1.conf in the group2 folder
4. Add the following content:
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141

# Describe the sink
a2.sinks.k1.type = logger

# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
5. Create the Agent configuration file flume-flume-console2.conf in the group2 folder
6. Add the following content:
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142

# Describe the sink
a3.sinks.k1.type = logger

# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2
7. Start the tasks
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a3 -f job/group2/flume-flume-console2.conf -Dflume.root.logger=INFO,console
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a2 -f job/group2/flume-flume-console1.conf -Dflume.root.logger=INFO,console
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/group2/flume-netcat-flume.conf
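With all three agents running, the pipeline can again be driven with the nc client; since the sink group uses the round_robin selector, successive lines should be spread across the two console agents:
[djm@hadoop102 ~]$ nc localhost 44444
hello
flume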
Aggregation
Requirement analysis:
Flume-1 on hadoop103 monitors the file /opt/module/group.log
Flume-2 on hadoop102 monitors a data stream on a port
Flume-1 and Flume-2 send their data to Flume-3 on hadoop104, and Flume-3 prints the final data to the console
Implementation steps:
1. Create the Agent configuration file flume1-logger-flume.conf in the group3 folder
[djm@hadoop102 group3]$ vim flume1-logger-flume.conf
2. Add the following content:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/group.log
a1.sources.r1.shell = /bin/bash -c

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop104
a1.sinks.k1.port = 4141

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
3. Create the Agent configuration file flume2-netcat-flume.conf in the group3 folder
[djm@hadoop102 group3]$ vim flume2-netcat-flume.conf
4. Add the following content:
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
a2.sources.r1.type = netcat
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 44444

# Describe the sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = hadoop104
a2.sinks.k1.port = 4141

# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
5. Create the Agent configuration file flume3-flume-logger.conf in the group3 folder
[djm@hadoop102 group3]$ vim flume3-flume-logger.conf
6. Add the following content:
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop104
a3.sources.r1.port = 4141

# Describe the sink
a3.sinks.k1.type = logger

# Describe the channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
7. Distribute the configuration files
[djm@hadoop102 group3]$ xsync /opt/module/flume/job
8. Start the tasks
[djm@hadoop104 flume]$ bin/flume-ng agent -c conf/ -n a3 -f job/group3/flume3-flume-logger.conf -Dflume.root.logger=INFO,console
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a2 -f job/group3/flume2-netcat-flume.conf
[djm@hadoop103 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/group3/flume1-logger-flume.conf
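To test, append a line to the monitored file on hadoop103 and send a line to the netcat port from hadoop102; both should appear in the console of the agent on hadoop104 (the text is arbitrary):
[djm@hadoop103 module]$ echo 'hello' >> /opt/module/group.log
[djm@hadoop102 ~]$ nc hadoop102 44444
flume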
Ganglia deployment
1. Install the httpd service and php
yum -y install httpd php
2. Install the other dependencies
yum -y install rrdtool perl-rrdtool rrdtool-devel
3. Install ganglia
rpm -Uvh http://dl.fedoraproject.org/pub/epel/6/x86_64/epel-release-6-8.noarch.rpm
yum -y install ganglia-gmetad ganglia-gmond ganglia-web
4. Modify the ganglia configuration file
vim /etc/httpd/conf.d/ganglia.conf
#
# Ganglia monitoring system php web frontend
#
Alias /ganglia /usr/share/ganglia
<Location /ganglia>
  # Require local
  Require all granted
  # Require ip 10.1.2.3
  # Require host example.org
</Location>
Special note: the following configuration does NOT work:
<Location /ganglia>
  Order deny,allow
  Allow from all
</Location>
5. Modify the gmetad configuration file
vim /etc/ganglia/gmetad.conf
data_source "hadoop102" 192.168.1.102
6. Modify the gmond configuration file
vim /etc/ganglia/gmond.conf
cluster {
  #name = "unspecified"
  name = "hadoop102"
  owner = "unspecified"
  latlong = "unspecified"
  url = "unspecified"
}
udp_send_channel {
  #bind_hostname = yes # Highly recommended, soon to be default.
                       # This option tells gmond to use a source address
                       # that resolves to the machine's hostname.  Without
                       # this, the metrics may appear to come from any
                       # interface and the DNS names associated with
                       # those IPs will be used to create the RRDs.
  #mcast_join = 239.2.11.71
  host = 192.168.10.102
  port = 8649
  ttl = 1
}
/* You can specify as many udp_recv_channels as you like as well. */
udp_recv_channel {
  #mcast_join = 239.2.11.71
  port = 8649
  #bind = 239.2.11.71
  bind = 192.168.10.102
  retry_bind = true
  # Size of the UDP buffer. If you are handling lots of metrics you really
  # should bump it up to e.g. 10MB or even higher.
  # buffer = 10485760
}
7. Check the SELinux status
sestatus
If it is not disabled, modify the following configuration file:
vim /etc/selinux/config
or temporarily disable SELinux:
setenforce 0
8. Start ganglia
systemctl start httpd
systemctl start gmetad
systemctl start gmond
9. Open a browser and visit
http://hadoop102/ganglia/
If you still get a permission error after all the steps above, try changing the permissions on the /var/lib/ganglia directory:
chmod -R 777 /var/lib/ganglia
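Ganglia only provides the monitoring backend; for Flume to report its metrics, start the agent with Flume's Ganglia monitoring options, for example reusing the earlier netcat job (the host and port should match the gmond udp_recv_channel configured above):
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=192.168.10.102:8649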
Custom Source
Requirement analysis: implement a custom Source that periodically generates events whose body is a configurable field string followed by a sequence number, with a configurable delay between events, and hands them to the channel.
Implementation:
1. Add the dependency
<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.7.0</version>
</dependency>
2. Write the code
package com.djm.flume;

import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;

import java.util.HashMap;

public class MySource extends AbstractSource implements Configurable, PollableSource {
    // Fields that will be read from the configuration file
    private Long delay;
    private String field;

    /**
     * Receive data, wrap it into events, and write them to the channel.
     */
    public Status process() throws EventDeliveryException {
        HashMap<String, String> headerMap = new HashMap<>();
        SimpleEvent event = new SimpleEvent();
        try {
            for (int i = 0; i < 5; i++) {
                event.setHeaders(headerMap);
                event.setBody((field + i).getBytes());
                getChannelProcessor().processEvent(event);
                Thread.sleep(delay);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            return Status.BACKOFF;
        }
        return Status.READY;
    }

    public long getBackOffSleepIncrement() {
        return 0;
    }

    public long getMaxBackOffSleepInterval() {
        return 0;
    }

    /**
     * Read the configuration file.
     */
    public void configure(Context context) {
        delay = context.getLong("delay");
        field = context.getString("field", "hello");
    }
}
3. Package and test
Build the jar with Maven and upload it to the /opt/module/flume/lib directory
Create the Agent configuration file mysource.conf in the job folder
[djm@hadoop102 job]$ vim mysource.conf
Add the following content:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = com.djm.flume.MySource
a1.sources.r1.delay = 1000
a1.sources.r1.field = djm

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start the task
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -f job/mysource.conf -n a1 -Dflume.root.logger=INFO,console
Custom Sink
Requirement analysis: implement a custom Sink that takes events from the channel and logs each event body wrapped in a configurable prefix and suffix.
Implementation:
1. Add the dependency
<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.7.0</version>
</dependency>
2. Write the code
package com.djm.flume;

import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MySink extends AbstractSink implements Configurable {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractSink.class);

    private String prefix;
    private String suffix;

    @Override
    public Status process() throws EventDeliveryException {
        Status status = null;
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        try {
            Event event;
            transaction.begin();
            // Poll until an event becomes available in the channel.
            while ((event = channel.take()) == null) {
                Thread.sleep(200);
            }
            // Log the event body wrapped in the configured prefix and suffix.
            LOG.info(prefix + new String(event.getBody()) + suffix);
            transaction.commit();
            status = Status.READY;
        } catch (Throwable e) {
            transaction.rollback();
            status = Status.BACKOFF;
            if (e instanceof Error)
                throw (Error) e;
        } finally {
            transaction.close();
        }
        return status;
    }

    @Override
    public void configure(Context context) {
        prefix = context.getString("prefix");
        suffix = context.getString("suffix");
    }
}
3. Package and test
Build the jar with Maven and upload it to the /opt/module/flume/lib directory
Create the Agent configuration file mysink.conf in the job folder
[djm@hadoop102 job]$ vim mysink.conf
Add the following content:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = com.djm.flume.MySink
a1.sinks.k1.prefix = djm:
a1.sinks.k1.suffix = :end

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start the task
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -f job/mysink.conf -n a1 -Dflume.root.logger=INFO,console
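To see the custom sink at work, send a few lines through the netcat source; each one should be logged wrapped in the configured prefix and suffix (e.g. djm:hello:end):
[djm@hadoop102 ~]$ nc localhost 44444
hello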
Flume parameter tuning
Source
Increasing the number of Sources increases the capacity to read data. For example, when a single directory produces too many files, split it into several directories and configure a Source for each, so there is enough capacity to pick up newly produced data.
The batchSize parameter determines how many Events a Source ships to the Channel in one batch; raising it appropriately improves the performance of moving Events from the Source into the Channel.
Channel
With type set to memory the Channel performs best, but data may be lost if the Flume process dies unexpectedly.
With type set to file the Channel is more fault-tolerant, but its performance is worse than a Memory Channel's. When using a File Channel, configuring dataDirs with directories on different disks can improve performance.
The capacity parameter determines the maximum number of Events the Channel can hold. The transactionCapacity parameter determines the maximum number of Events the Source writes to the Channel per transaction and the maximum number of Events the Sink reads from the Channel per transaction; transactionCapacity must be at least as large as the batchSize of the Source and the Sink.
Sink
Increasing the number of Sinks increases the capacity to consume Events. That said, more Sinks is not always better; enough is enough, since excess Sinks occupy system resources and waste them unnecessarily.
The batchSize parameter determines how many Events a Sink reads from the Channel in one batch; raising it appropriately improves the performance of moving Events out of the Channel.
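As a rough sketch of how these knobs relate in one configuration (the numbers and paths are illustrative, not recommendations): keep the channel's transactionCapacity at or above both batch sizes, and capacity well above transactionCapacity.
# Illustrative sizing only
a1.sources.r1.batchSize = 100                 # events per put transaction (e.g. exec/spooldir sources)
a1.sinks.k1.hdfs.batchSize = 100              # events per take transaction (HDFS sink)
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /data1/flume/checkpoint            # hypothetical mount points
a1.channels.c1.dataDirs = /data1/flume/data,/data2/flume/data
a1.channels.c1.transactionCapacity = 1000     # >= both batchSize values
a1.channels.c1.capacity = 1000000             # >> transactionCapacity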