基於Flume的美團日誌收集系統(二)改進和優化

問題導讀:
1.Flume的存在些什麼問題?
2.基於開源的Flume美團增長了哪些功能?
3.Flume系統如何調優?






在《 基於Flume的美團日誌收集系統(一)架構和設計》中,咱們詳述了基於Flume的美團日誌收集系統的架構設計,以及爲何作這樣的設計。在本節中,咱們將會講述在實際部署和使用過程當中遇到的問題,對Flume的功能改進和對系統作的優化。
1 Flume的問題總結
在Flume的使用過程當中,遇到的主要問題以下:
a. Channel「水土不服」:使用固定大小的MemoryChannel在日誌高峯時常報隊列大小不夠的異常;使用FileChannel又致使IO繁忙的問題;
b. HdfsSink的性能問題:使用HdfsSink向Hdfs寫日誌,在高峯時間速度較慢;
c. 系統的管理問題:配置升級,模塊重啓等;
2 Flume的功能改進和優化點
從上面的問題中能夠看到,有一些需求是原生Flume沒法知足的,所以,基於開源的Flume咱們增長了許多功能,修改了一些Bug,而且進行一些調優。下面將對一些主要的方面作一些說明。
2.1 增長Zabbix monitor服務
一方面,Flume自己提供了http, ganglia的監控服務,而咱們目前主要使用zabbix作監控。所以,咱們爲Flume添加了zabbix監控模塊,和sa的監控服務無縫融合。
另外一方面,淨化Flume的metrics。只將咱們須要的metrics發送給zabbix,避免 zabbix server形成壓力。目前咱們最爲關心的是Flume可否及時把應用端發送過來的日誌寫到Hdfs上, 對應關注的metrics爲:
  • Source : 接收的event數和處理的event數
  • Channel : Channel中擁堵的event數
  • Sink : 已經處理的event數
2.2 爲HdfsSink增長自動建立index功能
首先,咱們的HdfsSink寫到hadoop的文件採用lzo壓縮存儲。 HdfsSink能夠讀取hadoop配置文件中提供的編碼類列表,而後經過配置的方式獲取使用何種壓縮編碼,咱們目前使用lzo壓縮數據。採用lzo壓縮而非bz2壓縮,是基於如下測試數據:
event大小(Byte) sink.batch-size hdfs.batchSize 壓縮格式 總數據大小(G) 耗時(s) 平均events/s 壓縮後大小(G)
544 300 10000 bz2 9.1 2448 6833 1.36
544 300 10000 lzo 9.1 612 27333 3.49
其次,咱們的HdfsSink增長了建立lzo文件後自動建立index功能。Hadoop提供了對lzo建立索引,使得壓縮文件是可切分的,這樣 Hadoop Job能夠並行處理數據文件。HdfsSink自己lzo壓縮,但寫完lzo文件並不會建索引,咱們在close文件以後添加了建索引功能。
  1. /**
  2.    * Rename bucketPath file from .tmp to permanent location.
  3.    */
  4.   private void renameBucket() throws IOException, InterruptedException {
  5.       if(bucketPath.equals(targetPath)) {
  6.               return;
  7.         }

  8.         final Path srcPath = new Path(bucketPath);
  9.         final Path dstPath = new Path(targetPath);

  10.         callWithTimeout(new CallRunner<Object>() {
  11.               @Override
  12.               public Object call() throws Exception {
  13.                 if(fileSystem.exists(srcPath)) { // could block
  14.                       LOG.info("Renaming " + srcPath + " to " + dstPath);
  15.                      fileSystem.rename(srcPath, dstPath); // could block

  16.                       //index the dstPath lzo file
  17.                       if (codeC != null && ".lzo".equals(codeC.getDefaultExtension()) ) {
  18.                               LzoIndexer lzoIndexer = new LzoIndexer(new Configuration());
  19.                               lzoIndexer.index(dstPath);
  20.                       }
  21.                 }
  22.                 return null;
  23.               }
  24.     });
  25. }
複製代碼

2.3 增長HdfsSink的開關
咱們在HdfsSink和DualChannel中增長開關,當開關打開的狀況下,HdfsSink再也不往Hdfs上寫數據,而且數據只寫向DualChannel中的FileChannel。以此策略來防止Hdfs的正常停機維護。
2.4 增長DualChannel
Flume自己提供了MemoryChannel和FileChannel。MemoryChannel處理速度快,但緩存大小有限,且沒有持久 化;FileChannel則恰好相反。咱們但願利用二者的優點,在Sink處理速度夠快,Channel沒有緩存過多日誌的時候,就使用 MemoryChannel,當Sink處理速度跟不上,又須要Channel可以緩存下應用端發送過來的日誌時,就使用FileChannel,由此我 們開發了DualChannel,可以智能的在兩個Channel之間切換。
其具體的邏輯以下:
  1. /***
  2. * putToMemChannel indicate put event to memChannel or fileChannel
  3. * takeFromMemChannel indicate take event from memChannel or fileChannel
  4. * */
  5. private AtomicBoolean putToMemChannel = new AtomicBoolean(true);
  6. private AtomicBoolean takeFromMemChannel = new AtomicBoolean(true);

  7. void doPut(Event event) {
  8.         if (switchon && putToMemChannel.get()) {
  9.               //往memChannel中寫數據
  10.               memTransaction.put(event);

  11.               if ( memChannel.isFull() || fileChannel.getQueueSize() > 100) {
  12.                 putToMemChannel.set(false);
  13.               }
  14.         } else {
  15.               //往fileChannel中寫數據
  16.               fileTransaction.put(event);
  17.         }
  18.   }

  19. Event doTake() {
  20.     Event event = null;
  21.     if ( takeFromMemChannel.get() ) {
  22.         //從memChannel中取數據
  23.         event = memTransaction.take();
  24.         if (event == null) {
  25.             takeFromMemChannel.set(false);
  26.         } 
  27.     } else {
  28.         //從fileChannel中取數據
  29.         event = fileTransaction.take();
  30.         if (event == null) {
  31.             takeFromMemChannel.set(true);

  32.             putToMemChannel.set(true);
  33.         } 
  34.     }
  35.     return event;
  36. }
複製代碼



2.5 增長NullChannel
Flume提供了NullSink,能夠把不須要的日誌經過NullSink直接丟棄,不進行存儲。然而,Source須要先將events存放到 Channel中,NullSink再將events取出扔掉。爲了提高性能,咱們把這一步移到了Channel裏面作,因此開發了 NullChannel。
2.6 增長KafkaSink
爲支持向Storm提供實時數據流,咱們增長了KafkaSink用來向Kafka寫實時數據流。其基本的邏輯以下:
  1. public class KafkaSink extends AbstractSink implements Configurable {
  2.         private String zkConnect;
  3.         private Integer zkTimeout;
  4.         private Integer batchSize;
  5.         private Integer queueSize;
  6.         private String serializerClass;
  7.         private String producerType;
  8.         private String topicPrefix;

  9.         private Producer<String, String> producer;

  10.         public void configure(Context context) {
  11.             //讀取配置,並檢查配置
  12.         }

  13.         @Override
  14.         public synchronized void start() {
  15.             //初始化producer
  16.         }

  17.         @Override
  18.         public synchronized void stop() {
  19.             //關閉producer
  20.         }

  21.         @Override
  22.         public Status process() throws EventDeliveryException {

  23.             Status status = Status.READY;

  24.             Channel channel = getChannel();
  25.             Transaction tx = channel.getTransaction();
  26.             try {
  27.                     tx.begin();

  28.                     //將日誌按category分隊列存放
  29.                     Map<String, List<String>> topic2EventList = new HashMap<String, List<String>>();

  30.                     //從channel中取batchSize大小的日誌,從header中獲取category,生成topic,並存放於上述的Map中;

  31.                     //將Map中的數據經過producer發送給kafka 

  32.                    tx.commit();
  33.             } catch (Exception e) {
  34.                     tx.rollback();
  35.                     throw new EventDeliveryException(e);
  36.             } finally {
  37.                 tx.close();
  38.             }
  39.             return status;
  40.         }
  41. }
複製代碼



2.7 修復和scribe的兼容問題
Scribed在經過ScribeSource發送數據包給Flume時,大於4096字節的包,會先發送一個Dummy包檢查服務器的反應,而 Flume的ScribeSource對於logentry.size()=0的包返回TRY_LATER,此時Scribed就認爲出錯,斷開鏈接。這 樣循環反覆嘗試,沒法真正發送數據。如今在ScribeSource的Thrift接口中,對size爲0的狀況返回OK,保證後續正常發送數據。
3. Flume系統調優經驗總結3.1 基礎參數調優經驗
  • HdfsSink中默認的serializer會每寫一行在行尾添加一個換行符,咱們日誌自己帶有換行符,這樣會致使每條日誌後面多一個空行,修改配置不要自動添加換行符;
  1. lc.sinks.sink_hdfs.serializer.appendNewline = false
複製代碼


  • 調大MemoryChannel的capacity,儘可能利用MemoryChannel快速的處理能力;
  • 調大HdfsSink的batchSize,增長吞吐量,減小hdfs的flush次數;
  • 適當調大HdfsSink的callTimeout,避免沒必要要的超時錯誤;

3.2 HdfsSink獲取Filename的優化
HdfsSink的path參數指明瞭日誌被寫到Hdfs的位置,該參數中能夠引用格式化的參數,將日誌寫到一個動態的目錄中。這方便了日誌的管理。例如咱們能夠將日誌寫到category分類的目錄,而且按天和按小時存放:
  1. lc.sinks.sink_hdfs.hdfs.path = /user/hive/work/orglog.db/%{category}/dt=%Y%m%d/hour=%H
複製代碼

HdfsS ink中處理每條event時,都要根據配置獲取此event應該寫入的Hdfs path和filename,默認的獲取方法是經過正則表達式替換配置中的變量,獲取真實的path和filename。由於此過程是每條event都要 作的操做,耗時很長。經過咱們的測試,20萬條日誌,這個操做要耗時6-8s左右。
因爲咱們目前的path和filename有固定的模式,能夠經過字符串拼接得到。然後者比正則匹配快幾十倍。拼接定符串的方式,20萬條日誌的操做只須要幾百毫秒。
3.3 HdfsSink的b/m/s優化
在咱們初始的設計中,全部的日誌都經過一個Channel和一個HdfsSink寫到Hdfs上。咱們來看一看這樣作有什麼問題。
首先,咱們來看一下HdfsSink在發送數據的邏輯:
  1. //從Channel中取batchSize大小的events
  2. for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) {
  3.     //對每條日誌根據category append到相應的bucketWriter上;
  4.     bucketWriter.append(event);


  5. for (BucketWriter bucketWriter : writers) {
  6.     //而後對每個bucketWriter調用相應的flush方法將數據flush到Hdfs上
  7.     bucketWriter.flush();
複製代碼


假設咱們的系統中有100個category,batchSize大小設置爲20萬。則每20萬條數據,就須要對100個文件進行append或者flush操做。
其次,對於咱們的日誌來講,基本符合80/20原則。即20%的category產生了系統80%的日誌量。這樣對大部分日誌來講,每20萬條可能只包含幾條日誌,也須要往Hdfs上flush一次。
上述的狀況會致使HdfsSink寫Hdfs的效率極差。下圖是單Channel的狀況下每小時的發送量和寫hdfs的時間趨勢圖。
 
鑑於這種實際應用場景,咱們把日誌進行了大小歸類,分爲big, middle和small三類,這樣能夠有效的避免小日誌跟着大日誌一塊兒頻繁的flush,提高效果明顯。下圖是分隊列後big隊列的每小時的發送量和寫hdfs的時間趨勢圖。
相關文章
相關標籤/搜索