Flume-ng

Flume is a distributed, reliable, and highly available system for collecting, aggregating, and transporting large volumes of log data.

The official English documentation at http://flume.apache.org/FlumeUserGuide.html is fairly comprehensive; here I will just write down my own understanding.
*(figure: Flume architecture diagram)*
The diagram introduces a few terms:
Agent: an Agent contains Sources, Channels, Sinks, and other components; a Flume deployment consists of one or more Agents.

Source: the data source; simply put, the entry point through which an agent takes in data.

Channel: the pipe through which data flows and is staged. A source must be associated with at least one channel.

Sink: receives data from a channel and delivers it to the configured destination; once delivery succeeds, the data is removed from the channel.
Flume is highly extensible, and its components can be combined freely.
Note that a source is the receiving end and a sink is the sending end.
One of the example layouts has a single source delivering data to three channels; sink2 sends its data to JMS, and sink3 sends its data on to another source.
In short, Flume is very flexible: components can be combined however your needs dictate.
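As a concrete example, a minimal single-agent configuration might look like the following (a sketch using the built-in netcat source, memory channel, and logger sink; the agent name a1 and the component names are arbitrary):

```
# one source, one channel, one sink
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# netcat source listening on localhost:5140
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 5140

# in-memory channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000

# logger sink prints events at INFO level
a1.sinks.k1.type = logger

# wiring: a source can feed several channels, a sink reads from exactly one
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```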
Now let's look at a concept called the Event:

The Event is Flume's basic unit of data transfer. Flume essentially moves data from source to destination as a stream of events, each consisting of optional headers plus a byte array carrying the payload. The interface looks like this:
```java
/**
 * Basic representation of a data object in Flume.
 * Provides access to data as it flows through the system.
 */
public interface Event {

    /**
     * Returns a map of name-value pairs describing the data stored in the body.
     */
    public Map<String, String> getHeaders();

    /**
     * Set the event headers.
     * @param headers Map of headers to replace the current headers.
     */
    public void setHeaders(Map<String, String> headers);

    /**
     * Returns the raw byte array of the data contained in this event.
     */
    public byte[] getBody();

    /**
     * Sets the raw byte array of the data contained in this event.
     * @param body The data.
     */
    public void setBody(byte[] body);
}
```
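To get a feel for how events are built in practice, here is a tiny sketch using Flume's EventBuilder helper class (the class name EventDemo and the header key are arbitrary):

```java
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;

import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;

public class EventDemo {
    public static void main(String[] args) {
        // headers are optional key/value metadata; the body is a raw byte array
        Map<String, String> headers = new HashMap<String, String>();
        headers.put("timestamp", String.valueOf(System.currentTimeMillis()));

        Event event = EventBuilder.withBody("hello flume".getBytes(Charset.forName("UTF-8")), headers);

        System.out.println(event.getHeaders() + " / " + new String(event.getBody()));
    }
}
```

In practice you rarely construct events by hand (sources do that for you), but the headers/body split is the same everywhere in the pipeline.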
Below is a summary of Flume channels, sources, and sinks that I found online; the source is http://abloz.com/2013/02/26/flume-channel-source-sink-summary.html
| Component | Type | Description | Implementation Class |
|---|---|---|---|
| Channel | memory | In-memory, fast, non-durable event transport | MemoryChannel |
| Channel | file | A channel for reading, writing, mapping, and manipulating a file | FileChannel |
| Channel | jdbc | JDBC-based, durable event transport (Derby-based) | JDBCChannel |
| Channel | recoverablememory | A durable channel implementation that uses the local file system for its storage | RecoverableMemoryChannel |
| Channel | org.apache.flume.channel.PseudoTxnMemoryChannel | Mainly for testing purposes. Not meant for production use. | PseudoTxnMemoryChannel |
| Channel | (custom type as FQCN) | Your own Channel impl. | (custom FQCN) |
| Source | avro | Avro Netty RPC event source | AvroSource |
| Source | exec | Execute a long-lived Unix process and read from stdout | ExecSource |
| Source | netcat | Netcat style TCP event source | NetcatSource |
| Source | seq | Monotonically incrementing sequence generator event source | SequenceGeneratorSource |
| Source | org.apache.flume.source.StressSource | Mainly for testing purposes. Not meant for production use. Serves as a continuous source of events where each event has the same payload. The payload consists of some number of bytes (specified by the size property, defaults to 500) where each byte has the signed value Byte.MAX_VALUE (0x7F, or 127). | org.apache.flume.source.StressSource |
| Source | syslogtcp | | SyslogTcpSource |
| Source | syslogudp | | SyslogUDPSource |
| Source | org.apache.flume.source.avroLegacy.AvroLegacySource | | AvroLegacySource |
| Source | org.apache.flume.source.thriftLegacy.ThriftLegacySource | | ThriftLegacySource |
| Source | org.apache.flume.source.scribe.ScribeSource | | ScribeSource |
| Source | (custom type as FQCN) | Your own Source impl. | (custom FQCN) |
| Sink | hdfs | Writes all events received to HDFS (with support for rolling, bucketing, HDFS-200 append, and more) | HDFSEventSink |
| Sink | org.apache.flume.sink.hbase.HBaseSink | A simple sink that reads events from a channel and writes them to HBase. | org.apache.flume.sink.hbase.HBaseSink |
| Sink | org.apache.flume.sink.hbase.AsyncHBaseSink | | org.apache.flume.sink.hbase.AsyncHBaseSink |
| Sink | logger | Log events at INFO level via configured logging subsystem (log4j by default) | LoggerSink |
| Sink | avro | Sink that invokes a pre-defined Avro protocol method for all events it receives (when paired with an avro source, forms tiered collection) | AvroSink |
| Sink | file_roll | | RollingFileSink |
| Sink | irc | | IRCSink |
| Sink | null | /dev/null for Flume – blackhole all events received | NullSink |
| Sink | (custom type as FQCN) | Your own Sink impl. | (custom FQCN) |
| ChannelSelector | replicating | | ReplicatingChannelSelector |
| ChannelSelector | multiplexing | | MultiplexingChannelSelector |
| ChannelSelector | (custom type) | Your own ChannelSelector impl. | (custom FQCN) |
| SinkProcessor | default | | DefaultSinkProcessor |
| SinkProcessor | failover | | FailoverSinkProcessor |
| SinkProcessor | load_balance | Provides the ability to load-balance flow over multiple sinks. | LoadBalancingSinkProcessor |
| SinkProcessor | (custom type as FQCN) | Your own SinkProcessor impl. | (custom FQCN) |
| Interceptor$Builder | host | | HostInterceptor$Builder |
| Interceptor$Builder | timestamp | TimestampInterceptor | TimestampInterceptor$Builder |
| Interceptor$Builder | static | | StaticInterceptor$Builder |
| Interceptor$Builder | regex_filter | | RegexFilteringInterceptor$Builder |
| Interceptor$Builder | (custom type as FQCN) | Your own Interceptor$Builder impl. | (custom FQCN) |
| EventSerializer$Builder | text | | BodyTextEventSerializer$Builder |
| EventSerializer$Builder | avro_event | | FlumeEventAvroEventSerializer$Builder |
| EventSerializer | org.apache.flume.sink.hbase.SimpleHbaseEventSerializer | | SimpleHbaseEventSerializer |
| EventSerializer | org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer | | SimpleAsyncHbaseEventSerializer |
| EventSerializer | org.apache.flume.sink.hbase.RegexHbaseEventSerializer | | RegexHbaseEventSerializer |
| HbaseEventSerializer | (custom type as FQCN) | Custom implementation of serializer for HBaseSink. Your own HbaseEventSerializer impl. | (custom FQCN) |
| AsyncHbaseEventSerializer | (custom type as FQCN) | Custom implementation of serializer for AsyncHbase sink. Your own AsyncHbaseEventSerializer impl. | (custom FQCN) |
| EventSerializer$Builder | (custom type as FQCN) | Custom implementation of serializer for all sinks except for HBaseSink and AsyncHBaseSink. Your own EventSerializer$Builder impl. | (custom FQCN) |
Next, a short introduction to Kafka and to integrating Kafka with Flume.
Kafka:
Some of the following is adapted from http://dongxicheng.org/search-engine/kafka/

Kafka is a messaging system open-sourced by LinkedIn in December 2010, designed mainly for processing active streaming data. Active streaming data is very common in web applications: page views, what content users visited, what they searched for, and so on. Such data is usually recorded as logs and then processed statistically at fixed intervals.

Traditional log-analysis systems offer a scalable way to process logs offline, but they incur significant latency when real-time processing is required. Existing message (queue) systems handle real-time or near-real-time applications well, but unconsumed data is usually not written to disk, which is a problem for offline consumers such as Hadoop jobs that only process a batch of data every hour or day. Kafka was designed to address both cases: it serves offline and online applications equally well.
2. Design goals

(1) O(1) disk access. Data on disk is usually stored in a B-tree, which has O(log n) access cost.

(2) High throughput: even on ordinary nodes it can handle hundreds to thousands of messages per second.

(3) Explicitly distributed: there are multiple producers, brokers, and consumers, all of them distributed.

(4) Support for loading data into Hadoop in parallel.
3. Kafka deployment structure
Kafka has an explicitly distributed architecture: there can be multiple producers, brokers (Kafka), and consumers. Kafka acts like a cache sitting between active data and offline processing systems. A few basic concepts:

(1) The message is the basic unit of communication. Each producer publishes messages to a topic; consumers that subscribe to the topic receive the newly published messages.

(2) Kafka is explicitly distributed: multiple producers, consumers, and brokers can run on one large cluster and serve clients as a single logical unit. On the consumer side, several consumers can form a group, and a given message is delivered to only one consumer within that group.

Data is pushed from producers to brokers, and consumers then pull data from the brokers. ZooKeeper is a distributed coordination framework used to handle data-management problems in distributed applications.

The key concepts in Kafka, then, are the producer, the consumer, and the topic.
Let's now build a simple producer/consumer example.

The producer:
```java
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class ProducerTest {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("metadata.broker.list", "xx.xx.xx.xx:9092");
        props.setProperty("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");

        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);
        KeyedMessage<String, String> data = new KeyedMessage<String, String>("kafka", "test-kafka");
        try {
            producer.send(data);
        } catch (Exception e) {
            e.printStackTrace();
        }
        producer.close();
    }
}
```
In the code above, xx.xx.xx.xx is the address of the Kafka broker.

The code sends a single message, test-kafka, to the topic kafka synchronously (synchronous sending is the default when nothing else is configured).
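If you would rather send asynchronously in batches, the producer can be switched over with a few extra properties (a sketch; the property names are the standard 0.8-era producer settings, and the values are only illustrative):

```java
Properties props = new Properties();
props.setProperty("metadata.broker.list", "xx.xx.xx.xx:9092");
props.setProperty("serializer.class", "kafka.serializer.StringEncoder");
// switch the producer to asynchronous mode: messages are buffered and sent in batches
props.setProperty("producer.type", "async");
props.setProperty("batch.num.messages", "200");      // flush once this many messages are buffered
props.setProperty("queue.buffering.max.ms", "500");   // ...or after this many milliseconds
```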
Now the consumer:
```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class ConsumerTest extends Thread {

    private final ConsumerConnector consumer;
    private final String topic;

    public static void main(String[] args) {
        ConsumerTest consumerThread = new ConsumerTest("kafka");
        consumerThread.start();
    }

    public ConsumerTest(String topic) {
        consumer = kafka.consumer.Consumer
                .createJavaConsumerConnector(createConsumerConfig());
        this.topic = topic;
    }

    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "xx.xx.xx.xx:2181");
        props.put("group.id", "0");
        props.put("zookeeper.session.timeout.ms", "10000");
        // props.put("zookeeper.sync.time.ms", "200");
        // props.put("auto.commit.interval.ms", "1000");
        return new ConsumerConfig(props);
    }

    public void run() {
        // ask for one stream of the topic and print every message it delivers
        Map<String, Integer> topickMap = new HashMap<String, Integer>();
        topickMap.put(topic, 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> streamMap = consumer.createMessageStreams(topickMap);
        KafkaStream<byte[], byte[]> stream = streamMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        System.out.println("--------------------------");
        while (it.hasNext()) {
            System.out.println("(consumer)--> " + new String(it.next().message()));
        }
    }
}
```
This code receives the messages sent by the producer. To test it, start the consumer first, then run the producer, and you should see the messages printed.

Next, let's integrate Flume with Kafka.

After a Flume source receives data, it flows through the channel to the sink. We need to write a KafkaSink that acts as a Kafka producer: it takes the events the sink pulls from the channel and publishes them for consumers. The code:
```java
import java.util.Properties;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class KafkaSink extends AbstractSink implements Configurable {

    private static final Log logger = LogFactory.getLog(KafkaSink.class);

    private String topic;
    private Producer<String, String> producer;

    @Override
    public Status process() throws EventDeliveryException {
        Channel channel = getChannel();
        Transaction tx = channel.getTransaction();
        try {
            tx.begin();
            Event e = channel.take();
            if (e == null) {
                tx.rollback();
                return Status.BACKOFF;
            }
            // forward the event body to Kafka as a message on the configured topic
            KeyedMessage<String, String> data =
                    new KeyedMessage<String, String>(topic, new String(e.getBody()));
            producer.send(data);
            logger.info("Message: " + new String(e.getBody()));
            tx.commit();
            return Status.READY;
        } catch (Exception e) {
            logger.error("KafkaSinkException:", e);
            tx.rollback();
            return Status.BACKOFF;
        } finally {
            tx.close();
        }
    }

    @Override
    public void configure(Context context) {
        topic = "kafka";
        Properties props = new Properties();
        props.setProperty("metadata.broker.list", "xx.xx.xx.xx:9092");
        props.setProperty("serializer.class", "kafka.serializer.StringEncoder");
        // props.setProperty("producer.type", "async");
        // props.setProperty("batch.num.messages", "1");
        props.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(props);
        producer = new Producer<String, String>(config);
    }
}
```
Package this class into a jar and drop it into Flume's lib directory. If you build with Maven, use the assembly plugin so that the dependent jars are bundled into the package as well.
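For reference, a minimal sketch of the maven-assembly-plugin configuration that produces a jar-with-dependencies (the execution binding shown is an assumption; adapt it to your own pom):

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-assembly-plugin</artifactId>
  <configuration>
    <descriptorRefs>
      <!-- produces an extra *-jar-with-dependencies.jar containing all deps -->
      <descriptorRef>jar-with-dependencies</descriptorRef>
    </descriptorRefs>
  </configuration>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>single</goal>
      </goals>
    </execution>
  </executions>
</plugin>
```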
The Flume configuration looks like this:
```
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1

# Describe/configure source1
agent1.sources.source1.type = avro
agent1.sources.source1.bind = localhost
agent1.sources.source1.port = 44444

# Describe sink1
# xx.xx.xx.KafkaSink is the fully qualified class name of the custom sink
agent1.sinks.sink1.type = xx.xx.xx.KafkaSink

# Use a channel which buffers events in memory
agent1.channels.channel1.type = memory
agent1.channels.channel1.capacity = 1000
agent1.channels.channel1.transactionCapacity = 100

# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
```
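Assuming the configuration above is saved as conf/agent1.conf (the file name is an assumption), the agent can be started like this:

```
bin/flume-ng agent --conf conf --conf-file conf/agent1.conf --name agent1 -Dflume.root.logger=INFO,console
```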
Since the source receives data over Avro, you can test with the bundled avro-client:

```
bin/flume-ng avro-client --conf conf -H localhost -p 44444 -F /data/flumetmp/a
```

Here /data/flumetmp/a is the path of the file to send. When testing locally, make sure the consumer program written earlier is running so you can confirm that the data arrives.
Next we introduce Storm and then hook the Kafka consumer up to Storm.

Storm:

Storm is a distributed real-time message processing system.

The relationships among Storm's components:
A Storm cluster consists of one master node and a group of worker nodes, coordinated through ZooKeeper.

Master node: usually runs a daemon called Nimbus, which responds to the nodes distributed across the cluster, assigns tasks, and monitors for failures.

Worker node: runs the Supervisor daemon, which accepts tasks assigned by Nimbus and starts and stops the worker processes under its management. Coordination between Nimbus and the Supervisors is handled by ZooKeeper.

Worker: a process that executes the processing logic; it runs multiple tasks, each task being an instance of a spout or bolt.

Topology: a Storm real-time application that runs continuously from the moment it starts; processing is triggered whenever a tuple arrives. It is called a topology because the flow of messages in Storm resembles a topological structure.

Stream: Storm's core abstraction. A stream is a continuous sequence of tuples that are created and processed in a distributed, parallel fashion.

Spout: the source of a stream. A spout reads data from an external system, assembles it into tuples, and emits them; once emitted, tuples propagate through the topology.

Bolt: the core of data processing in Storm; all data processing is done inside bolts.

This is only a brief overview of the concepts; see a detailed tutorial for more.
Next, let's integrate Storm with Kafka.

As described above, a Storm spout is responsible for reading data from external systems, so we need a KafkaSpout that acts as a Kafka consumer and serves as Storm's data source. See https://github.com/HolmesNL/kafka-spout for a fuller implementation; below is a simple one that is good enough for testing. The code:
```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class KafkaSpout implements IRichSpout {

    private static final Log logger = LogFactory.getLog(KafkaSpout.class);

    private static final long serialVersionUID = -5569857211173547938L;

    SpoutOutputCollector collector;
    private ConsumerConnector consumer;
    private String topic;

    public KafkaSpout(String topic) {
        this.topic = topic;
    }

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "xx.xx.xx.xx:2181");
        props.put("group.id", "0");
        props.put("zookeeper.session.timeout.ms", "10000");
        // props.put("zookeeper.sync.time.ms", "200");
        // props.put("auto.commit.interval.ms", "1000");
        return new ConsumerConfig(props);
    }

    @Override
    public void close() {
        // TODO Auto-generated method stub
    }

    @Override
    public void activate() {
        // consume from Kafka and emit each message as a tuple
        this.consumer = Consumer.createJavaConsumerConnector(createConsumerConfig());
        Map<String, Integer> topickMap = new HashMap<String, Integer>();
        topickMap.put(topic, new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> streamMap = consumer.createMessageStreams(topickMap);
        KafkaStream<byte[], byte[]> stream = streamMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            String value = new String(it.next().message());
            System.out.println("(consumer)-->" + value);
            collector.emit(new Values(value), value);
        }
    }

    @Override
    public void deactivate() {
        // TODO Auto-generated method stub
    }

    private boolean isComplete;

    @Override
    public void nextTuple() {
    }

    @Override
    public void ack(Object msgId) {
        // TODO Auto-generated method stub
    }

    @Override
    public void fail(Object msgId) {
        // TODO Auto-generated method stub
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("KafkaSpout"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        // TODO Auto-generated method stub
        return null;
    }
}
```
```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class FileBlots implements IRichBolt {

    OutputCollector collector;

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple input) {
        // split each incoming line into words and emit them, anchored to the input tuple
        String line = input.getString(0);
        for (String str : line.split("\\s+")) {
            List<Tuple> anchors = new ArrayList<Tuple>();
            anchors.add(input);
            this.collector.emit(anchors, new Values(str));
        }
        this.collector.ack(input);
    }

    public void cleanup() {
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("words"));
    }

    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
```
```java
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

import redis.clients.jedis.Jedis;

public class WordsCounterBlots implements IRichBolt {

    OutputCollector collector;
    Map<String, Integer> counter;

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.counter = new HashMap<String, Integer>();
    }

    public void execute(Tuple input) {
        // count each word in memory...
        String word = input.getString(0);
        Integer integer = this.counter.get(word);
        if (integer != null) {
            integer += 1;
            this.counter.put(word, integer);
        } else {
            this.counter.put(word, 1);
        }
        System.out.println("execute");
        // ...and also keep a running count in Redis (JedisUtils is the author's own pool helper)
        Jedis jedis = JedisUtils.getJedis();
        jedis.incrBy(word, 1);
        System.out.println("=============================================");
        this.collector.ack(input);
    }

    public void cleanup() {
        for (Entry<String, Integer> entry : this.counter.entrySet()) {
            System.out.println("------:" + entry.getKey() + "==" + entry.getValue());
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }

    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
```
Testing the topology:
```java
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class KafkaTopology {

    public static void main(String[] args) {
        // JedisUtils is the author's own Redis connection-pool helper
        try {
            JedisUtils.initialPool("xx.xx.xx.xx", 6379);
        } catch (Exception e) {
            e.printStackTrace();
        }

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka", new KafkaSpout("kafka"));
        builder.setBolt("file-blots", new FileBlots()).shuffleGrouping("kafka");
        builder.setBolt("words-counter", new WordsCounterBlots(), 2)
               .fieldsGrouping("file-blots", new Fields("words"));

        Config config = new Config();
        config.setDebug(true);

        LocalCluster local = new LocalCluster();
        local.submitTopology("counter", config, builder.createTopology());
    }
}
```
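Running this main method executes the topology in-process via LocalCluster, which is convenient for testing. To run the same topology on a real cluster, a sketch like the following would be packaged into a jar and launched with the storm jar command (the class name and worker count are illustrative):

```java
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class KafkaTopologySubmit {

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka", new KafkaSpout("kafka"));
        builder.setBolt("file-blots", new FileBlots()).shuffleGrouping("kafka");
        builder.setBolt("words-counter", new WordsCounterBlots(), 2)
               .fieldsGrouping("file-blots", new Fields("words"));

        Config config = new Config();
        config.setNumWorkers(2); // number of worker processes to request

        // submits to the cluster configured in storm.yaml
        StormSubmitter.submitTopology("counter", config, builder.createTopology());
    }
}
```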