TaildirSource類圖以下(列出主要類)node
TailDirSource類
TailDirSource繼承了AbstractSource類,而AbstractSource類中channelProcessor屬性負責將Source中的Event提交給Channel組件
TailDirSource類經過配置參數匹配日誌文件,獲取日誌文件更新內容而且將已經讀取的偏移量記錄到特定的文件當中(position file)linux
configure()方法:
1.判斷從配置文件加載的配置是否合法,其中包括了對filegroups,以及以filegroups爲單位的文件路徑是否存在等條件。
2.對batchSize,skipToEnd,writePosInterval,idleTimeout等變量進行初始化工做
batchSize定義了往Channel中發送Event的批量處理大小
skipToEnd定義了每次程序啓動,對文件進行讀取的時候,是否從文件尾部開始讀取數據,或者從文件最開始讀取。
writePosInterval,TaildirSource讀取每一個監控文件都在位置文件中記錄監控文件的已經讀取的偏移量,writePosInterval則是定義了更新位置文件的間隔。
idleTimeout日誌文件在idleTimeout間隔時間,沒有被修改,文件將被關閉json
start()方法:
經過configure()初始化後的變量建立了ReliableTaildirEventReader對象,同時建立兩個線程池idleFileChecker和positionWriter,分別用於監控日誌文件和記錄日誌文件讀取的偏移量。
idleFileChecker實現一個Runnable接口,遍歷reader全部監控的文件,檢查文件最後修改時間+idleTimeout是否小於當前時間,說明日誌文件在idleTimeout時間內沒有被修改,該文件將被關閉。windows
private class idleFileCheckerRunnable implements Runnable { @Override public void run() { try { long now = System.currentTimeMillis(); for (TailFile tf : reader.getTailFiles().values()) { if (tf.getLastUpdated() + idleTimeout < now && tf.getRaf() != null) { idleInodes.add(tf.getInode()); } } } catch (Throwable t) { logger.error("Uncaught exception in IdleFileChecker thread", t); } } }
positionWriter主要做用是記錄日誌文件讀取的偏移量,以json格式("inode", inode, "pos", tf.getPos(), "file", tf.getPath()),其中inode是linux系統中特有屬性,在適應其餘系統(Windows等)日誌採集時ReliableTaildirEventReader.getInode()方法須要修改(注意:在利用Linux系統上inode實現上,文件是經過inode記錄日誌讀取偏移量。因此即便文件名改變了,也不影響日誌讀取,在我實現Window版本上,只採用了文件名對應日誌讀取偏移量,文件名改變影響日誌讀取
)。pos則是記錄的日誌讀取的偏移量,file記錄了日誌文件的路徑dom
process()方法:
process方法記錄了TailDirSource類中主要的邏輯,獲取每一個監控的日誌文件,調用tailFileProcess獲取每一個日誌文件的更新數據,並將每條記錄轉換爲Event(具體細節要看ReliableTaildirEventReader的readEvents方法)ide
public Status process() { Status status = Status.READY; try { existingInodes.clear(); existingInodes.addAll(reader.updateTailFiles()); for (long inode : existingInodes) { TailFile tf = reader.getTailFiles().get(inode); if (tf.needTail()) { tailFileProcess(tf, true); } } closeTailFiles(); try { TimeUnit.MILLISECONDS.sleep(retryInterval); } catch (InterruptedException e) { logger.info("Interrupted while sleeping"); } } catch (Throwable t) { logger.error("Unable to tail files", t); status = Status.BACKOFF; } return status; }
ReliableTaildirEventReader類
構造ReliableTaildirEventReader對象的時候,首先會判斷各類必須參數是否合法等,而後加載position file獲取每一個文件上次記錄的日誌文件讀取的偏移量
loadPositionFile(String filePath) 不粘貼方法的具體代碼,主要就是獲取每一個監控日誌文件的讀取偏移量
readEvents()的各個不一樣參數方法中,下面這個是最主要的,該方法獲取當前日誌文件的偏移量,調用TailFile.readEvents(numEvents, backoffWithoutNL, addByteOffset)方法將日誌文件每行轉換爲Flume的消息對象Event,並循環將每一個event添加header信息。線程
public List<Event> readEvents(int numEvents, boolean backoffWithoutNL) throws IOException { if (!committed) { if (currentFile == null) { throw new IllegalStateException("current file does not exist. " + currentFile.getPath()); } logger.info("Last read was never committed - resetting position"); long lastPos = currentFile.getPos(); currentFile.updateFilePos(lastPos); } List<Event> events = currentFile.readEvents(numEvents, backoffWithoutNL, addByteOffset); if (events.isEmpty()) { return events; } Map<String, String> headers = currentFile.getHeaders(); if (annotateFileName || (headers != null && !headers.isEmpty())) { for (Event event : events) { if (headers != null && !headers.isEmpty()) { event.getHeaders().putAll(headers); } if (annotateFileName) { event.getHeaders().put(fileNameHeader, currentFile.getPath()); } } } committed = false; return events; }
openFile(File file, Map<String, String> headers, long inode, long pos) 方法根據日誌文件對象,headers,inode和偏移量pos建立一個TailFile對象指針
TailFile類
TaildirSource經過TailFile類操做處理每一個日誌文件,包含了RandomAccessFile類,以及記錄日誌文件偏移量pos,最新更新時間lastUpdated等屬性
RandomAccessFile完美的符合TaildirSource的應用場景,RandomAccessFile支持使用seek()方法隨機訪問文件,配合position file中記錄的日誌文件讀取偏移量,可以輕鬆簡單的seek到文件偏移量,而後向後讀取日誌內容,並從新將新的偏移量記錄到position file中。日誌
readEvent(boolean backoffWithoutNL, boolean addByteOffset)方法:
下圖描述了該方法的調用層級,readEvent簡單的理解就是將每行日誌轉爲Event消息體,方法最終調用的是readFile()方法。code
readLine()方法,有點難還在研究
public LineResult readLine() throws IOException { LineResult lineResult = null; while (true) { if (bufferPos == NEED_READING) { if (raf.getFilePointer() < raf.length()) {//當文件指針位置小於文件總長度的時候,就須要讀取指針位置到文件最後的數據 readFile(); } else { if (oldBuffer.length > 0) { lineResult = new LineResult(false, oldBuffer); oldBuffer = new byte[0]; setLineReadPos(lineReadPos + lineResult.line.length); } break; } } for (int i = bufferPos; i < buffer.length; i++) { if (buffer[i] == BYTE_NL) { int oldLen = oldBuffer.length; // Don't copy last byte(NEW_LINE) int lineLen = i - bufferPos; // For windows, check for CR if (i > 0 && buffer[i - 1] == BYTE_CR) { lineLen -= 1; } else if (oldBuffer.length > 0 && oldBuffer[oldBuffer.length - 1] == BYTE_CR) { oldLen -= 1; } lineResult = new LineResult(true, concatByteArrays(oldBuffer, 0, oldLen, buffer, bufferPos, lineLen)); setLineReadPos(lineReadPos + (oldBuffer.length + (i - bufferPos + 1))); oldBuffer = new byte[0]; if (i + 1 < buffer.length) { bufferPos = i + 1; } else { bufferPos = NEED_READING; } break; } } if (lineResult != null) { break; } // NEW_LINE not showed up at the end of the buffer oldBuffer = concatByteArrays(oldBuffer, 0, oldBuffer.length, buffer, bufferPos, buffer.length - bufferPos); bufferPos = NEED_READING; } return lineResult; }
readFile()按BUFFER_SIZE(默認8KB)做爲緩衝讀取日誌文件數據
private void readFile() throws IOException { if ((raf.length() - raf.getFilePointer()) < BUFFER_SIZE) { buffer = new byte[(int) (raf.length() - raf.getFilePointer())]; } else { buffer = new byte[BUFFER_SIZE]; } raf.read(buffer, 0, buffer.length); bufferPos = 0; }