flume原理及代碼實現

轉載標明出處:http://www.cnblogs.com/adealjason/p/6240122.htmlhtml

 

最近想玩一下流計算,先看了flume的實現原理及源碼web

源碼能夠去apache 官網下載算法

下面整理下flume的原理及代碼實現:apache

flume是一個實時數據收集工具,hadoop的生態圈之一,主要用來在分佈式環境下各服務器節點作數據收集,而後彙總到統一的數據存儲平臺,flume支持多種部署架構模式,單點agent部署,分層架構模式部署,如經過一個負載均衡agent將收集的數據分發到各個子agent,而後在彙總到同一個agent上,數據傳輸到統一的數據存儲平臺,再次很少廢話,flume支持的部署架構圖能夠參見源碼中的doc目錄下的圖片編程

 

 

flume原理:json

目前最新版本是Flume NG,如下基於Flume NG來講:緩存

flume由如下幾個核心概念:服務器

flume event:flume內部的數據單元,包含兩部分,一個頭結點,一個body結點,頭結點是一個Map<String, String>,部署的agent結點能夠經過現有的Interceptor或者自定義Interceptor往消息頭裏放置數據,如ip,hostname等標識消息來源於哪臺服務器,event在flume內部作流轉,是數據傳輸的載體架構

flume source:source是flume的數據來源,flume支持多種數據來源,如taildir監控一個文件的變化,spollDir監控一個文件夾的變化,jmsSource接收jms消息等,最經常使用的avroSource是構成flume分層架構的基礎,source是一個接口,flume提供了多種消息接入方式,在sourceType枚舉類中都有詳細列出,特殊說明下,因爲flume是面向接口編程,其中有一個Other的枚舉,是佔位符,使用者能夠自定義source源,只要求在flume啓動的時候能夠加載到這個類便可(底層是經過反射獲取到class的實例的)負載均衡

flume channel:flume是基於pipeline的模式,channel的存在豐富了flume的數據傳播途徑,channel能夠再source和sink之間作緩衝,動態調節數據的收集及發送(內部有一個xxxCounter會沒接收到一個event或者發送一個event都會作記錄),緩衝source和sink之間的壓力,其二channel能夠關聯多個source,如一個source能夠按照配置選擇的將數據複製到各個管道,或者按照消息頭自動分發到指定的管道,一個channel能夠接多個sink,這個實現了同一份數據的多發發送池,實現了數據的複用及負載均衡等功能,channel內部流轉的數據載體是event,flume channel支持多種數據緩衝實現方式,如fileChannel:用一個文件作數據緩存、memoryChannel:使用內存緩存,底層實現是一個LinkedBlockingDeque,一個雙向阻塞列表,具體可參見ChannelType

flume sink:flume的數據發送池,主要負責數據的發送,從channel接收到event,而後發送到指定的數據接收方,flume提供多種sink實現,具體可參見SinkType,經常使用的有:loggerSink:這個主要用於flume的部署調試,它會將接收到的event事件直接用log4j輸出出來,RollingFileSink:這個sink主要是將接收到的日誌文件序列化到一個文件目錄中,因此須要配置文件的地址,切分文件的頻率等,avroSink:這個是flume分層架構中最經常使用的sink,通常和avroSource配對使用,avro是apache的一個子項目,用於數據的序列化,使用avroSource及avroSink時,須要在avroSource的agent節點服務器上監聽一個端口,avroSink的agent把接收到的數據發送到該ip、port上即完成了flume的分層部署,avro僅是一個數據序列化工具,底層實現由一個RpcClient的東東來將數據在這source和sink之間傳輸(能夠留一下啓動日誌,會自動建立一個RpcClient),固然,flume的編碼是按照面向接口來的,因此和source同樣支持自定義的sink

上述是幾個核心的概念,正式因爲flume的這種設計思想及編碼風格,讓flume有很強的拓展性

 

固然僅僅有這幾個仍是不能夠徹底讓flume運行起來的,flume提供了不少輔助類用於驅動、分發內部event及整個flume系統的運轉,基本以下:

配置領域:

AgentConfiguration:這個看名字就知道是flume的配置元素領域內的東西,是的,使用者在flume-conf.properties中配置的數據解析成AgentConfiguration,是配置文件到面向對象的一個抽象

AbstractConfigurationProvider:該類看名字就是一個抽象的配置Provider類,內部有一個很重要的方法就是:getConfiguration(),該方法中經過以下幾個private方法來加載flume的channel、source、sink、sinkGroups並將它們關聯起來

        loadChannels(agentConf, channelComponentMap);

        loadSources(agentConf, channelComponentMap, sourceRunnerMap);

        loadSinks(agentConf, channelComponentMap, sinkRunnerMap);

flume還支持動態加載,PollingPropertiesFileConfigurationProvider(AbstractConfigurationProvider的一個具體實現)在flume啓動的時候會啓動一個線程FileWatcherRunnable,監控flume的配置文件變化,配置文件內部加載用的是google的EventBus來驅動的

驅動領域:

flume的source有以下兩個子接口:PollableSource和EventDrivenSource,前者須要本身去輪循的訪問數據源,當前是否能夠加載到數據,若是有則加載進來轉換成flume的event,實現類有taildir、spollDir、jsmSource、kafkaSource等,該接口新增了一個process方法用於輪循調用,後者是一個事件驅動的Source,該接口不須要主動去訪問數據源,僅須要接收數據推進過來的event並轉換成flume的event便可,實現類有:scribeSource(該數據源用來打通Facebook的scribe數據收集工具)、AvroSource等

SourceRunner:

因爲這兩個source的存在,因此因此flume提供了兩個sourceRunner來驅動source的運行,分別是PollableSourceRunner和EventDrivenSourceRunner,前者啓動時自動啓動一個PollingRunner線程用於定時輪循process方法

channelProcessor:

該類用於source到channel之間的數據發送,實現了一個source能夠關聯到多個channel,簡單點如這2個接口,source的定義:setChannelProcessor(ChannelProcessor channelProcessor)指定一個ChannelProcessor ,ChannelProcessor 關聯到一個final的ChannelSelector,selector關聯到Channel:setChannels(List<Channel> channels)

ChannelProcessor:

關聯到指定的ChannelSelector,ChannelSelector提供了兩種selector方式,ReplicatingChannelSelector:將source的event複製到各個channel中,MultiplexingChannelSelector:根據頭結點的header信息自動路由到對應的Channel中

Transaction及BasicTransactionSemantics

flume的Channel內部保證一個event的發送在一個事務完成,若是發送失敗或者接收失敗則回滾,當成功時才從channel中刪除掉該event

SinkProcessor:

用過選擇要發送的sink,什麼意思呢?該類有兩個實現:

LoadBalancingSinkProcessor:

負載均衡方式:提供了roud_bin算法和random算法、以及固定order算法的實現方式,將Channel中的event發送到多個sink上

FailoverSinkProcessor:

能夠實現實現failover功能,具體流程相似LoadBalancingSinkProcessor,區別是FailoverSinkProcessor維護了一個PriorityQueue,用來根據權重選擇sink

SinkRunner:

該類用於驅動一個sink,啓動是內部開了一個線程PollingRunner,定時的調用SinkProcessor

上述是全部的核心概念及代碼做用,下面描述下flume的運行流程:

1.系統啓動時經過配置領域能夠按照客戶定義的配置加載一個flume

2.SourceRunner和SinkProcessor同時啓動,一個往Channel中生產event,一個從Channel中消費event,內部是一個生產者消費者模式

3.經過一些輔助類,實現Channel到source及sink的多路分發及分層架構

 

下面是一個本身搭建的flume配置文件,供參考:

實現流程:

負載均衡+分發+落地到日誌文件

1.負載均衡節點:

從兩個文件源讀數據,在event頭裏增長數據來源標識,複製到兩個channel中,一個log打印,一個作負載均衡分發到另外兩臺機器的agent上,負載均衡算法採用roud_robin

loadBalancAgent.sources = taildirSrc

loadBalancAgent.channels = memoryChannel fileChannel

loadBalancAgent.sinks = loggerSink1 loggerSink2 loggerSink3

loadBalancAgent.sinkgroups = loadBalanceGroups

## taildirSrc config

loadBalancAgent.sources.taildirSrc.type = TAILDIR

loadBalancAgent.sources.taildirSrc.positionFile = /alidata1/admin/openSystem/flumetest/log/taildir_position.json

loadBalancAgent.sources.taildirSrc.filegroups = f1 f2

loadBalancAgent.sources.taildirSrc.filegroups.f1 = /alidata1/admin/dts-server-web/dts-server.log

loadBalancAgent.sources.taildirSrc.headers.f1.headerKey1 = dts-server-log

loadBalancAgent.sources.taildirSrc.filegroups.f2 = /alidata1/admin/flume/test.log

loadBalancAgent.sources.taildirSrc.headers.f2.headerKey1 = flume-test-log

loadBalancAgent.sources.taildirSrc.fileHeader = true

## replicating channel config

loadBalancAgent.sources.taildirSrc.selector.type = replicating

loadBalancAgent.sources.taildirSrc.channels = memoryChannel fileChannel

loadBalancAgent.sources.taildirSrc.selector.optional = fileChannel

## memory chanel config

loadBalancAgent.channels.memoryChannel.type = memory

loadBalancAgent.channels.memoryChannel.capacity = 10000

loadBalancAgent.channels.memoryChannel.transactionCapacity = 10000

loadBalancAgent.channels.memoryChannel.byteCapacityBufferPercentage = 20

loadBalancAgent.channels.memoryChannel.byteCapacity = 800000

## file channel config

loadBalancAgent.channels.fileChannel.type = file

loadBalancAgent.channels.fileChannel.checkpointDir = /alidata1/admin/openSystem/flumetest/log

loadBalancAgent.channels.fileChannel.dataDirs = /alidata1/admin/openSystem/flumetest/data

## loadbalance sink processor

loadBalancAgent.sinkgroups.loadBalanceGroups.sinks = loggerSink1 loggerSink2

loadBalancAgent.sinkgroups.loadBalanceGroups.processor.type = load_balance

loadBalancAgent.sinkgroups.loadBalanceGroups.processor.backoff = true

loadBalancAgent.sinkgroups.loadBalanceGroups.processor.selector = round_robin

## loggerSink1 config

loadBalancAgent.sinks.loggerSink1.type = avro

loadBalancAgent.sinks.loggerSink1.channel = memoryChannel

loadBalancAgent.sinks.loggerSink1.hostname = 10.253.42.162

loadBalancAgent.sinks.loggerSink1.port = 4141

## loggerSink2 config

loadBalancAgent.sinks.loggerSink2.type = avro

loadBalancAgent.sinks.loggerSink2.channel = memoryChannel

loadBalancAgent.sinks.loggerSink2.hostname = 10.139.53.6

loadBalancAgent.sinks.loggerSink2.port = 4141

## loggerSink3 config

loadBalancAgent.sinks.loggerSink3.type = file_roll

loadBalancAgent.sinks.loggerSink3.channel = fileChannel

loadBalancAgent.sinks.loggerSink3.sink.rollInterval = 0

loadBalancAgent.sinks.loggerSink3.sink.directory = /alidata1/admin/openSystem/flumetest/dtsServerLog

2.負載均衡節點1

接收avroSink並落地到文件中

dispatchAgent.sources= avroSrc

dispatchAgent.channels=memoryChannel

dispatchAgent.sinks=loggerSink

## avroSrc config

dispatchAgent.sources.avroSrc.type = avro

dispatchAgent.sources.avroSrc.channels = memoryChannel

dispatchAgent.sources.avroSrc.bind = 0.0.0.0

dispatchAgent.sources.avroSrc.port = 4141

## memoryChannel config

dispatchAgent.channels.memoryChannel.type = memory

dispatchAgent.channels.memoryChannel.capacity = 10000

dispatchAgent.channels.memoryChannel.transactionCapacity = 10000

dispatchAgent.channels.memoryChannel.byteCapacityBufferPercentage = 20

dispatchAgent.channels.memoryChannel.byteCapacity = 800000

## loggerSink config

dispatchAgent.sinks.loggerSink.type = logger

dispatchAgent.sinks.loggerSink.channel = memoryChannel

3.負載均衡節點2

dispatchAgent.sources= avroSrc

dispatchAgent.channels=memoryChannel

dispatchAgent.sinks=loggerSink

## avroSrc config

dispatchAgent.sources.avroSrc.type = avro

dispatchAgent.sources.avroSrc.channels = memoryChannel

dispatchAgent.sources.avroSrc.bind = 0.0.0.0

dispatchAgent.sources.avroSrc.port = 4141

## memoryChannel config

dispatchAgent.channels.memoryChannel.type = memory

dispatchAgent.channels.memoryChannel.capacity = 10000

dispatchAgent.channels.memoryChannel.transactionCapacity = 10000

dispatchAgent.channels.memoryChannel.byteCapacityBufferPercentage = 20

dispatchAgent.channels.memoryChannel.byteCapacity = 800000

## loggerSink config

dispatchAgent.sinks.loggerSink.type = logger

dispatchAgent.sinks.loggerSink.channel = memoryChannel

相關文章
相關標籤/搜索