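The HDFSSink component is built mainly from three classes: HDFSEventSink, BucketWriter, and HDFSWriter.
HDFSEventSink validates the sink's configuration, takes events from the Channel, and parses each event's headers to decide which BucketWriter the event belongs to.
BucketWriter creates (rolls) files on HDFS according to conditions such as rollCount and rollSize. The file data format and serialization method set in the configuration file are applied uniformly inside each BucketWriter.
HDFSWriter is an interface with three concrete implementations: HDFSSequenceFile, HDFSDataStream, and HDFSCompressedDataStream.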
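To make the interface-plus-implementations idea concrete, here is a minimal sketch of a factory that picks a writer from a file type string. SketchWriter and SketchWriterFactory are hypothetical stand-ins invented for this illustration, not Flume's classes; only the three type names come from the article.

```java
// Hypothetical stand-in for the HDFSWriter interface (illustration only).
interface SketchWriter {
    String describe();
}

// Hypothetical factory; the three type strings are the ones named in the article.
class SketchWriterFactory {
    static SketchWriter getWriter(String fileType) {
        switch (fileType) {
            case "SequenceFile":
                return () -> "sequence-file writer";
            case "DataStream":
                return () -> "plain data-stream writer";
            case "CompressedStream":
                return () -> "compressed data-stream writer";
            default:
                // Assumption: unknown types are rejected rather than defaulted.
                throw new IllegalArgumentException("File type " + fileType + " not supported");
        }
    }

    public static void main(String[] args) {
        System.out.println(getWriter("DataStream").describe());
    }
}
```

Callers depend only on the interface, so a new file type would need a new implementation and one extra case in the factory.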
Class diagram of the key classes in HDFSSink
The HDFSEventSink class
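Before walking through HDFSEventSink, make sure you understand its configuration parameters (see "Flume-HDFSSink configuration parameter reference").
1. The configure() method reads filePath, fileName, and other settings from the configuration file. For the meaning of each parameter, see "Flume-HDFSSink configuration parameter reference".
2. The start() method initializes the fixed-size thread pool callTimeoutPool, the scheduled thread pool timedRollerPool, and sfWriters, and then starts sinkCounter.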
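callTimeoutPool: a fixed-size thread pool; HDFS calls run on it so that they can be bounded by callTimeout.
timedRollerPool: a scheduled thread pool. Its tasks include retrying the HDFS file rename (every retryInterval), rolling a file once it has been open long enough (rollInterval), and closing idle files (idleTimeout).
sfWriters: a subclass of LinkedHashMap that overrides removeEldestEntry to evict the least recently used writer, so the number of open files never exceeds a fixed maximum (maxOpenFiles).
sinkCounter: the counter for the sink's monitoring metrics.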
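The LRU behaviour described for sfWriters can be sketched with a plain LinkedHashMap subclass. LruWriterCache is a hypothetical name and the value type is left generic; this is an illustration of the technique, not Flume's own class.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical LRU cache keyed by file path (illustration only).
class LruWriterCache<V> extends LinkedHashMap<String, V> {
    private final int maxOpenFiles;

    LruWriterCache(int maxOpenFiles) {
        // accessOrder = true: iteration order runs from least to most recently used.
        super(16, 0.75f, true);
        this.maxOpenFiles = maxOpenFiles;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<String, V> eldest) {
        // Called by put(); returning true evicts the least recently used entry.
        return size() > maxOpenFiles;
    }

    public static void main(String[] args) {
        LruWriterCache<String> cache = new LruWriterCache<>(2);
        cache.put("/flume/a", "writerA");
        cache.put("/flume/b", "writerB");
        cache.get("/flume/a");            // touch a, so b becomes the eldest entry
        cache.put("/flume/c", "writerC"); // exceeds the limit, b is evicted
        System.out.println(cache.keySet());
    }
}
```

A cache of real writers would presumably also need to close the evicted writer before dropping it, which this sketch leaves out.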
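3. The process() method holds the core logic of HDFSEventSink (key steps are annotated in the code comments):
process() first obtains the Channel.
It then loops up to batchSize times, taking an event from the Channel on each pass and parsing its headers and other information to determine the event's target HDFS path and file name.
Different events may map to different bucketWriter and hdfsWriter instances; each event is appended to its matching writer.
Once batchSize events have been taken, or the Channel has no more events, every writer touched in the batch is flushed and the transaction is committed.
The bucketWriter decides how files are rolled and handles the file format and serialization logic.
The hdfsWriter implementation is selected by the hdfs.fileType parameter, which takes one of three values: "SequenceFile", "DataStream", or "CompressedStream".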
    public Status process() throws EventDeliveryException {
      // Call the parent class's getChannel() to obtain the Channel connected to this Sink
      Channel channel = getChannel();
      // Each batch is committed within a single transaction
      Transaction transaction = channel.getTransaction();
      transaction.begin();
      try {
        Set<BucketWriter> writers = new LinkedHashSet<>();
        int txnEventCount = 0;
        for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) {
          // Take the next event from the Channel
          Event event = channel.take();
          if (event == null) {
            // No more events: stop early instead of looping up to batchSize
            break;
          }

          // reconstruct the path name by substituting place holders
          // The configuration file may contain %-style variables, for example
          // "a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S".
          // Resolve those variables to build realPath and realName.
          String realPath = BucketPath.escapeString(filePath, event.getHeaders(),
              timeZone, needRounding, roundUnit, roundValue, useLocalTime);
          String realName = BucketPath.escapeString(fileName, event.getHeaders(),
              timeZone, needRounding, roundUnit, roundValue, useLocalTime);

          String lookupPath = realPath + DIRECTORY_DELIMITER + realName;
          BucketWriter bucketWriter;
          HDFSWriter hdfsWriter = null;
          WriterCallback closeCallback = new WriterCallback() {
            @Override
            public void run(String bucketPath) {
              LOG.info("Writer callback called.");
              synchronized (sfWritersLock) {
                // sfWriters is an LRU map capped at maxOpenFiles entries,
                // which bounds the number of open files
                sfWriters.remove(bucketPath);
              }
            }
          };
          synchronized (sfWritersLock) {
            bucketWriter = sfWriters.get(lookupPath);
            // we haven't seen this file yet, so open it and cache the handle
            if (bucketWriter == null) {
              // The factory returns the writer for the configured file type:
              // "SequenceFile", "DataStream", or "CompressedStream"
              hdfsWriter = writerFactory.getWriter(fileType);
              bucketWriter = initializeBucketWriter(realPath, realName,
                  lookupPath, hdfsWriter, closeCallback);
              sfWriters.put(lookupPath, bucketWriter);
            }
          }

          // Write the data to HDFS
          try {
            bucketWriter.append(event);
          } catch (BucketClosedException ex) {
            LOG.info("Bucket was closed while trying to append, "
                + "reinitializing bucket and writing event.");
            hdfsWriter = writerFactory.getWriter(fileType);
            bucketWriter = initializeBucketWriter(realPath, realName,
                lookupPath, hdfsWriter, closeCallback);
            synchronized (sfWritersLock) {
              sfWriters.put(lookupPath, bucketWriter);
            }
            bucketWriter.append(event);
          }

          // track the buckets getting written in this transaction
          if (!writers.contains(bucketWriter)) {
            writers.add(bucketWriter);
          }
        }

        if (txnEventCount == 0) {
          sinkCounter.incrementBatchEmptyCount();
        } else if (txnEventCount == batchSize) {
          sinkCounter.incrementBatchCompleteCount();
        } else {
          sinkCounter.incrementBatchUnderflowCount();
        }

        // flush all pending buckets before committing the transaction
        for (BucketWriter bucketWriter : writers) {
          bucketWriter.flush();
        }

        transaction.commit();

        if (txnEventCount < 1) {
          return Status.BACKOFF;
        } else {
          sinkCounter.addToEventDrainSuccessCount(txnEventCount);
          return Status.READY;
        }
      } catch (IOException eIO) {
        transaction.rollback();
        LOG.warn("HDFS IO error", eIO);
        return Status.BACKOFF;
      } catch (Throwable th) {
        transaction.rollback();
        LOG.error("process failed", th);
        if (th instanceof Error) {
          throw (Error) th;
        } else {
          throw new EventDeliveryException(th);
        }
      } finally {
        transaction.close();
      }
    }
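The lookup key that process() builds from realPath and realName depends on substituting event header values into the configured templates. The sketch below shows only the %{header} form of that substitution. HeaderPathSketch and its methods are hypothetical, the "/" delimiter is an assumption, and the date escapes such as %y-%m-%d are deliberately not handled; it is not a reimplementation of BucketPath.escapeString.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper (illustration only).
class HeaderPathSketch {
    // Matches %{name} and captures the header name.
    private static final Pattern HEADER_TAG = Pattern.compile("%\\{([^}]+)\\}");

    static String substituteHeaders(String template, Map<String, String> headers) {
        Matcher m = HEADER_TAG.matcher(template);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            // Assumption of this sketch: a missing header becomes an empty string.
            String value = headers.getOrDefault(m.group(1), "");
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }

    // Events with the same resolved path and name share one cached writer.
    static String lookupPath(String pathTemplate, String nameTemplate, Map<String, String> headers) {
        return substituteHeaders(pathTemplate, headers) + "/" + substituteHeaders(nameTemplate, headers);
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        headers.put("host", "web01");
        System.out.println(lookupPath("/flume/events/%{host}", "FlumeData", headers));
    }
}
```

Because the key is derived per event, two events with different header values in one batch resolve to different keys and therefore to different writers.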
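BucketWriter
The flush() method:
BucketWriter maintains a batchCounter. When batchCounter is non-zero, flush() calls doFlush(), which serializes the events in the batch and flushes the output stream, so that the events end up written to HDFS.
If the user has set idleTimeout to a non-zero value, then after doFlush() a task is added to the scheduled thread pool. That task closes the HDFSWriter, the output object currently connected to HDFS. It is scheduled to run after a delay of idleTimeout, and the resulting delayed task is stored in the idleFuture variable.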
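The idle-close mechanism can be sketched with a ScheduledExecutorService. IdleCloseSketch is a hypothetical class: the timeout is taken in milliseconds to keep the demo short, the actual flush and close work is reduced to a boolean flag, and rescheduling on every flush is a simplification of this sketch.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical writer that closes itself after a period without appends (illustration only).
class IdleCloseSketch {
    private final ScheduledExecutorService timedRollerPool;
    private final long idleTimeoutMillis;
    private ScheduledFuture<?> idleFuture;
    private boolean open = true;

    IdleCloseSketch(ScheduledExecutorService timedRollerPool, long idleTimeoutMillis) {
        this.timedRollerPool = timedRollerPool;
        this.idleTimeoutMillis = idleTimeoutMillis;
    }

    synchronized void flush() {
        // ... serialize and flush the pending events here ...
        if (idleTimeoutMillis > 0) {
            if (idleFuture != null) {
                idleFuture.cancel(false);
            }
            // One-shot task: close the writer if nothing else happens before the delay expires.
            idleFuture = timedRollerPool.schedule(this::closeIdle, idleTimeoutMillis, TimeUnit.MILLISECONDS);
        }
    }

    synchronized void append() {
        // New data arrived, so the pending idle close no longer applies.
        if (idleFuture != null) {
            idleFuture.cancel(false);
            idleFuture = null;
        }
        open = true;
    }

    private synchronized void closeIdle() {
        open = false;
    }

    synchronized boolean isOpen() {
        return open;
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
        IdleCloseSketch writer = new IdleCloseSketch(pool, 50);
        writer.flush();
        Thread.sleep(300);
        System.out.println("open after idle period: " + writer.isOpen());
        pool.shutdownNow();
    }
}
```

Cancelling with cancel(false) avoids interrupting a close that has already started, which matches the concern raised in the comments of the append() source.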
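The append() method:
The flush() section described the idleFuture variable. Before appending, append() cancels the idleFuture task; if the task is already running and has not finished, append() waits for it for up to callTimeout before continuing. This prevents the HDFSWriter from being closed partway through an append to HDFS.
Next, before the actual append, it checks whether an HDFSWriter is currently open for appending; if not, it calls open().
Every time an event is appended to HDFS, the rollCount and rollSize parameters are checked. If either threshold has been reached, the temporary file is renamed (rolled) to its final HDFS name and a new hdfsWriter is opened. Events are then appended to that hdfsWriter, and when the number of events reaches batchSize, a flush is performed.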
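The roll check can be sketched as a small counter class. RollPolicySketch is hypothetical, and treating a threshold of 0 as "disabled" is an assumption of this sketch rather than something stated above.

```java
// Hypothetical roll policy tracking events and bytes written to the current file (illustration only).
class RollPolicySketch {
    private final long rollCount; // roll after this many events; 0 disables the check (assumption)
    private final long rollSize;  // roll after this many bytes; 0 disables the check (assumption)
    private long eventCounter;
    private long processSize;

    RollPolicySketch(long rollCount, long rollSize) {
        this.rollCount = rollCount;
        this.rollSize = rollSize;
    }

    // True when either threshold has been reached.
    boolean shouldRotate() {
        boolean byCount = rollCount > 0 && eventCounter >= rollCount;
        boolean bySize = rollSize > 0 && processSize >= rollSize;
        return byCount || bySize;
    }

    // Called after each successful append.
    void recordAppend(int bodyLength) {
        eventCounter++;
        processSize += bodyLength;
    }

    // Called after a new file has been opened.
    void reset() {
        eventCounter = 0;
        processSize = 0;
    }

    public static void main(String[] args) {
        RollPolicySketch policy = new RollPolicySketch(3, 0);
        for (int i = 0; i < 3; i++) {
            policy.recordAppend(10);
        }
        System.out.println("should rotate: " + policy.shouldRotate());
    }
}
```

In use, the check runs before each write: if shouldRotate() returns true, the current file is closed, a new one is opened, and the counters are reset.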
    public synchronized void append(final Event event) throws IOException, InterruptedException {
      checkAndThrowInterruptedException();
      // idleFuture is a ScheduledFuture whose task closes the current HDFSWriter.
      // Before appending an event, make sure that task is cancelled or has finished;
      // otherwise the HDFSWriter could be closed partway through the append.
      if (idleFuture != null) {
        idleFuture.cancel(false);
        // There is still a small race condition - if the idleFuture is already
        // running, interrupting it can cause HDFS close operation to throw -
        // so we cannot interrupt it while running. If the future could not be
        // cancelled, it is already running - wait for it to finish before
        // attempting to write.
        if (!idleFuture.isDone()) {
          try {
            idleFuture.get(callTimeout, TimeUnit.MILLISECONDS);
          } catch (TimeoutException ex) {
            LOG.warn("Timeout while trying to cancel closing of idle file. Idle"
                + " file close may have failed", ex);
          } catch (Exception ex) {
            LOG.warn("Error while trying to cancel closing of idle file. ", ex);
          }
        }
        idleFuture = null;
      }

      // If the bucket writer was closed due to roll timeout or idle timeout,
      // force a new bucket writer to be created. Roll count and roll size will
      // just reuse this one
      if (!isOpen) {
        if (closed) {
          throw new BucketClosedException("This bucket writer was closed and "
              + "this handle is thus no longer valid");
        }
        open();
      }

      // Check the rollCount and rollSize settings to decide whether to roll a new file
      if (shouldRotate()) {
        boolean doRotate = true;

        if (isUnderReplicated) {
          if (maxConsecUnderReplRotations > 0
              && consecutiveUnderReplRotateCount >= maxConsecUnderReplRotations) {
            doRotate = false;
            if (consecutiveUnderReplRotateCount == maxConsecUnderReplRotations) {
              LOG.error("Hit max consecutive under-replication rotations ({}); "
                  + "will not continue rolling files under this path due to "
                  + "under-replication", maxConsecUnderReplRotations);
            }
          } else {
            LOG.warn("Block Under-replication detected. Rotating file.");
          }
          consecutiveUnderReplRotateCount++;
        } else {
          consecutiveUnderReplRotateCount = 0;
        }

        if (doRotate) {
          close();
          open();
        }
      }

      // write the event
      try {
        // sinkCounter records the sink's metrics
        sinkCounter.incrementEventDrainAttemptCount();
        callWithTimeout(new CallRunner<Void>() {
          @Override
          public Void call() throws Exception {
            // writer is the HDFSWriter implementation selected by hdfs.fileType
            writer.append(event);
            return null;
          }
        });
      } catch (IOException e) {
        LOG.warn("Caught IOException writing to HDFSWriter ({}). Closing file ("
            + bucketPath + ") and rethrowing exception.", e.getMessage());
        try {
          close(true);
        } catch (IOException e2) {
          LOG.warn("Caught IOException while closing file (" + bucketPath
              + "). Exception follows.", e2);
        }
        throw e;
      }

      // update statistics
      processSize += event.getBody().length;
      eventCounter++;
      batchCounter++;

      if (batchCounter == batchSize) {
        flush();
      }
    }