1、理論研究html
1.1 總覽 node
Flume是一個分佈式的可靠的日誌收集系統,主要是用於從各類數據源收集、聚合並移動大批量的日誌數據到存儲系統;它自己具備許多故障轉移和恢復機制,具備強大的容錯能力;它使用下面這樣一個簡單的可擴展的數據流模型來進行在線分析。git
1.2 核心概念github
Event:一個FLume事件是貫通整個Agent的基本的數據單元。Event從Source流到Channel再到Sink組件,主要是經過Event接口實現。一個Event包含一個字節數組(body)和一些可選的字符串表示的屬性(headers)。sql
Agent:一個Agent就是一個獨立的JVM進程。整個進程能夠簡單的描述爲:Source從外部數據源消費特殊格式的Events,而後將這些事件存儲到Channel中,Channel做爲緩衝區會保存事件直到事件被Sink消費爲止,當Sink消費到Channel中的事件,再將其從Channel中移除並將事件存放到外部的數據倉庫好比hdfs或者下一個數據量的Source中。apache
1.3 可靠性centos
單跳可靠性:存儲在Channel中的事件只有當它被存儲在下一個Agent的Channel或者終端倉庫中才會被Sink移除,這是Flume提供的一種end-to-end的數據流可靠性傳輸機制。數組
多跳可靠性:Flume經過使用這樣一種事務的方法來保證Events的可靠性傳輸,主要是由Channel提供的事務裏,封裝了Source和Sink對Event進行有效的存儲和恢復。上一個Agent中的Sink和當前Agent的Source將開啓它們的事務方法來保證數據安全的存儲在當前Agent的Channel中,這是一種point-to-point的可靠性傳輸機制。緩存
1.4 可恢復性安全
緩存在Channel中的事件將會管理數據的恢復。Flume支持持久化的File Channel,當Channel類型爲File Channel時數據將持久化到本地文件系統,固然也有Memory Channel會簡單的將數據存儲在內存隊列中,Momery Channel速度很快可是一旦Agent掛了,數據將不可恢復。
2、實驗
2.1 環境準備
2.2 實驗案例
flume的根目錄下的conf文件夾中的flume-conf.properties.template文件時一個配置文件的模板,該文件對Agent進行了詳細的配置。一個或多個Agent的配置能夠寫在一個配置文件中,配置主要包括Agent中的每個source、sink和Channel的屬性以及這些組件是如何連在一塊兒並組成數據流模型的。在Flume的實驗中,咱們主要經過配置文件來創建Flume模型,並經過bin/flume-ng來啓動Flume的Agent。flume-ng命令須要指定agent的名稱以及配置路徑和配置文件名稱,更多參數配置能夠經過輸入flume-ng help來了解。
實驗1、本地tcp端口發送消息到flume,而後日誌輸出結果
首先,編寫配置文件
# example.conf: A single-node Flume configuration # Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 44444 # Describe the sink a1.sinks.k1.type = logger # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
以上配置定義了一個叫a1的單節點agent,a1擁有一個從端口44444監聽數據的source,一個將數據緩存到內存的channel以及將日誌數據輸出到日誌窗口的sink組件。固然一個配置文件中能夠包含多個agent的配置,所以在啓動flume-ng的時候須要指定agent的名稱。啓動命令相似於下面這種:
$ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console
以上命令中,在參數conf中指定配置文件的路徑好比flume目錄下的配置文件夾conf,該路徑下包含flume-env.sh和log4j.propertites(該文件中默認配置了日誌輸出原始數據以及配置數據:-Dorg.apache.flume.log.printconfig=true -Dorg.apache.flume.log.rawdata=true),這樣agent能夠在指定的環境下運行而且以日誌格式將數據輸出到界面。
而後,能夠在經過netcat命令往本地端口44444寫數據,並在flume的終端的日誌信息中找到包含的事件。
實驗2、oracle增量數據寫入flume,結果寫入到hdfs
前期準備:從https://github.com/keedio/flume-ng-sql-source獲取源碼使用命令mvn package
編譯flume對接sql的源碼成jar包並將jar包放在flume主目錄下的lib文件夾中,注意如下說明中的版本對應。
配置編寫:
# example.conf: A single-node Flume configuration
# Name the components on this a2
a2.sources = r2
a2.sinks = k2
a2.channels = c2
# For each one of the sources, the type is defined
a2.sources.r2.type = org.keedio.flume.source.SQLSoure
a2.sources.r2.hibernate.connection.url = jdbc:oracle:thin:@ip:port/sid
a2.sources.r2.hibernate.connection.user = username
a2.sources.r2.hibernate.connection.password = passwd
a2.sources.r2.hibernate.connection.autocommit = true
a2.sources.r2.hibernate.dialect = org.hibernate.dialect.Oracle10gDialect
a2.sources.r2.hibernate.connection.driver_class = oracle.jdbc.driver.OracleDriver
# Query delay, each configured milisecond the query will be sent
a2.sources.r2.run.query.delay=10000
# Status file is used to save last readed row
a2.sources.r2.status.file.path = /var/log/flume
a2.sources.r2.status.file.name = r2.status
# Custom query
a2.sources.r2.start.from = 197001010
a2.sources.r2.custom.query = SELECT emplid,effdt from sys.test where emplid>'$@$'
# Standard Query
# a2.sources.r2.table = sys.test
# a2.sources.r2.columns.to.select = emplid,effdt
a2.sources.r2.batch.size = 1000
a2.sources.r2.max.rows = 1000
a2.sources.r2.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
a2.sources.r2.hibernate.c3p0.min_size = 1
a2.sources.r2.hibernate.c3p0.max_size = 10
#sink's type must be defined
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = /flume/oracle
a2.sinks.k2.hdfs.fileType = DataStream
a2.sinks.k2.hdfs.writeFormat = Text
a2.sinks.k2.hdfs.rollSize = 1024000000
a2.sinks.k2.hdfs.rollInterval = 0
a2.sinks.k2.hdfs.rollCount = 0
# The channel can be defined as follows.
a2.channels.c2.type = memory
a2.channels.c2.capacity = 10000
a2.channels.c2.transactionCapacity = 1000
# connected source and sink with channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
注意事項:start.from配置的是查詢的增量字段對應的起始數據(增量字段默認爲查詢語句的第一個列名稱),若是使用custom.query的查詢語句不在where語句中使用'$@$'對增量字段進行過濾,將會致使數據會重複寫入,每隔run.querry.delay的時間間隔將會運行一次會執行一次查詢語句,並將數據讀入到channel中,而後更新status文件中的lastIndex即'$@$',該索引做爲下一次執行查詢時的開始字段的索引發始位置,大於改索引的全部記錄將會輸入到channel,建議直接使用standard Query來給經過columns.to.select配置列,以及table指定查詢的表,這兩個配置項配置了將會在後臺執行SELECT <columns.to.select> FROM <table>,而且從初始化的數據開始自動執行增量查詢。
實驗3、avro發送文件到flume,結果寫入到rocket mq
準備:源碼見https://gitee.com/mxb/rocketmq-flume,將rocketmq相關的包和該源碼編譯的包所有打包在一塊兒放進lib文件夾下,具體包以下圖:
配置:
agent1.sources=source1 agent1.channels=channel1 agent1.sinks=sink1 agent1.sources.source1.type=avro agent1.sources.source1.bind=0.0.0.0 agent1.sources.source1.port=15151 agent1.sources.source1.channels=channel1 agent1.sinks.sink1.type=com.handu.flume.sink.rocketmq.RocketMQSink agent1.sinks.sink1.namesrvAddr=rocketmq_namesrv:9876 agent1.sinks.sink1.producerGroup=MyProducerGroup_1 agent1.sinks.sink1.topic=FromFlume agent1.sinks.sink1.tag=Tag1 agent1.sinks.sink1.channel=channel1 agent1.channels.channel1.type=memory agent1.channels.channel1.capacity=100 agent1.channels.channel1.transactionCapacity=100 agent1.channels.channel1.keep-alive=3
avro發消息:
$FLUME_HOME/bin/flume-ng avro-client -H localhost -p 15151 -F $FLUME_HOME/README
編寫一個mq的consumer的進程接收消息,輸出對應TOPIC下的的消息body中將會包含README的內容。
原文出處:https://www.cnblogs.com/wing1995/p/12053245.html