1 序html
對ETL系統中數據轉換和存儲操做的相關日誌進行記錄以及實時分析有助於咱們更好的觀察和監控ETL系統的相關指標(如單位時間某些操做的處理時間),發現系統中出現的缺陷和性能瓶頸。java
因爲須要對日誌進行實時分析,因此Storm是咱們想到的首個框架。Storm是一個分佈式實時計算系統,它能夠很好的處理流式數據。利用storm咱們幾乎能夠直接實現一個日誌分析系統,可是將日誌分析系統進行模塊化設計能夠收到更好的效果。模塊化的設計至少有兩方面的優勢:linux
模塊化設計可使功能更加清晰。整個日誌分析系統能夠分爲「數據採集-數據緩衝-數據處理-數據存儲」四個步驟。Apache項目下的flumeng框架能夠很好的從多源目標收集數據,因此咱們用它來從ETL系統中收集日誌信息;因爲採集數據與處理數據的速度可能會出現不一致,因此咱們須要一個消息中間件來做爲緩衝,kafka是一個極好的選擇;而後對流式數據的處理,咱們將選擇大名鼎鼎的storm了,同時爲了更好的對數據進行處理,咱們把drools與storm進行了整合,分離出了數據處理規則,這樣更有利於管理規則;最後,咱們選擇redis做爲咱們處理數據的存儲工具,redis是一個內存數據庫,能夠基於健值進行快速的存取。git
模塊化設計以後,storm和前兩個步驟之間就得到了很好的解耦,storm集羣若是出現問題,數據採集以及數據緩衝的操做還能夠繼續運行,數據不會丟失。github
2 相關框架的介紹和安裝web
2.1 flumengredis
2.1.1 原理介紹sql
Flume是一個高可用、高可靠、分佈式的海量日誌採集、聚合和傳輸系統。Flume支持在日誌系統中定製日誌發送方,用於收集數據;同時,Flume提供對數據進行簡單處理,並寫到各類數據接收方的能力。它擁有一個簡單的、可擴展的流式數據流架構,以下圖所示:數據庫
日誌收集系統就是由一個或者多個agent(代理)組成,每一個agent由source、channel、sink三部分組成,source是數據的來源,channel是數據進行傳輸的通道,sink用於將數據傳輸到指定的地方。咱們能夠把agent看作一段水管,source是水管的入口,sink是水管的出口,數據流就是水流。 Agent本質上是一個jvm進程,agent各個組件之間是經過event來進行觸發和協調的。apache
2.1.2 flumeng的安裝
從官方網站下載apache-flume-1.4.0-bin.tar.gz壓縮包
解壓縮,並在conf目錄下面新建一個文件flume-conf.properties,內容以下:
0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#source配置信息
#r1的type爲avro表示該source接收的數據協議爲avro,且接收數據由avro客戶端事件驅動
#(也就是說resource要經過avro-cliet向其發送數據)
a1.sources.r1.type = avro
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
#sink配置信息
# type爲logger意將數據輸出至日誌中(也就是打印在屏幕上)
a1.sinks.k1.type = logger
#channel配置信息
#type爲memory意將數據存儲至內存中
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#將source和sink綁定至該channel上
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
該配置文件,配置了一個source爲avro的服務器端用於日誌的收集。具體的狀況將在後面ETL系統與flume整合中介紹。
啓動代理。flume-ng agent –n a1 –f flume-conf.properties
2.2 kafka
2.2.1 原理介紹
Kafka是linkedin用於日誌處理的分佈式消息隊列。Kafka的架構以下圖所示:
Kafka的存儲策略有一下幾點:
kafka以topic來進行消息管理,每一個topic包括多個partition,每一個partition包括一個邏輯log,由多個segment組成。
每一個segment中存儲多條消息,消息id由其邏輯位置決定,即從消息id可直接定位到消息的存儲位置,避免id到位置的額外映射。
每一個partition在內存中對應一個index,記錄每一個segment中的第一條消息的偏移。
發佈者發到某個topic的消息會被均勻的分佈到多個partition上(隨機或根據用戶指定的回調函數進行分佈),broker收到發佈消息往對應partition的最後一個segment上添加該消息,當某個segment上的消息條數達到配置值或消息發佈時間超過閾值時,segment上的消息會被flush到磁盤,只有flush到磁盤上的消息訂閱者才能訂閱到,segment達到必定的大小後將不會再往該segment寫數據,broker會建立新的segment。
2.2.2 kafka集羣的搭建
Kafka集羣的搭建須要依賴zookeeper來進行負載均衡,因此咱們須要在安裝kafka以前搭建zookeeper集羣。
zookeeper集羣的搭建,本系統用到了兩臺機器。具體搭建過程見http://blog.csdn.net/itleochen/article/details/17453881
分別下載kafka_2.9.2-0.8.1的安裝包到兩臺機器,並解壓該安裝包。
打開conf/server.properties文件,修改配置項broker.id、zookeeper.connect、partitions以及host.name爲相應的值。
分別啓動kafka即完成了集羣的搭建。
2.3 storm
2.3.1 原理介紹
Storm是一個分佈式的、高容錯的實時計算系統。Storm對於實時計算的的意義至關於Hadoop對於批處理的意義。hadoop爲咱們提供了Map和Reduce原語,使咱們對數據進行批處理變的很是的簡單和優美。一樣,Storm也對數據的實時計算提供了簡單Spout和Bolt原語。
Strom集羣裏面有兩種節點,控制節點和工做節點,控制節點上面運行一個nimbus(相似於hadoop中的JobTracker)後臺程序,Nimbus負責在集羣裏面分佈代碼,分配工做給機器, 而且監控狀態。每個工做節點上面運行一個叫作Supervisor(相似Hadoop中的TaskTracker)的節點。Supervisor會監聽分配給它那臺機器的工做,根據須要啓動/關閉工做進程。每個工做進程執行一個Topology(相似hadoop中的Job)的一個子集;一個運行的Topology由運行在不少機器上的不少工做進程 Worker(相似Hadoop中的Child)組成。結構以下圖所示:
Stream是storm裏面的關鍵抽象。一個stream是一個沒有邊界的tuple序列。storm提供一些原語來分佈式地、可靠地把一個stream傳輸進一個新的stream。好比: 你能夠把一個tweets流傳輸到熱門話題的流。
storm提供的最基本的處理stream的原語是spout和bolt。你能夠實現Spout和Bolt對應的接口以處理你的應用的邏輯。
Spout是流的源頭。好比一個spout可能從Kestrel隊列裏面讀取消息而且把這些消息發射成一個流。一般Spout會從外部數據源(隊列、數據庫等)讀取數據,而後封裝成Tuple形式,以後發送到Stream中。Spout是一個主動的角色,在接口內部有個nextTuple函數,Storm框架會不停的調用該函數。
Bolt能夠接收任意多個輸入stream。Bolt處理輸入的Stream,併產生新的輸出Stream。Bolt能夠執行過濾、函數操做、Join、操做數據庫等任何操做。Bolt是一個被動的角色,其接口中有一個execute(Tuple input)方法,在接收到消息以後會調用此函數,用戶能夠在此方法中執行本身的處理邏輯。
spout和bolt所組成一個網絡會被打包成topology, topology是storm裏面最高一級的抽象(相似 Job), 你能夠把topology提交給storm的集羣來運行。Topology的結構以下圖所示:
2.3.2 storm集羣的搭建
Storm集羣的搭建也要依賴於zookeeper,本系統中storm與kafka共用一樣一個zookeeper集羣。
下載安裝包storm-0.9.0.1.tar.gz,並對該包進行解壓。
配置nimbus。 修改storm的conf/storm.yaml文件以下:
0
1
2
3
4
5
6
7
8
9
10
11
12
13
storm.zookeeper.servers: //zookeeper集羣
-「10.200.187.71″
-「10.200.187.73″
storm.local.dir:「/usr/endy/fks/storm-workdir「
storm.messaging.transport:「backtype.storm.messaging.netty.Context」
storm.messaging.netty.server_worker_threads:1
storm.messaging.netty.client_worker_threads:1
storm.messaging.netty.buffer_size:5242880
storm.messaging.netty.max_retries:100
storm.messaging.netty.max_wait_ms:1000
storm.messaging.netty.min_wait_ms:100
注意:在每一個配置項前面必須留有空格,不然會沒法識別。storm.messaging.* 部分是Netty的配置。若是沒有該部分。那麼Storm默認仍是使用ZeroMQ。
配置supervisor 修改storm的conf/storm.yaml文件以下:
0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
storm.zookeeper.servers:
- 「10.200.187.71″
- 「10.200.187.73″
nimbus.host: 「10.200.187.71″
supervisor.slots.ports:
- 6700
- 6701
- 6702
storm.local.dir: 「/usr/endy/fks/storm-workdir」
storm.messaging.transport: 「backtype.storm.messaging.netty.Context」
storm.messaging.netty.server_worker_threads: 1
storm.messaging.netty.client_worker_threads: 1
storm.messaging.netty.buffer_size: 5242880
storm.messaging.netty.max_retries: 100
storm.messaging.netty.max_wait_ms: 1000
storm.messaging.netty.min_wait_ms: 100
注意
nimbus.host是nimbus的IP或hostname
supervisor.slots.ports 是配置slot的ip地址。配了幾個地址,就有幾個slot,即幾個worker。若是嘗試提交的topology所聲明的worker數超過當前可用的slot,該topology提交會失敗。
storm.messaging 部分是Netty的配置。
2.4 drools
Drools是一個基於Java的、開源的規則引擎,能夠將複雜多變的規則從硬編碼中解放出來,以規則腳本的形式存放在文件中,使得規則的變動不須要修正代碼重啓機器就能夠當即在線上環境生效。 日誌分析系統中,drools的做用是利用不一樣的規則對日誌信息進行處理,以得到咱們想要的數據。可是,Drools自己不是一個分佈式框架,因此規則引擎對log的處理沒法作到分佈式。咱們的策略是將drools整合到storm的bolt中去,這就就解決了drools沒法分佈式的問題。這是由於bolt能夠做爲task分發給多個worker來處理,這樣drools中的規則也天然被多個worker處理了。
2.5 redis
Redis是key-value存儲系統,它支持較爲豐富的數據結構,有String,list,set,hash以及zset。與memcached同樣,爲了保證效率,數據都是緩存在內存中。區別的是redis會週期性的把更新的數據寫入磁盤或者把修改操做寫入追加的記錄文件,而且在此基礎上實現了master-slave(主從)同步。 Redis是內存數據庫,因此有很是快速的存取效率。日誌分析系統數據量並非特別大,可是對存取的速度要求較高,因此選擇redis有很大的優點。
3 各個框架的整合
3.1 ETL系統整合flumeng
Flume如何收集ETL系統中的日誌是我須要考慮的第一個問題。log4j2提供了專門的Appender-FlumeAppender用於將log信息發送到flume系統,並不須要咱們來實現。咱們在log4j2的配置文件中配置了ETL系統將log信息發送到的目的地,即avro服務器端。該服務器端咱們在flume的配置文件中進行了配置。配置信息以下所示:
0
1
2
3
4
5
6
7
8
9
producer.sources=s
producer.channels=c
producer.sinks=r
producer.sources.s.type=avro
producer.sources.s.channels=c
producer.sources.s.bind=10.200.187.71
producer.sources.s.port=4141
3.2 flumeng與kafka的整合
咱們從ETL系統中得到了日誌信息,將該信息不做任何處理傳遞到sink端,sink端發送數據到kafka。這個發送過程須要咱們編寫代碼來實現,咱們的實現代碼爲KafkaSink類。主要代碼以下所示:
0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
public class KafkaSink extends AbstractSink implements Configurable {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSink.class);
private Properties parameters;
private Producer<String, String> producer;
private Context context;
@Override
public void configure(Context context) {
this.context = context;
ImmutableMap<String, String> props = context.getParameters();
parameters = new Properties();
for (String key : props.keySet()) {
String value = props.get(key);
this.parameters.put(key, value);
}
}
@Override
public synchronized void start() {
super.start();
ProducerConfig config = new ProducerConfig(this.parameters);
this.producer = new Producer<String, String>(config);
}
@Override
public Status process() throws EventDeliveryException {
Status status = null;
// Start transaction
Channel ch = getChannel();
Transaction txn = ch.getTransaction();
txn.begin();
try {
// This try clause includes whatever Channel operations you want to do
Event event = ch.take();
String partitionKey = (String) parameters.get(KafkaFlumeConstans.PARTITION_KEY_NAME);
String encoding = StringUtils.defaultIfEmpty(
(String) this.parameters.get(KafkaFlumeConstans.ENCODING_KEY_NAME),
KafkaFlumeConstans.DEFAULT_ENCODING);
String topic = Preconditions.checkNotNull(
(String) this.parameters.get(KafkaFlumeConstans.CUSTOME_TOPIC_KEY_NAME),
「custom.topic.name is required」);
String eventData = new String(event.getBody(), encoding);
KeyedMessage<String, String> data;
// if partition key does’nt exist
if (StringUtils.isEmpty(partitionKey)) {
data = new KeyedMessage<String, String>(topic, eventData);
} else {
data = new KeyedMessage<String, String>(topic, String.valueOf(new Random().nextInt(Integer.parseInt(partitionKey))), eventData);
}
if (LOGGER.isInfoEnabled()) {
LOGGER.info(「Send Message to Kafka : [" + eventData + "] — [" + EventHelper.dumpEvent(event) + "]「);
}
producer.send(data);
txn.commit();
status = Status.READY;
} catch (Throwable t) {
txn.rollback();
status = Status.BACKOFF;
// re-throw all Errors
if (t instanceof Error) {
throw (Error) t;
}
} finally {
txn.close();
}
return status;
}
@Override
public void stop() {
producer.close();
}
}
該類中,咱們讀取了一些配置信息,這些配置信息咱們在flumeng的flume-conf.properties文件中進行了定義,定義內容以下:
0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
producer.sinks.r.type=org.apache.flume.plugins.KafkaSink
producer.sinks.r.metadata.broker.list=10.200.187.71:9092
producer.sinks.r.partition.key=0
producer.sinks.r.serializer.class=kafka.serializer.StringEncoder
producer.sinks.r.request.required.acks=0
producer.sinks.r.max.message.size=1000000
producer.sinks.r.producer.type=sync
producer.sinks.r.custom.encoding=UTF-8
producer.sinks.r.custom.topic.name=fks1
producer.sinks.r.channel=c
producer.channels.c.type=memory
producer.channels.c.capacity=1000
將上面的KafkaSink類打包成flumeng-kafka.jar,並將該jar包以及kafka_2.9.2-0.8.1.jar、metrics-annotation-2.2.0.jar、metrics-core-2.2.0.jar、Scala-compiler.jar、scala-library.jar、zkclient-0.3.jar放到flume的lib目錄下,啓動flume,咱們就能夠將ETL系統中產生的日誌信息發送到kafka中的fks1這個topic中去了。
3.3 kafka與storm的整合
Storm中的spout如何主動消費kafka中的消息須要咱們編寫代碼來實現,httpsgithub.comwurstmeisterstorm-kafka-0.8-plus實現了一個kafka與storm整合的插件,下載該插件,將插件中的jar包以及metrics-core-2.2.0.jar、scala-compiler2.9.2.jar放到storm的lib目錄下。利用插件中的StormSpout類,咱們就能夠消費kafka中的消息了。主要代碼以下所示:
0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
public class KafkaSpout extends BaseRichSpout {
public static class MessageAndRealOffset {
public Message msg;
public long offset;
public MessageAndRealOffset(Message msg, long offset) {
this.msg = msg;
this.offset = offset;
}
}
static enum EmitState {
EMITTED_MORE_LEFT,
EMITTED_END,
NO_EMITTED
}
public static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);
String _uuid = UUID.randomUUID().toString();
SpoutConfig _spoutConfig;
SpoutOutputCollector _collector;
PartitionCoordinator _coordinator;
DynamicPartitionConnections _connections;
ZkState _state;
long _lastUpdateMs = 0;
int _currPartitionIndex = 0;
public KafkaSpout(SpoutConfig spoutConf) {
_spoutConfig = spoutConf;
}
@Override
public void open(Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
_collector = collector;
Map stateConf = new HashMap(conf);
List zkServers = _spoutConfig.zkServers;
if (zkServers == null) {
zkServers = (List) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
}
Integer zkPort = _spoutConfig.zkPort;
if (zkPort == null) {
zkPort = ((Number) conf.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
}
stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, zkServers);
stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, zkPort);
stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, _spoutConfig.zkRoot);
_state = new ZkState(stateConf);
_connections = new DynamicPartitionConnections(_spoutConfig, KafkaUtils.makeBrokerReader(conf, _spoutConfig));
// using TransactionalState like this is a hack
int totalTasks = context.getComponentTasks(context.getThisComponentId()).size();
if (_spoutConfig.hosts instanceof StaticHosts) {
_coordinator = new StaticCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, _uuid);
} else {
_coordinator = new ZkCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, _uuid);
}
context.registerMetric(「kafkaOffset」, new IMetric() {
KafkaUtils.KafkaOffsetMetric _kafkaOffsetMetric = new KafkaUtils.KafkaOffsetMetric(_spoutConfig.topic, _connections);
@Override
public Object getValueAndReset() {
List pms = _coordinator.getMyManagedPartitions();
Set latestPartitions = new HashSet();
for (PartitionManager pm : pms) {
latestPartitions.add(pm.getPartition());
}
_kafkaOffsetMetric.refreshPartitions(latestPartitions);
for (PartitionManager pm : pms) {
_kafkaOffsetMetric.setLatestEmittedOffset(pm.getPartition(), pm.lastCompletedOffset());
}
return _kafkaOffsetMetric.getValueAndReset();
}
}, 60);
context.registerMetric(「kafkaPartition」, new IMetric() {
@Override
public Object getValueAndReset() {
List pms = _coordinator.getMyManagedPartitions();
Map concatMetricsDataMaps = new HashMap();
for (PartitionManager pm : pms) {
concatMetricsDataMaps.putAll(pm.getMetricsDataMap());
}
return concatMetricsDataMaps;
}
}, 60);
}
@Override
public void close() {
_state.close();
}
@Override
public void nextTuple() {
List managers = _coordinator.getMyManagedPartitions();
for (int i = 0; i < managers.size(); i++) { // in case the number of managers decreased _currPartitionIndex = _currPartitionIndex % managers.size(); EmitState state = managers.get(_currPartitionIndex).next(_collector); if (state != EmitState.EMITTED_MORE_LEFT) { _currPartitionIndex = (_currPartitionIndex + 1) % managers.size(); } if (state != EmitState.NO_EMITTED) { break; } } long now = System.currentTimeMillis(); if ((now – _lastUpdateMs) > _spoutConfig.stateUpdateIntervalMs) {
commit();
}
}
@Override
public void ack(Object msgId) {
KafkaMessageId id = (KafkaMessageId) msgId;
PartitionManager m = _coordinator.getManager(id.partition);
if (m != null) {
m.ack(id.offset);
}
}
@Override
public void fail(Object msgId) {
KafkaMessageId id = (KafkaMessageId) msgId;
PartitionManager m = _coordinator.getManager(id.partition);
if (m != null) {
m.fail(id.offset);
}
}
@Override
public void deactivate() {
commit();
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(_spoutConfig.scheme.getOutputFields());
}
private void commit() {
_lastUpdateMs = System.currentTimeMillis();
for (PartitionManager manager : _coordinator.getMyManagedPartitions()) {
manager.commit();
}
}
}
3.4
storm中bolt與drools的整合 Drools能夠將storm中處理數據的規則提取到一個drl文件中,該文件就成了惟一處理規則的文件。任什麼時候候規則出現變化,咱們只須要修改該drl文件,而不會改變其它的代碼。Bolt與drools的整合代碼以下所示:
0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
publicclassLogRulesBoltimplementsIBasicBolt{
Loggerlogger=LoggerFactory.getLogger(LogRulesBolt.class);
privatestaticfinallongserialVersionUID=1L;
publicstaticfinalStringLOG_ENTRY=「str」;
privateStatelessKnowledgeSessionksession;
privateStringdrlFile;
publicLogRulesBolt()
{}
publicLogRulesBolt(StringdrlFile)
{
this.drlFile=drlFile;
}
@Override
publicvoidprepare(MapstormConf,TopologyContextcontext){
KnowledgeBuilderkbuilder=KnowledgeBuilderFactory.newKnowledgeBuilder();
try{
kbuilder.add(ResourceFactory.newInputStreamResource(newFileInputStream(newFile(drlFile))),ResourceType.DRL);
}catch(FileNotFoundExceptione){
logger.error(e.getMessage());
}
KnowledgeBasekbase=KnowledgeBaseFactory.newKnowledgeBase();
kbase.addKnowledgePackages(kbuilder.getKnowledgePackages());
ksession=kbase.newStatelessKnowledgeSession();
}
@Override
publicvoidexecute(Tupleinput,BasicOutputCollectorcollector){
StringlogContent=(String)input.getValueByField(LOG_ENTRY);
logContent=logContent.trim();
if(!」」.equals(logContent)&&logContent!=null)
{
LogEntryentry=newLogEntry(logContent);
try{
ksession.execute(entry);
}catch(Exceptione)
{
logger.error(「droolstohandlelog["+logContent+"]isfailure!」);
logger.error(e.getMessage());
}
collector.emit(newValues(entry));
}
else
{
logger.error(「logcontentisempty!」);
}
}
@Override
publicvoidcleanup(){
}
@Override
publicvoiddeclareOutputFields(OutputFieldsDeclarerdeclarer){
declarer.declare(newFields(LOG_ENTRY));
}
@Override
publicMap<String,Object>getComponentConfiguration(){
returnnull;
}
}
經過規則處理數據以後,咱們就能夠將處理過的數據發送到下一個bolt中,而後將數據存儲到redis中。
4 相關思考
4.1 系統的優勢
模塊化的設計,使功能分散到各個模塊中,對各個功能進行了解耦,使系統的容錯性更高。
kafka做爲中間緩衝,解決了flume和storm速度不匹配的問題。
利用drools將規則和數據進行了解耦。把規則寫到一個配置文件中,避免了每次修改規則就要修改代碼的缺點。
storm和drools整合解決了drools的規則引擎沒法並行化的問題。
redis是內存數據庫,能夠很快速的寫數據到數據庫中,加快了整個系統的處理速度,避免了數據庫的瓶頸。
4.2 待考慮的問題
整個系統尚未用大量數據進行測試,穩定性以及性能瓶頸須要進一步的考慮、發現和改進。
在現有的系統中,flume只能發送數據到kafka的單個broker的單個partition中,後期須要修改代碼以適應多個broker多個partition。這點是能夠實現的,我已經實現了一部分。能夠將數據發送到單個broker的多個partition中。
現有的系統,修改規則文件以後,須要從新啓動topology,沒法進行熱加載。這點是須要進一步考慮的。
drools是一個優異的規則引擎。可是它的速度仍然讓我有點擔憂。這個問題可能在之後數據變大以後會體現出來。咱們思考了esper這個開源的規則引擎,它的速度更快,可是它類sql語言的規則處理語言不是太適合咱們的日誌分析系統。之後是否是可以做進一步的開發,用esper代替drools是咱們要考慮的一個問題。
思考現有的架構,flume並非缺一不可的模塊,咱們能夠在ETL系統中直接將log信息發送到kafka中,而後利用storm進行處理。可是爲了整個系統的可擴展性(例如咱們還想要將log信息發送到HDFS中,利用flume能夠直接配置)和易配置性,利用flume會更好。是否要用flume,flume是否會影響整個系統的速度,須要之後進一步的論證。
flume、kafka、storm、redis的各個參數的取值對系統的影響也較大。因此這些參數須要在之後的應用中選定合適的值。
4.3 框架層面的思考
flume是純java實現的框架,比較有趣的是各類source接口(如avro source、thrift source)以及sink(HDFS sink、Logger sink)接口的實現。之後有興趣能夠進一步閱讀源代碼。
kafka的思路很好,充分利用了磁盤順序寫入和順序讀取的路子,存儲的性能很好,只要幾個節點就能處理大量的消息了;另外,它突破了常規的一些消息中間件由服務端來記錄消息消費狀態的傳統,完全由客戶端本身來記錄究竟處理到哪裏了,失敗也罷成功也罷,客戶端原本是最清楚的了,由它來記錄消費狀態是最適合不過了。Kafka中這種處理思路是咱們值得學習的地方,咱們也能夠看代碼來體會這種設計。Kafka是由scala實現的,沒有scala基礎的能夠先看看scala編程。
storm主要是用clojure、java來實現的,還包括部分的Python代碼。代碼量25000行左右。在它的源代碼中,用java實現框架結構,clojure實現功能細節。storm中的模擬本地集羣的實現,保證消息只處理一次的功能的實現,都很巧妙,值得咱們去看代碼,無論是如今用獲得仍是用不到。
redis是c實現的,速度很快,代碼量不大。
海量可視化日誌分析平臺之ELK搭建
ELK是什麼?
E=ElasticSearch ,一款基於的Lucene的分佈式搜索引擎,咱們熟悉的github,就是由ElastiSearch提供的搜索,據傳已經有10TB+的數據量。
L=LogStash , 一款分佈式日誌收集系統,支持多輸入源,並內置一些過濾操做,支持多輸入元
K=Kibana , 一款配合ElasticSearch的web可視化界面,內置很是各類查詢,聚合操做,並擁有漂亮的圖形化展現功能
爲何要用ELK?
在實際應用中,咱們的日誌是很是重要的,它一般會記錄一些比較重要的信息,如應用程序的log記錄的error,warn級別的log,一般在量小的狀況下,咱們能夠直接vi+awk+sed+grep定位緣由,在量大的時候,這種方式就捉襟見肘了,並且咱們還要各類聚合,或者基於異常多個關鍵詞的搜索,並有且,或,交,並,差,補,排序等一些操做,並且相應速度必須給力,若是線上環境出了故障,可以馬上準肯定位,ELK就是高手,在百萬大軍中取上將首級,猶如探囊取物,因此這時候ELK就很是適合了,固然除此以外,ELK也常常在運維工做中大放光彩,在應用級別的實時監控,很是適合一些重要核心服務的預警。
ELK如何安裝搭建?
環境要求:
Linux系統:Centos7
Java版本:JDK1.8
ELK均爲最新版本:
(1)ElasticSearch1.7.2 基於Lucene4.10.4的版本
(2)Logstash1.5.4
(3)Kibana4.1.2
集羣拓撲:
ElasticSearch 3臺機器:
集羣名:search
機器名+es節點名 => ip地址
h1 => 192.168.1.120
h2 => 192.168.1.121
h3 => 192.168.1.122
Logstash 192.168.1.120
Kibana 192.168.1.120
下載:
elasticsearch: wget https://download.elastic.co/elasticsearch/elasticsearch/elasticsearch-1.7.3.tar.gz
logstash : wget https://download.elastic.co/logstash/logstash/logstash-1.5.4.tar.gz
kibana : wget https://download.elastic.co/kibana/kibana/kibana-4.1.2-linux-x64.tar.gz
一: 安裝elasticsearch集羣:
(1)解壓到指定目錄
(2)配置elasticsearch.yml裏面,集羣名字和節點名字,若是不配置默認集羣名爲elasticsearch,節點名隨機生成一個
(3) 在線安裝head和bigdisk插件
直接在elasticsearch的根目錄下,分別輸入:
bin/plugin --install mobz/elasticsearch-head 安裝head
bin/plugin --install lukas-vlcek/bigdesk 安裝bigdesk
(4)scp分發安裝完畢後的elasticsearch
(5)依次啓動各個機器上的elasticsearch
(6)head顯示以下:
(7)bigdisk顯示以下:
二:安裝logstash
(1)解壓到指定目錄
(2)在根木下新建一個conf目錄,在裏面新建一個配置文件first.conf
(3)啓動logstash
執行命令: bin/logstash -f conf/first.conf
三:安裝kibana
(1)解壓到指定目錄
(2)在根目錄執行bin/kibana直接啓動
(3)訪問http://192.168.1.120:5601/ 配置一個ElasticSearch索引
(4)在logstach裏面添加數據
(5)查看圖表:剛新加的數據
四: 至此,ELK組件已經安裝完畢,帶圖形化界面的簡單日誌查詢分析系統就搞定了
本篇只是一個簡單的入門例子,如需深刻能夠研究elastic的官網文檔:
https://www.elastic.co/guide/index.html
--------------------------------------------------------------------------------------------------------------
【海量日誌的分析】
-------------------------------------------
收集數據——切割篩選數據——分析數據——統計數據——可視化的數據展現
↓
(分段分割,按照時間/類型/應用來分,使用Splunk/AWK/MapReducer和Hive等軟件統計分析)
海量的前提:你得有分佈式的文件存儲系統 否則無法作到海量(TB或者EB級別的數據)
一個完整的海量日誌分析處理過程應當分爲 集中存儲、搜索(檢索)和分析日誌;
日誌集中存儲,實時監測日誌,快速搜索分析,及時發出告警。另外還須要日誌數據的可視化,經過報表、表格、圖表等形式展現日誌的數量、級別、詳細信息等。複雜的還須要加入流計算。
現有產品舉例:阿里雲數加MaxCompute:https://www.aliyun.com/product/odps
排序算得上是大數據系統的一個「剛需」,不管大數據採用的是hadoop,仍是spark,仍是impala,hive,總之排序是必不可少的
面對海量的日誌文件數據,你們如何看待下面三個技術細節呢?
1.如何收集日誌
2.如何存儲日誌
3.如何分析日誌 ——-歡迎你們吐槽
日誌分析統計工具 AWStats
日誌分析軟件 Splunk
日誌分析平臺 Kibana
以及bat等公司推出的統計分析工具 以及藉助growingio 或者 諸葛IO 等第三方平臺便可實現