大衆點評Cat源碼分析(五)——Report磁盤讀寫文件邏輯

1、Report讀取邏輯

1.1 加載report索引邏輯

以下是ReportBucket初始化的代碼:數據結構

@Override
	public void initialize(String name, Date timestamp, int index) throws IOException {
		m_baseDir = m_configManager.getHdfsLocalBaseDir("report");
		m_writeLock = new ReentrantLock();
		m_readLock = new ReentrantLock();

		String logicalPath = m_pathBuilder.getReportPath(name, timestamp, index);

		File dataFile = new File(m_baseDir, logicalPath);
		File indexFile = new File(m_baseDir, logicalPath + ".idx");

		//加載索引文件
		if (indexFile.exists()) {
			loadIndexes(indexFile);
		}

		final File dir = dataFile.getParentFile();

		if (!dir.exists() && !dir.mkdirs()) {
			throw new IOException(String.format("Fail to create directory(%s)!", dir));
		}

		m_logicalPath = logicalPath;
		m_writeDataFile = new BufferedOutputStream(new FileOutputStream(dataFile, true), 8192);
		m_writeIndexFile = new BufferedOutputStream(new FileOutputStream(indexFile, true), 8192);
		m_writeDataFileLength = dataFile.length();
		m_readDataFile = new RandomAccessFile(dataFile, "r");
	}

以下是loadIndexes的代碼:dom

protected void loadIndexes(File indexFile) throws IOException {
		BufferedReader reader = null;
		m_writeLock.lock();
		try {
			reader = new BufferedReader(new FileReader(indexFile));
			StringSplitter splitter = Splitters.by('\t');

			while (true) {
				String line = reader.readLine();

				if (line == null) { // EOF
					break;
				}

				List<String> parts = splitter.split(line);

				if (parts.size() >= 2) {
					String id = parts.remove(0);
					String offset = parts.remove(0);

					try {
						m_idToOffsets.put(id, Long.parseLong(offset));
					} catch (NumberFormatException e) {
						// ignore it
					}
				}
			}
		} finally {
			m_writeLock.unlock();
			if (reader != null) {
				reader.close();
			}
		}
	}

根據索引文件,逐行讀取,每一行的數據結構是 domain offset,中間是tab間隔,這樣就能夠將每一個應用名對應的offset位置拿到了。 示例以下:ide

1.2加載report數據邏輯

根據上面初始化的bucket,以下直接讀取報表ui

String xml = bucket.findById(domain);

具體讀取代碼以下:code

/**
	 * 從offset位置開始,讀第一行,第一行的內容是整個報表的長度
	 */
	@Override
	public String findById(String id) throws IOException {
		Long offset = m_idToOffsets.get(id);

		if (offset != null) {
			m_readLock.lock();

			try {
				m_readDataFile.seek(offset);

				int num = Integer.parseInt(m_readDataFile.readLine());
				byte[] bytes = new byte[num];

				m_readDataFile.readFully(bytes);

				return new String(bytes, "utf-8");
			} catch (Exception e) {
				m_logger.error(String.format("Error when reading file(%s)!", m_readDataFile), e);
			} finally {
				m_readLock.unlock();
			}
		}

		return null;
	}

2、report寫入邏輯

根據應用名稱和報表寫入報表數據文件和索引文件的代碼以下:orm

/**
	 * 根據應用名稱存儲報表
	 * 0、把當前前的數據文件長度做爲這次應用的offset,寫入到bucket維護的m_idToOffsets中
	 * 一、拿到報表字符串的字節長度
	 * 二、拿到報表字符串的字節長度的字節長度
	 * 三、將報表字符串的字節長度寫入數據文件,帶一個換行符
	 * 四、將報表字符串寫入數據文件,帶一個換行符
	 * 五、把 應用名 + '\t' + offset + '\n' 寫入到索引文件
	 * 五、計算當前的數據文件長度
	 */
	@Override
	public boolean storeById(String id, String report) throws IOException {
		byte[] content = report.getBytes("utf-8");
		int length = content.length;
		byte[] num = String.valueOf(length).getBytes("utf-8");

		m_writeLock.lock();

		try {
			m_writeDataFile.write(num);
			m_writeDataFile.write('\n');
			m_writeDataFile.write(content);
			m_writeDataFile.write('\n');
			m_writeDataFile.flush();

			long offset = m_writeDataFileLength;
			String line = id + '\t' + offset + '\n';
			byte[] data = line.getBytes("utf-8");

			m_writeDataFileLength += num.length + 1 + length + 1;
			m_writeIndexFile.write(data);
			m_writeIndexFile.flush();
			m_idToOffsets.put(id, offset);
			return true;
		} finally {
			m_writeLock.unlock();
		}
	}
相關文章
相關標籤/搜索