Getting Started with Flume

1 Flume Overview

1.1 Definition

Flume is a highly available, highly reliable, distributed system provided by Cloudera for collecting, aggregating, and transporting massive volumes of log data.

Flume is based on a streaming architecture and is flexible and simple.

1.2 Features

It can integrate with any storage process.

When the rate of incoming data exceeds the rate at which it can be written to the destination storage, Flume buffers the data, reducing the pressure on HDFS.

Transactions in Flume are based on the Channel; two transaction models (sender + receiver) ensure that messages are delivered reliably.

Flume uses two independent transactions to handle event delivery from Source to Channel and from Channel to Sink. Only when all the data in a transaction has been successfully committed to the Channel does the Source consider the read complete; likewise, only data that has been successfully written out by the Sink is removed from the Channel.
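
As a rough illustration of the take-side transaction described above, the following Java sketch (not part of the original post; class and method names are hypothetical, and it assumes flume-ng-core 1.7.0 on the classpath and a Channel obtained elsewhere) mirrors what the custom Sink in section 6 does:

import org.apache.flume.Channel;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;

public class TakeTransactionSketch {
    // Take one event from the channel inside a transaction; the event is only
    // removed from the channel when the transaction commits.
    public static void takeOne(Channel channel) throws EventDeliveryException {
        Transaction tx = channel.getTransaction();
        try {
            tx.begin();
            Event event = channel.take();   // may be null if the channel is empty
            if (event != null) {
                // ... write event.getBody() to the destination (HDFS, next agent, ...) ...
            }
            tx.commit();                    // only now is the event removed from the channel
        } catch (Throwable t) {
            tx.rollback();                  // the event stays in the channel and will be retried
            throw new EventDeliveryException("delivery failed", t);
        } finally {
            tx.close();
        }
    }
}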

1.3 Architecture

[Figure: Flume Agent architecture - Source, Channel, Sink]

1.3.1 Agent

An Agent is a JVM process that moves data from a source to a destination in the form of events.

An Agent consists mainly of a Source, a Channel, and a Sink.

1.3.2 Source

The Source is the component that receives data into the Agent. It can handle many source types, including avro, thrift, exec, jms, spooling directory, netcat, sequence generator, syslog, http, and legacy.

1.3.3 Channel

The Channel is a buffer between the Source and the Sink, so it allows the Source and the Sink to run at different rates. The Channel is thread-safe and can handle writes from several Sources and reads from several Sinks at the same time.

Flume ships with two Channel types:

Memory Channel: an in-memory queue; fast, and suitable when data loss is not a concern.

File Channel: writes all events to disk, so no data is lost if the process exits or the machine goes down; see the configuration sketch below.
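
As a hedged sketch (the directory paths are assumptions, not from the original post), a File Channel is declared like this in an agent configuration:

a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint
a1.channels.c1.dataDirs = /opt/module/flume/data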

1.3.4 Sink

The Sink continuously polls the Channel for events, removes them in batches, and writes these batches to a storage or indexing system or sends them to another Flume Agent.

The Sink is fully transactional. Before removing a batch of data from the Channel, each Sink starts a transaction with the Channel. Once the batch of events has been successfully written to the storage system or to the next Flume Agent, the Sink commits the transaction through the Channel, and only after the transaction is committed does the Channel remove those events from its internal buffer.

Sink destinations include hdfs, logger, avro, thrift, ipc, file, null, HBase, solr, and custom sinks.

1.3.5 Event

The Event is the basic unit of data transfer in Flume; data travels from source to destination in the form of events.

An Event consists of an optional header and a byte array carrying the payload; the header is a HashMap of key-value string pairs.

Usually one record is one Event, with an Event cut every 2048 bytes.
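
A small Java sketch (illustrative only; the class name, header key, and message text are made up) showing how an Event pairs an optional header map with a byte-array body:

import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class EventSketch {
    public static void main(String[] args) {
        // Optional headers: key-value string pairs
        Map<String, String> headers = new HashMap<>();
        headers.put("host", "hadoop102");
        // Body: a byte array carrying the payload
        Event event = EventBuilder.withBody("hello flume".getBytes(StandardCharsets.UTF_8), headers);
        System.out.println(event.getHeaders() + " -> " + new String(event.getBody(), StandardCharsets.UTF_8));
    }
}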

1.4 Topologies

[Figure: multi-agent chained (serial) topology]

In this mode multiple Flume agents are chained in sequence, from the first Source all the way to the storage system that the last Sink delivers to. Chaining too many Flume agents is not recommended: too many agents not only reduce transfer speed, but if any agent in the chain goes down during transfer, the whole pipeline is affected.

[Figure: single source replicated to multiple channels and sinks]

Flume supports fanning the event stream out to one or more destinations. In this mode the source data is replicated into multiple Channels, each Channel holds the same data, and each Sink can deliver to a different destination.

[Figure: sink group topology for load balancing and failover]

Flume supports logically grouping multiple Sinks into one sink group and distributing data across the Sinks in the group, mainly to solve load balancing and failover; a failover configuration sketch follows.
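
Section 3.5 below demonstrates the load_balance sink processor; as a hedged sketch of the failover side (agent name, sink names, and priority values are assumptions), a failover sink group looks like this:

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000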

[Figure: multi-agent aggregation topology]

This mode is the most common and also very practical. Everyday web applications are usually spread across hundreds of servers, sometimes thousands or tens of thousands, and the logs they produce are troublesome to process. This Flume composition solves the problem well: each server runs a Flume agent that collects its logs and sends them to a central log-collecting Flume agent, which then uploads them to HDFS, Hive, HBase, JMS, and so on for log analysis.

1.5 How the Agent Works

[Figure: internal event flow of a Flume Agent]

2 Flume Deployment

1. Extract apache-flume-1.7.0-bin.tar.gz into the /opt/module directory.

2. Rename apache-flume-1.7.0-bin to flume.

3. Rename the flume-env.sh.template file under flume/conf to flume-env.sh and set JAVA_HOME in flume-env.sh; see the shell sketch below.
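
The three steps above roughly correspond to the following shell commands (a sketch; the JDK path is an assumption and should match your machine):

tar -zxvf apache-flume-1.7.0-bin.tar.gz -C /opt/module/
mv /opt/module/apache-flume-1.7.0-bin /opt/module/flume
mv /opt/module/flume/conf/flume-env.sh.template /opt/module/flume/conf/flume-env.sh
# in flume-env.sh, for example:
# export JAVA_HOME=/opt/module/jdk1.8.0_144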

3 Enterprise Development Examples

3.1 Monitoring Port Data

Requirements:

The server listens on local port 44444.

Messages are sent to port 44444 with the netcat tool.

The data is finally displayed on the console.

Implementation steps:

1. Create the Agent configuration file flume-netcat-logger.conf in the job folder:

[djm@hadoop102 job]$ vim flume-netcat-logger.conf

2. Add the following content:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3. Start the job:

[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console

Parameter notes:

--conf conf/: the configuration files are stored in the conf/ directory.

--name a1: names the Agent a1.

--conf-file job/flume-netcat-logger.conf: the configuration file that Flume reads for this run is flume-netcat-logger.conf in the job folder.

-Dflume.root.logger=INFO,console: -D dynamically overrides the flume.root.logger property when Flume runs, setting the console log level to INFO.
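
To exercise the job, one hedged test (assuming the nc client is installed) is to send a line to the port from another terminal and watch the agent console print it:

[djm@hadoop102 ~]$ nc localhost 44444
hello flume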

3.2 Reading a Local File to HDFS in Real Time

Requirements:

Monitor the Hive log in real time and upload it to HDFS.

Implementation steps:

1. Create the Agent configuration file flume-file-hdfs.conf in the job folder:

[djm@hadoop102 job]$ vim flume-file-hdfs.conf

2. Add the following content:

# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2

# Describe/configure the source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/module/hive/logs/hive.log
a2.sources.r2.shell = /bin/bash -c

# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop102:9000/flume/%Y%m%d/%H
# Prefix for uploaded files
a2.sinks.k2.hdfs.filePrefix = logs-
# Whether to round down the event time when creating folders
a2.sinks.k2.hdfs.round = true
# How many time units per new folder
a2.sinks.k2.hdfs.roundValue = 1
# The time unit used for rounding
a2.sinks.k2.hdfs.roundUnit = hour
# Whether to use the local timestamp
a2.sinks.k2.hdfs.useLocalTimeStamp = true
# Number of events to accumulate before flushing to HDFS
a2.sinks.k2.hdfs.batchSize = 1000
# File type; compression is supported
a2.sinks.k2.hdfs.fileType = DataStream
# How often (seconds) to roll a new file
a2.sinks.k2.hdfs.rollInterval = 60
# Roll size per file (bytes)
a2.sinks.k2.hdfs.rollSize = 134217700
# File rolling is independent of the number of events
a2.sinks.k2.hdfs.rollCount = 0

# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

3. Start the job:

[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a2 -f job/flume-file-hdfs.conf

Note:

To read a file on a Linux system, the read must follow Linux command conventions. Because the Hive log lives on a Linux system, the source type chosen for reading the file is exec, short for execute, meaning a Linux command is executed to read the file.

3.3 Reading Directory Files to HDFS in Real Time

Requirements:

Use Flume to monitor all the files in a directory.

Implementation steps:

1. Create the Agent configuration file flume-dir-hdfs.conf in the job folder:

[djm@hadoop102 job]$ vim flume-dir-hdfs.conf

2. Add the following content:

a3.sources = r3
a3.sinks = k3
a3.channels = c3

# Describe/configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /opt/module/flume/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
# Ignore all files ending in .tmp; do not upload them
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)

# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop102:9000/flume/upload/%Y%m%d/%H
# Prefix for uploaded files
a3.sinks.k3.hdfs.filePrefix = upload-
# Whether to round down the event time when creating folders
a3.sinks.k3.hdfs.round = true
# How many time units per new folder
a3.sinks.k3.hdfs.roundValue = 1
# The time unit used for rounding
a3.sinks.k3.hdfs.roundUnit = hour
# Whether to use the local timestamp
a3.sinks.k3.hdfs.useLocalTimeStamp = true
# Number of events to accumulate before flushing to HDFS
a3.sinks.k3.hdfs.batchSize = 100
# File type; compression is supported
a3.sinks.k3.hdfs.fileType = DataStream
# How often (seconds) to roll a new file
a3.sinks.k3.hdfs.rollInterval = 60
# Roll size per file (bytes), roughly 128 MB
a3.sinks.k3.hdfs.rollSize = 134217700
# File rolling is independent of the number of events
a3.sinks.k3.hdfs.rollCount = 0

# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3

3. Start the job:

[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a3 -f job/flume-dir-hdfs.conf

Note:

Do not create and then keep modifying files inside the monitored directory; a simple way to test is sketched below.
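
A hedged way to try it out (the file name is arbitrary) is to copy an already-finished file into the spooled directory and then check HDFS; the file should be renamed with the .COMPLETED suffix configured above:

[djm@hadoop102 flume]$ mkdir -p /opt/module/flume/upload
[djm@hadoop102 flume]$ cp NOTICE /opt/module/flume/upload/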

3.4 Single Source, Multiple Outputs (Channel Selector)

Requirements:

Use Flume-1 to monitor file changes; Flume-1 passes the changes to Flume-2.

Flume-2 stores them in HDFS.

At the same time, Flume-1 passes the changes to Flume-3, and Flume-3 writes them to the local file system.

1. Create the Agent configuration file flume-file-flume.conf in the group1 folder:

[djm@hadoop102 group1]$ vim flume-file-flume.conf

2. Add the following content:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Replicate the data flow to all channels
a1.sources.r1.selector.type = replicating

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/hive/logs/hive.log
a1.sources.r1.shell = /bin/bash -c

# Describe the sink
# On the sink side, avro acts as a data sender
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102 
a1.sinks.k1.port = 4141

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

3. Create the Agent configuration file flume-flume-hdfs.conf in the group1 folder:

[djm@hadoop102 group1]$ vim flume-flume-hdfs.conf

4. Add the following content:

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
# On the source side, avro acts as a data receiving service
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141

# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://hadoop102:9000/flume2/%Y%m%d/%H
# Prefix for uploaded files
a2.sinks.k1.hdfs.filePrefix = flume2-
# Whether to round down the event time when creating folders
a2.sinks.k1.hdfs.round = true
# How many time units per new folder
a2.sinks.k1.hdfs.roundValue = 1
# The time unit used for rounding
a2.sinks.k1.hdfs.roundUnit = hour
# Whether to use the local timestamp
a2.sinks.k1.hdfs.useLocalTimeStamp = true
# Number of events to accumulate before flushing to HDFS
a2.sinks.k1.hdfs.batchSize = 100
# File type; compression is supported
a2.sinks.k1.hdfs.fileType = DataStream
# How often (seconds) to roll a new file
a2.sinks.k1.hdfs.rollInterval = 600
# Roll size per file (bytes), roughly 128 MB
a2.sinks.k1.hdfs.rollSize = 134217700
# File rolling is independent of the number of events
a2.sinks.k1.hdfs.rollCount = 0

# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

5. Create the Agent configuration file flume-flume-dir.conf in the group1 folder:

[djm@hadoop102 group1]$ vim flume-flume-dir.conf

6. Add the following content:

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142

# Describe the sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/data/flume3

# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2

7. Start the jobs:

[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a3 -f job/group1/flume-flume-dir.conf
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a2 -f job/group1/flume-flume-hdfs.conf
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/group1/flume-file-flume.conf

Notes:

Avro is a language-neutral data serialization and RPC framework.

The local output directory must already exist; if it does not, it will not be created automatically (see the preparation step below).

The downstream jobs (the agents hosting the Avro sources that the Avro sinks point to) must be started first, which is why a3 and a2 are started before a1 above.
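
Because the file_roll directory is not created automatically, a preparation step before starting a3 (path taken from the configuration above) is:

[djm@hadoop102 flume]$ mkdir -p /opt/module/data/flume3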

3.5 Single Source, Multiple Outputs (Sink Group)

Requirements:

Use Flume-1 to monitor port data; Flume-1 passes what it receives to Flume-2.

Flume-2 displays the data on the console.

At the same time, Flume-1 passes the data to Flume-3, and Flume-3 also displays it on the console.

Implementation steps:

1. Create the Agent configuration file flume-netcat-flume.conf in the group2 folder:

2. Add the following content:

# Name the components on this agent
a1.sources = r1
a1.channels = c1
a1.sinkgroups = g1
a1.sinks = k1 k2

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin
a1.sinkgroups.g1.processor.selector.maxTimeOut=10000

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1

3. Create the Agent configuration file flume-flume-console1.conf in the group2 folder.

4. Add the following content:

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141

# Describe the sink
a2.sinks.k1.type = logger

# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

5. Create the Agent configuration file flume-flume-console2.conf in the group2 folder.

6. Add the following content:

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142

# Describe the sink
a3.sinks.k1.type = logger

# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2

7. Start the jobs:

[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a3 -f job/group2/flume-flume-console2.conf -Dflume.root.logger=INFO,console
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a2 -f job/group2/flume-flume-console1.conf -Dflume.root.logger=INFO,console
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/group2/flume-netcat-flume.conf

3.6 Aggregating Multiple Data Sources

Requirements:

Flume-1 on hadoop103 monitors the file /opt/module/group.log.

Flume-2 on hadoop102 monitors a data stream on a port.

Flume-1 and Flume-2 send their data to Flume-3 on hadoop104, and Flume-3 prints the final data to the console.

Implementation steps:

1. Create the Agent configuration file flume1-logger-flume.conf in the group3 folder:

[djm@hadoop102 group3]$ vim flume1-logger-flume.conf

2. Add the following content:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/group.log
a1.sources.r1.shell = /bin/bash -c

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop104
a1.sinks.k1.port = 4141

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3. Create the Agent configuration file flume2-netcat-flume.conf in the group3 folder:

[djm@hadoop102 group3]$ vim flume2-netcat-flume.conf

4. Add the following content:

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
a2.sources.r1.type = netcat
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 44444

# Describe the sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = hadoop104
a2.sinks.k1.port = 4141

# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

5. Create the Agent configuration file flume3-flume-logger.conf in the group3 folder:

[djm@hadoop102 group3]$ vim flume3-flume-logger.conf

6. Add the following content:

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop104
a3.sources.r1.port = 4141

# Describe the sink
a3.sinks.k1.type = logger

# Describe the channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1

7. Distribute the configuration files:

[djm@hadoop102 group3]$ xsync /opt/module/flume/job

8. Start the jobs:

[djm@hadoop104 flume]$ bin/flume-ng agent -c conf/ -n a3 -f job/group3/flume3-flume-logger.conf -Dflume.root.logger=INFO,console
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a2 -f job/group3/flume2-netcat-flume.conf
[djm@hadoop103 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/group3/flume1-logger-flume.conf
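
Once all three agents are running, a hedged way to verify the flow (file path and port taken from the configurations above) is to append a line to the monitored file on hadoop103 and send a line to the netcat port on hadoop102, then watch the console of a3 on hadoop104:

[djm@hadoop103 ~]$ echo "hello from hadoop103" >> /opt/module/group.log
[djm@hadoop102 ~]$ nc hadoop102 44444
hello from hadoop102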

4 Ganglia Deployment

1. Install the httpd service and PHP:

yum -y install httpd php

2. Install other dependencies:

yum -y install rrdtool perl-rrdtool rrdtool-devel

3. Install Ganglia:

rpm -Uvh http://dl.fedoraproject.org/pub/epel/6/x86_64/epel-release-6-8.noarch.rpm
yum -y install ganglia-gmetad ganglia-gmond ganglia-web

4. Modify the Ganglia configuration file:

vim /etc/httpd/conf.d/ganglia.conf
#
# Ganglia monitoring system php web frontend
#

Alias /ganglia /usr/share/ganglia

<Location /ganglia>
  # Require local
  Require all granted
  # Require ip 10.1.2.3
  # Require host example.org
</Location>

Note in particular: the following configuration does not work:

<Location /ganglia>
  Order deny,allow
  Allow from all
</Location>

5. Modify the gmetad configuration file:

vim /etc/ganglia/gmetad.conf
data_source "hadoop102" 192.168.1.102

6. Modify the gmond configuration file:

vim /etc/ganglia/gmond.conf
cluster {
  #name = "unspecified"
  name = "hadoop102"
  owner = "unspecified"
  latlong = "unspecified"
  url = "unspecified"
}

udp_send_channel { 
#bind_hostname = yes # Highly recommended, soon to be default. 
# This option tells gmond to use a source address
# that resolves to the machine's hostname. Without
# this, the metrics may appear to come from any
# interface and the DNS names associated with
# those IPs will be used to create the RRDs.
  #mcast_join = 239.2.11.71
  host = 192.168.10.102
  port = 8649
  ttl = 1
}

/* You can specify as many udp_recv_channels as you like as well. */
udp_recv_channel {
  #mcast_join = 239.2.11.71
  port = 8649
  #bind = 239.2.11.71
  bind = 192.168.10.102
  retry_bind = true 

# Size of the UDP buffer. If you are handling lots of metrics you really
# should bump it up to e.g. 10MB or even higher.
# buffer = 10485760
}

7. Check the SELinux status:

sestatus

If it is not disabled, modify the following configuration file:

vim /etc/selinux/config

Or temporarily disable SELinux:

setenforce 0

8. Start Ganglia:

systemctl start httpd
systemctl start gmetad
systemctl start gmond

9. Open a browser and visit:

http://hadoop102/ganglia/

If you still get a permission error after completing the steps above, try changing the permissions on the /var/lib/ganglia directory:

chmod -R 777 /var/lib/ganglia
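
To make a Flume agent report its metrics to this Ganglia installation, the agent can be started with Flume's Ganglia monitoring properties; a hedged example that reuses the netcat job from section 3.1 (host and port taken from the gmond configuration above):

[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/flume-netcat-logger.conf \
-Dflume.root.logger=INFO,console \
-Dflume.monitoring.type=ganglia \
-Dflume.monitoring.hosts=192.168.10.102:8649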

5 Custom Source

Requirements:

[Figure: custom Source requirements]

Implementation:

1. Add the dependency:

<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.7.0</version>
</dependency>

2. Write the code:

package com.djm.flume;

import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;

import java.util.HashMap;

public class MySource extends AbstractSource implements Configurable, PollableSource {

    // Fields to be read from the configuration file
    private Long delay;

    private String field;

    /**
     * Receive data, wrap it into individual events, and write them to the channel.
     * @return
     * @throws EventDeliveryException
     */
    public Status process() throws EventDeliveryException {
        HashMap<String, String> headerMap = new HashMap<>();
        SimpleEvent event = new SimpleEvent();
        try {
            for (int i = 0; i < 5; i++) {
                event.setHeaders(headerMap);
                event.setBody((field + i).getBytes());
                getChannelProcessor().processEvent(event);
                Thread.sleep(delay);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            return Status.BACKOFF;
        }
        return Status.READY;
    }

    public long getBackOffSleepIncrement() {
        return 0;
    }

    public long getMaxBackOffSleepInterval() {
        return 0;
    }

    /**
     * Read the configuration file.
     * @param context
     */
    public void configure(Context context) {
        delay = context.getLong("delay");
        field = context.getString("field", "hello");
    }
}

3. Package and test:

Package the jar with Maven and upload it to the /opt/module/flume/lib directory.

Create the Agent configuration file mysource.conf in the job folder:

[djm@hadoop102 job]$ vim mysource.conf

Add the following content:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = com.djm.flume.MySource
a1.sources.r1.delay = 1000
a1.sources.r1.field = djm

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Start the job:

[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -f job/mysource.conf -n a1 -Dflume.root.logger=INFO,console

6 Custom Sink

Requirements:

[Figure: custom Sink requirements]

Implementation:

1. Add the dependency:

<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.7.0</version>
</dependency>

2. Write the code:

package com.djm.flume;

import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MySink extends AbstractSink implements Configurable {

    private static final Logger LOG = LoggerFactory.getLogger(AbstractSink.class);

    private String prefix;
    private String suffix;

    @Override
    public Status process() throws EventDeliveryException {
        Status status = null;

        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        try {
            Event event;
            transaction.begin();
            // Poll until an event becomes available in the channel
            while ((event = channel.take()) == null) {
                Thread.sleep(200);
            }
            // "Write out" the event by logging it with the configured prefix and suffix
            LOG.info(prefix + new String(event.getBody()) + suffix);
            transaction.commit();
            status = Status.READY;
        } catch (Throwable e) {
            // Roll back so the event stays in the channel and can be retried
            transaction.rollback();
            status = Status.BACKOFF;
            if (e instanceof Error)
                throw (Error) e;
        } finally {
            transaction.close();
        }
        return status;
    }

    @Override
    public void configure(Context context) {
        prefix = context.getString("prefix");
        suffix = context.getString("suffix");
    }
}

3. Package and test:

Package the jar with Maven and upload it to the /opt/module/flume/lib directory.

Create the Agent configuration file mysink.conf in the job folder:

[djm@hadoop102 job]$ vim mysink.conf

Add the following content:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = com.djm.flume.MySink
a1.sinks.k1.prefix = djm:
a1.sinks.k1.suffix = :end

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Start the job:

[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -f job/mysink.conf -n a1 -Dflume.root.logger=INFO,console
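
A hedged test: send a line with netcat and check that the agent console logs it wrapped in the configured prefix and suffix (djm:...:end):

[djm@hadoop102 ~]$ nc localhost 44444
hello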

7 Flume Parameter Tuning

7.1 Source

Increasing the number of Sources increases the capacity to read data. For example, when one directory produces too many files, split it into several directories and configure multiple Sources so that there is enough capacity to pick up the newly produced data.

The batchSize parameter determines how many Events a Source ships to the Channel in one batch; increasing it appropriately improves the performance of moving Events from the Source into the Channel.

7.2 Channel

Choosing memory as the Channel type gives the best performance, but data may be lost if the Flume process dies unexpectedly.

Choosing file as the Channel type gives better fault tolerance, but performance is worse than with the Memory Channel. When using a File Channel, configuring dataDirs with directories on several different disks improves performance.

The capacity parameter determines the maximum number of Events the Channel can hold. The transactionCapacity parameter determines the maximum number of Events the Source writes to the Channel per transaction and the maximum number of Events the Sink reads from the Channel per transaction; transactionCapacity must be at least as large as the batchSize of the Source and the Sink. A configuration sketch follows.
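
A hedged sketch of how these parameters relate (values are illustrative, not recommendations from the original post; the source batchSize applies to source types that support it, such as exec or spooldir):

a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 10000

# transactionCapacity above must be >= the batch sizes below
a1.sources.r1.batchSize = 1000
a1.sinks.k1.hdfs.batchSize = 1000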

7.3 Sink

Increasing the number of Sinks increases the capacity for consuming Events, but more is not always better; too many Sinks occupy system resources and waste them unnecessarily.

The batchSize parameter determines how many Events a Sink reads from the Channel in one batch; increasing it appropriately improves the performance of moving Events out of the Channel.
