Flume理論研究與實驗

1、理論研究html

 1.1 總覽  node

  Flume是一個分佈式的可靠的日誌收集系統,主要是用於從各類數據源收集、聚合並移動大批量的日誌數據到存儲系統;它自己具備許多故障轉移和恢復機制,具備強大的容錯能力;它使用下面這樣一個簡單的可擴展的數據流模型來進行在線分析。git

 

 1.2 核心概念github

   Event:一個FLume事件是貫通整個Agent的基本的數據單元。Event從Source流到Channel再到Sink組件,主要是經過Event接口實現。一個Event包含一個字節數組(body)和一些可選的字符串表示的屬性(headers)。sql

   Agent:一個Agent就是一個獨立的JVM進程。整個進程能夠簡單的描述爲:Source從外部數據源消費特殊格式的Events,而後將這些事件存儲到Channel中,Channel做爲緩衝區會保存事件直到事件被Sink消費爲止,當Sink消費到Channel中的事件,再將其從Channel中移除並將事件存放到外部的數據倉庫好比hdfs或者下一個數據量的Source中。apache

 1.3 可靠性centos

  單跳可靠性:存儲在Channel中的事件只有當它被存儲在下一個Agent的Channel或者終端倉庫中才會被Sink移除,這是Flume提供的一種end-to-end的數據流可靠性傳輸機制。數組

  多跳可靠性:Flume經過使用這樣一種事務的方法來保證Events的可靠性傳輸,主要是由Channel提供的事務裏,封裝了Source和Sink對Event進行有效的存儲和恢復。上一個Agent中的Sink和當前Agent的Source將開啓它們的事務方法來保證數據安全的存儲在當前Agent的Channel中,這是一種point-to-point的可靠性傳輸機制。緩存

 1.4 可恢復性安全

  緩存在Channel中的事件將會管理數據的恢復。Flume支持持久化的File Channel,當Channel類型爲File Channel時數據將持久化到本地文件系統,固然也有Memory Channel會簡單的將數據存儲在內存隊列中,Momery Channel速度很快可是一旦Agent掛了,數據將不可恢復。

2、實驗

 2.1 環境準備

  1. centos、JDK1.八、maven3;
  2. 從git下載源碼後實驗mvn編譯或者直接cdh界面安裝flume;
  3. 足夠的日誌空間和內存以及數據寫入權限;
  4. 配置好flume的環境變量後,命令行輸入flume-ng version查看版本驗證安裝是否成功。

 2.2 實驗案例

  flume的根目錄下的conf文件夾中的flume-conf.properties.template文件時一個配置文件的模板,該文件對Agent進行了詳細的配置。一個或多個Agent的配置能夠寫在一個配置文件中,配置主要包括Agent中的每個source、sink和Channel的屬性以及這些組件是如何連在一塊兒並組成數據流模型的。在Flume的實驗中,咱們主要經過配置文件來創建Flume模型,並經過bin/flume-ng來啓動Flume的Agent。flume-ng命令須要指定agent的名稱以及配置路徑和配置文件名稱,更多參數配置能夠經過輸入flume-ng help來了解。

 實驗1、本地tcp端口發送消息到flume,而後日誌輸出結果

  首先,編寫配置文件

# example.conf: A single-node Flume configuration # Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 44444 # Describe the sink a1.sinks.k1.type = logger # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1

  以上配置定義了一個叫a1的單節點agent,a1擁有一個從端口44444監聽數據的source,一個將數據緩存到內存的channel以及將日誌數據輸出到日誌窗口的sink組件。固然一個配置文件中能夠包含多個agent的配置,所以在啓動flume-ng的時候須要指定agent的名稱。啓動命令相似於下面這種:

$ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console

  以上命令中,在參數conf中指定配置文件的路徑好比flume目錄下的配置文件夾conf,該路徑下包含flume-env.sh和log4j.propertites(該文件中默認配置了日誌輸出原始數據以及配置數據-Dorg.apache.flume.log.printconfig=true -Dorg.apache.flume.log.rawdata=true),這樣agent能夠在指定的環境下運行而且以日誌格式將數據輸出到界面。

  而後,能夠在經過netcat命令往本地端口44444寫數據,並在flume的終端的日誌信息中找到包含的事件。

 實驗2、oracle增量數據寫入flume,結果寫入到hdfs

  前期準備:從https://github.com/keedio/flume-ng-sql-source獲取源碼使用命令mvn package編譯flume對接sql的源碼成jar包並將jar包放在flume主目錄下的lib文件夾中,注意如下說明中的版本對應。

 

  配置編寫:

# example.conf: A single-node Flume configuration

# Name the components on this a2
a2.sources = r2
a2.sinks = k2
a2.channels = c2

# For each one of the sources, the type is defined
a2.sources.r2.type = org.keedio.flume.source.SQLSoure
a2.sources.r2.hibernate.connection.url = jdbc:oracle:thin:@ip:port/sid
a2.sources.r2.hibernate.connection.user = username
a2.sources.r2.hibernate.connection.password = passwd
a2.sources.r2.hibernate.connection.autocommit = true
a2.sources.r2.hibernate.dialect = org.hibernate.dialect.Oracle10gDialect
a2.sources.r2.hibernate.connection.driver_class = oracle.jdbc.driver.OracleDriver

# Query delay, each configured milisecond the query will be sent
a2.sources.r2.run.query.delay=10000

# Status file is used to save last readed row
a2.sources.r2.status.file.path = /var/log/flume
a2.sources.r2.status.file.name = r2.status

# Custom query
a2.sources.r2.start.from = 197001010
a2.sources.r2.custom.query = SELECT emplid,effdt from sys.test where emplid>'$@$'

# Standard Query 

# a2.sources.r2.table = sys.test

# a2.sources.r2.columns.to.select = emplid,effdt

a2.sources.r2.batch.size = 1000
a2.sources.r2.max.rows = 1000
a2.sources.r2.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
a2.sources.r2.hibernate.c3p0.min_size = 1
a2.sources.r2.hibernate.c3p0.max_size = 10

#sink's type must be defined
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = /flume/oracle
a2.sinks.k2.hdfs.fileType = DataStream
a2.sinks.k2.hdfs.writeFormat = Text
a2.sinks.k2.hdfs.rollSize = 1024000000
a2.sinks.k2.hdfs.rollInterval = 0
a2.sinks.k2.hdfs.rollCount = 0

# The channel can be defined as follows.
a2.channels.c2.type = memory
a2.channels.c2.capacity = 10000
a2.channels.c2.transactionCapacity = 1000

# connected source and sink with channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

 

  注意事項:start.from配置的是查詢的增量字段對應的起始數據(增量字段默認爲查詢語句的第一個列名稱),若是使用custom.query的查詢語句不在where語句中使用'$@$'對增量字段進行過濾,將會致使數據會重複寫入,每隔run.querry.delay的時間間隔將會運行一次會執行一次查詢語句,並將數據讀入到channel中,而後更新status文件中的lastIndex即'$@$',該索引做爲下一次執行查詢時的開始字段的索引發始位置,大於改索引的全部記錄將會輸入到channel,建議直接使用standard Query來給經過columns.to.select配置列,以及table指定查詢的表,這兩個配置項配置了將會在後臺執行SELECT <columns.to.select> FROM <table>,而且從初始化的數據開始自動執行增量查詢。

 實驗3、avro發送文件到flume,結果寫入到rocket mq

  準備:源碼見https://gitee.com/mxb/rocketmq-flume,將rocketmq相關的包和該源碼編譯的包所有打包在一塊兒放進lib文件夾下,具體包以下圖:

 

  配置:

agent1.sources=source1 agent1.channels=channel1 agent1.sinks=sink1 agent1.sources.source1.type=avro agent1.sources.source1.bind=0.0.0.0 agent1.sources.source1.port=15151 agent1.sources.source1.channels=channel1 agent1.sinks.sink1.type=com.handu.flume.sink.rocketmq.RocketMQSink agent1.sinks.sink1.namesrvAddr=rocketmq_namesrv:9876 agent1.sinks.sink1.producerGroup=MyProducerGroup_1 agent1.sinks.sink1.topic=FromFlume agent1.sinks.sink1.tag=Tag1 agent1.sinks.sink1.channel=channel1 agent1.channels.channel1.type=memory agent1.channels.channel1.capacity=100 agent1.channels.channel1.transactionCapacity=100 agent1.channels.channel1.keep-alive=3

 

 

  avro發消息:

$FLUME_HOME/bin/flume-ng avro-client -H localhost -p 15151 -F $FLUME_HOME/README

 

 

  編寫一個mq的consumer的進程接收消息,輸出對應TOPIC下的的消息body中將會包含README的內容。

原文出處:https://www.cnblogs.com/wing1995/p/12053245.html

相關文章
相關標籤/搜索