以下是ReportBucket初始化的代碼:數據結構
@Override public void initialize(String name, Date timestamp, int index) throws IOException { m_baseDir = m_configManager.getHdfsLocalBaseDir("report"); m_writeLock = new ReentrantLock(); m_readLock = new ReentrantLock(); String logicalPath = m_pathBuilder.getReportPath(name, timestamp, index); File dataFile = new File(m_baseDir, logicalPath); File indexFile = new File(m_baseDir, logicalPath + ".idx"); //加載索引文件 if (indexFile.exists()) { loadIndexes(indexFile); } final File dir = dataFile.getParentFile(); if (!dir.exists() && !dir.mkdirs()) { throw new IOException(String.format("Fail to create directory(%s)!", dir)); } m_logicalPath = logicalPath; m_writeDataFile = new BufferedOutputStream(new FileOutputStream(dataFile, true), 8192); m_writeIndexFile = new BufferedOutputStream(new FileOutputStream(indexFile, true), 8192); m_writeDataFileLength = dataFile.length(); m_readDataFile = new RandomAccessFile(dataFile, "r"); }
以下是loadIndexes的代碼:dom
protected void loadIndexes(File indexFile) throws IOException { BufferedReader reader = null; m_writeLock.lock(); try { reader = new BufferedReader(new FileReader(indexFile)); StringSplitter splitter = Splitters.by('\t'); while (true) { String line = reader.readLine(); if (line == null) { // EOF break; } List<String> parts = splitter.split(line); if (parts.size() >= 2) { String id = parts.remove(0); String offset = parts.remove(0); try { m_idToOffsets.put(id, Long.parseLong(offset)); } catch (NumberFormatException e) { // ignore it } } } } finally { m_writeLock.unlock(); if (reader != null) { reader.close(); } } }
根據索引文件,逐行讀取,每一行的數據結構是 domain offset,中間是tab間隔,這樣就能夠將每一個應用名對應的offset位置拿到了。 示例以下:ide
根據上面初始化的bucket,以下直接讀取報表ui
String xml = bucket.findById(domain);
具體讀取代碼以下:code
/** * 從offset位置開始,讀第一行,第一行的內容是整個報表的長度 */ @Override public String findById(String id) throws IOException { Long offset = m_idToOffsets.get(id); if (offset != null) { m_readLock.lock(); try { m_readDataFile.seek(offset); int num = Integer.parseInt(m_readDataFile.readLine()); byte[] bytes = new byte[num]; m_readDataFile.readFully(bytes); return new String(bytes, "utf-8"); } catch (Exception e) { m_logger.error(String.format("Error when reading file(%s)!", m_readDataFile), e); } finally { m_readLock.unlock(); } } return null; }
根據應用名稱和報表寫入報表數據文件和索引文件的代碼以下:orm
/** * 根據應用名稱存儲報表 * 0、把當前前的數據文件長度做爲這次應用的offset,寫入到bucket維護的m_idToOffsets中 * 一、拿到報表字符串的字節長度 * 二、拿到報表字符串的字節長度的字節長度 * 三、將報表字符串的字節長度寫入數據文件,帶一個換行符 * 四、將報表字符串寫入數據文件,帶一個換行符 * 五、把 應用名 + '\t' + offset + '\n' 寫入到索引文件 * 五、計算當前的數據文件長度 */ @Override public boolean storeById(String id, String report) throws IOException { byte[] content = report.getBytes("utf-8"); int length = content.length; byte[] num = String.valueOf(length).getBytes("utf-8"); m_writeLock.lock(); try { m_writeDataFile.write(num); m_writeDataFile.write('\n'); m_writeDataFile.write(content); m_writeDataFile.write('\n'); m_writeDataFile.flush(); long offset = m_writeDataFileLength; String line = id + '\t' + offset + '\n'; byte[] data = line.getBytes("utf-8"); m_writeDataFileLength += num.length + 1 + length + 1; m_writeIndexFile.write(data); m_writeIndexFile.flush(); m_idToOffsets.put(id, offset); return true; } finally { m_writeLock.unlock(); } }