Flume+Kafka收集Docker容器內分佈式日誌應用實踐

1 背景和問題

隨着雲計算、PaaS平臺的普及,虛擬化、容器化等技術的應用,例如Docker等技術,愈來愈多的服務會部署在雲端。一般,咱們須要須要獲取日誌,來進行監控、分析、預測、統計等工做,可是雲端的服務不是物理的固定資源,日誌獲取的難度增長了,以往能夠SSH登錄的或者FTP獲取的,如今可不那麼容易得到,但這又是工程師迫切須要的,最典型的場景即是:上線過程當中,一切都在GUI化的PaaS平臺點點鼠標完成,可是咱們須要結合tail -F、grep等命令來觀察日誌,判斷是否上線成功。固然這是一種狀況,完善的PaaS平臺會爲咱們完成這個工做,可是還有很是多的ad-hoc的需求,PaaS平臺沒法知足咱們,咱們須要日誌。本文就給出了在分佈式環境下,容器化的服務中的分散日誌,如何集中收集的一種方法。java

2 設計約束和需求描述

作任何設計以前,都須要明確應用場景、功能需求和非功能需求。linux

2.1 應用場景git

分佈式環境下可承載百臺服務器產生的日誌,單條數據日誌小於1k,最大不超過50k,日誌總大小天天小於500G。github

2.2 功能需求docker

1)集中收集全部服務日誌。apache

2)可區分來源,按服務、模塊和天粒度切分。緩存

2.3 非功能需求bash

1)不侵入服務進程,收集日誌功能需獨立部署,佔用系統資源可控。服務器

2)實時性,低延遲,從產生日誌到集中存儲延遲小於4s。架構

3)持久化,保留最近N天。

4)儘可能遞送日誌便可,不要求不丟不重,但比例應該不超過一個閾值(例如萬分之一)。

4)能夠容忍不嚴格有序。

5)收集服務屬於線下離線功能,可用性要求不高,整年知足3個9便可。

3 實現架構

一種方案實現的架構以下圖所示:

Flume+Kafka收集Docker容器內分佈式日誌應用實踐

 3.1 Producer層分析

PaaS平臺內的服務假設部署在Docker容器內,那麼爲了知足非功能需求#1,獨立另一個進程負責收集日誌,所以不侵入服務框架和進程。採用Flume NG來進行日誌的收集,這個開源的組件很是強大,能夠看作一種監控、生產增量,而且能夠發佈、消費的模型,Source就是源,是增量源,Channel是緩衝通道,這裏使用內存隊列緩衝區,Sink就是槽,是個消費的地方。容器內的Source就是執行tail -F這個命令的去利用linux的標準輸出讀取增量日誌,Sink是一個Kafka的實現,用於推送消息到分佈式消息中間件。

3.2 Broker層分析

PaaS平臺內的多個容器,會存在多個Flume NG的客戶端去推送消息到Kafka消息中間件。Kafka是一個吞吐量、性能很是高的消息中間件,採用單個分區按照順序的寫入的方式工做,而且支持按照offset偏移量隨機讀取的特性,所以很是適合作topic發佈訂閱模型的實現。這裏圖中有多個Kafka,是由於支持集羣特性,容器內的Flume NG客戶端能夠鏈接若干個Kafka的broker發佈日誌,也能夠理解爲鏈接若干個topic下的分區,這樣能夠實現高吞吐,一來能夠在Flume NG內部作打包批量發送來減輕QPS壓力,二來能夠分散到多個分區寫入,同時Kafka還會指定replica備份個數,保證寫入某個master後還須要寫入N個備份,這裏設置爲2,沒有采用經常使用的分佈式系統的3,是由於儘可能保證高併發特性,知足非功能需求中的#4。

3.3 Consumer層分析

消費Kafka增量的也是一個Flume NG,能夠看出它的強大之處,在於能夠接入任意的數據源,都是可插拔的實現,經過少許配置便可。這裏使用Kafka Source訂閱topic,收集過來的日誌一樣先入內存緩衝區,以後使用一個File Sink寫入文件,爲了知足功能需求#2,可區分來源,按服務、模塊和天粒度切分,我本身實現了一個Sink,叫作RollingByTypeAndDayFileSink,源代碼放到了github上,能夠從:https://github.com/neoremind/flume-byday-file-sink/releases/tag/1.0.0下載jar,直接放到flume的lib目錄便可。

4 實踐方法

4.1 容器內配置

Dockerfile

Dockerfile是容器內程序的運行腳本,裏面會含有很多docker自帶的命令,下面是要典型的Dockerfile,BASE_IMAGE是一個包含了運行程序以及flume bin的鏡像,比較重要的就是ENTRYPOINT,主要利用supervisord來保證容器內進程的高可用。

FROM ${BASE_IMAGE}
MAINTAINER ${MAINTAINER}
ENV REFRESH_AT ${REFRESH_AT}
RUN mkdir -p /opt/${MODULE_NAME}
ADD ${PACKAGE_NAME} /opt/${MODULE_NAME}/
COPY service.supervisord.conf /etc/supervisord.conf.d/service.supervisord.conf
COPY supervisor-msoa-wrapper.sh /opt/${MODULE_NAME}/supervisor-msoa-wrapper.sh
RUN chmod +x /opt/${MODULE_NAME}/supervisor-msoa-wrapper.sh
RUN chmod +x /opt/${MODULE_NAME}/*.sh
EXPOSE
ENTRYPOINT ["/usr/bin/supervisord", "-c", "/etc/supervisord.conf"]

下面是supervisord的配置文件,執行supervisor-msoa-wrapper.sh腳本。

[program:${MODULE_NAME}]
command=/opt/${MODULE_NAME}/supervisor-msoa-wrapper.sh

下面是supervisor-msoa-wrapper.sh,這個腳本內的start.sh或者stop.sh就是應用程序的啓動和中止腳本,這裏的背景是咱們的啓停的腳本都是在後臺運行的,所以不會阻塞當前進程,所以直接退出了,Docker就會認爲程序結束,所以應用生命週期也結束,這裏使用wait命令來進行一個阻塞,這樣就能夠保證即便後臺運行的進程,咱們能夠看似是前臺跑的。

這裏加入了flume的運行命令,–conf後面的參數標示會去這個文件夾下面尋找flume-env.sh,裏面能夠定義JAVA_HOME和JAVA_OPTS。–conf-file指定flume實際的source、channel、sink等的配置。

#! /bin/bash
function shutdown()
{
 date
 echo "Shutting down Service"
 unset SERVICE_PID # Necessary in some cases
 cd /opt/${MODULE_NAME}
 source stop.sh
}
 
## 中止進程
cd /opt/${MODULE_NAME}
echo "Stopping Service"
source stop.sh
 
## 啓動進程
echo "Starting Service"
source start.sh
export SERVICE_PID=$!
 
## 啓動Flume NG agent,等待4s日誌由start.sh生成
sleep 4 
nohup /opt/apache-flume-1.6.0-bin/bin/flume-ng agent --conf /opt/apache-flume-1.6.0-bin/conf --conf-file /opt/apache-flume-1.6.0-bin/conf/logback-to-kafka.conf --name a1 -Dflume.root.logger=INFO,console &
 
# Allow any signal which would kill a process to stop Service
trap shutdown HUP INT QUIT ABRT KILL ALRM TERM TSTP
 
echo "Waiting for $SERVICE_PID"
wait $SERVICE_PID

Flume配置

source本應該採用exec source,執行tailf -F日誌文件便可。可是這裏使用了一個自行開發的StaticLinePrefixExecSource,源代碼能夠在github上找到。之因此採用自定義的,是由於須要將一些固定的信息傳遞下去,例如服務/模塊的名稱以及分佈式服務所在容器的hostname,便於收集方根據這個標記來區分日誌。若是這裏你發現爲何不用flume的攔截器interceptor來作這個工做,加入header中一些KV不就OK了嗎?這是個小坑,我後續會解釋一下。

例如原來日誌的一行爲:

[INFO] 2016-03-18 12:59:31,080 [main] fountain.runner.CustomConsumerFactoryPostProcessor (CustomConsumerFactoryPostProcessor.java:91) -Start to init IoC container by loading XML bean definitions from classpath:fountain-consumer-stdout.xml

按照以下配置,那麼實際傳遞給Channel的日誌爲:

service1##$$##m1-ocean-1004.cp [INFO] 2016-03-18 12:59:31,080 [main] fountain.runner.CustomConsumerFactoryPostProcessor (CustomConsumerFactoryPostProcessor.java:91) -Start to init IoC container by loading XML bean definitions from classpath:fountain-consumer-stdout.xml

channel使用內存緩衝隊列,大小標識可容乃的日誌條數(event size),事務能夠控制一次性從source以及一次性給sink的批量日誌條數,實際內部有個timeout超時,可經過keepAlive參數設置,超時後仍然會推送過去,默認爲3s。

sink採用Kafka sink,配置broker的list列表以及topic的名稱,須要ACK與否,以及一次性批量發送的日誌大小,默認5條一個包,若是併發很大能夠把這個值擴大,加大吞吐。

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
 
a1.sources.r1.type = com.baidu.unbiz.flume.sink.StaticLinePrefixExecSource
a1.sources.r1.command = tail -F /opt/MODULE_NAME/log/logback.log
a1.sources.r1.channels = c1
a1.sources.r1.prefix=service1
a1.sources.r1.separator=##$$##
a1.sources.r1.suffix=m1-ocean-1004.cp
 
# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = keplerlog
a1.sinks.k1.brokerList = gzns-cm-201508c02n01.gzns:9092,gzns-cm-201508c02n02.gzn
s:9092
a1.sinks.k1.requiredAcks = 0
a1.sinks.k1.batchSize = 5
 
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000000
a1.channels.c1.transactionCapacity = 100
 
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

4.2 Broker配置

參考Kafka官方的教程,這裏新建一個名稱叫作keplerlog的topic,備份數量爲2,分區爲4。

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 4 --topic keplerlog

製造一些增量信息,例如以下腳本,在終端內能夠隨便輸入一些字符串:

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic keplerlog

打開另一個終端,訂閱topic,確承認以看到producer的輸入的字符串便可,即表示聯通了。

> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic keplerlog --from-beginning

4.3 集中接收日誌配置

Flume配置

首先source採用flume官方提供的KafkaSource,配置好zookeeper的地址,會去找可用的broker list進行日誌的訂閱接收。channel採用內存緩存隊列。sink因爲咱們的需求是按照服務名稱和日期切分日誌,而官方提供的默認file roll sink,只能按照時間戳,和時間interval來切分。

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
 
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.zookeeperConnect = localhost:2181
a1.sources.r1.topic = keplerlog
a1.sources.r1.batchSize = 5
a1.sources.r1.groupId = flume-collector
a1.sources.r1.kafka.consumer.timeout.ms = 800
 
# Describe the sink
a1.sinks.k1.type = com.baidu.unbiz.flume.sink.RollingByTypeAndDayFileSink
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /home/work/data/kepler-log
 
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000000
a1.channels.c1.transactionCapacity = 100
 
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

定製版RollingByTypeAndDayFileSink

源代碼見github。RollingByTypeAndDayFileSink使用有兩個條件:

1)Event header中必須有timestamp,不然會忽略事件,而且會拋出{@link InputNotSpecifiedException}

2)Event body若是是按照##$$##分隔的,那麼把分隔以前的字符串當作模塊名稱(module name)來處理;若是沒有則默認爲default文件名。

輸出到本地文件,首先要設置一個跟目錄,經過sink.directory設置。其次根據條件#2中提取出來的module name做爲文件名稱前綴,timestamp日誌做爲文件名稱後綴,例如文件名爲portal.20150606或者default.20150703。

規整完的一個文件目錄形式以下,能夠看出聚集了衆多服務的日誌,而且按照服務名稱、時間進行了區分:

~/data/kepler-log$ ls
authorization.20160512 
default.20160513 
default.20160505 
portal.20160512 
portal.20160505 
portal.20160514

不得不提的兩個坑

坑1

回到前兩節提到的自定義了一個StaticLinePrefixExecSource來進行添加一些前綴的工做。因爲要區分來源的服務/模塊名稱,而且按照時間來切分,根據官方flume文檔,徹底能夠採用以下的Source攔截器配置。例如i1表示時間戳,i2表示默認的靜態變量KV,key=module,value=portal。

a1.sources.r1.interceptors = i2 i1
a1.sources.r1.interceptors.i1.type = timestamp
a1.sources.r1.interceptors.i2.type = static
a1.sources.r1.interceptors.i2.key = module
a1.sources.r1.interceptors.i2.value = portal

可是flume官方默認的KafkaSource(v1.6.0)的實現:

95 while (eventList.size() < batchUpperLimit &&
96 System.currentTimeMillis() < batchEndTime) {
97 iterStatus = hasNext();
98 if (iterStatus) {
99 // get next message
100 MessageAndMetadata<byte[], byte[]> messageAndMetadata = it.next();
101 kafkaMessage = messageAndMetadata.message();
102 kafkaKey = messageAndMetadata.key();
103
104 // Add headers to event (topic, timestamp, and key)
105 headers = new HashMap<String, String>();
106 headers.put(KafkaSourceConstants.TIMESTAMP,
107 String.valueOf(System.currentTimeMillis()));
108 headers.put(KafkaSourceConstants.TOPIC, topic);
109 if (kafkaKey != null) {
110 headers.put(KafkaSourceConstants.KEY, new String(kafkaKey));
111 }
112 if (log.isDebugEnabled()) {
113 log.debug("Message: {}", new String(kafkaMessage));
114 }
115 event = EventBuilder.withBody(kafkaMessage, headers);
116 eventList.add(event);
117 }

能夠看出本身重寫了Event header中的KV,丟棄了發送過來的header,由於這個坑的存在所以,tailf -F在event body中在前面指定模塊/服務名稱,而後RollingByTypeAndDayFileSink會按照分隔符切分。不然下游沒法能達到KV。

坑2

exec source須要執行tail -F命令來經過標準輸出和標準錯誤一行一行的讀取,可是若是把tail -F封裝在一個腳本中,腳本中再執行一些管道命令,例如tail -F logback.log | awk ‘{print "portal##$$##"$0}’,那麼exec source老是會把最近的輸出丟棄掉,致使追加到文件末尾的日誌有一些沒法老是「姍姍來遲」,除非有新的日誌追加,他們纔會被「擠」出來。這裏能夠依靠unbuffer tail來解決,詳見連接(感謝denger的評論)。

5 結語

你們有任何疑問的話均可以留言,關注個人主頁【點擊進入】,瞭解更多!

從這個分佈式服務分散日誌的集中收集方法,能夠看出利用一些開源組件,能夠很是方便的解決咱們平常工做中所發現的問題,而這個發現問題和解決問題的能力纔是工程師的基本素質要求。對於其不知足需求的,須要具有有鑽研精神,知其然還要知其因此然的去作一些ad-hoc工做,才能夠更加好的leverage這些組件。

另外,日誌的收集只是起點,利用寶貴的數據,後面的使用場景和想象空間都會很是大,例如

1)利用Spark streaming在一個時間窗口內計算日誌,作流量控制和訪問限制。

2)使用awk腳本、scala語言的高級函數作單機的訪問統計分析,或者Hadoop、Spark作大數據的統計分析。

3)除了端口存活和語義監控,利用實時計算處理日誌,作ERROR、異常等信息的過濾,實現服務真正的健康保障和預警監控。

4)收集的日誌能夠經過logstash導入Elastic Search,使用ELK方式作日誌查詢使用。

相關文章
相關標籤/搜索