自從前段時間的一個事故讓隊列裏緩存的大量關鍵數據丟失後,一直琢磨着弄一個能持久化到本地文件的隊列,這樣即便系統再次發生意外,我也不至於再苦逼的修數據了。選定使用mappedbytebuffer來實現,但作出來的原型不夠理想。《高性能隊列Fqueue的設計和使用實踐》這篇文章給了我莫大的幫助。 固然只是借鑑了大體的文件系統結構,絕大部分仍是按本身的想法來的。html
上圖就是隊列的文件系統,index文件記錄了隊列當前的讀寫文件號,讀寫位置和讀寫計數。隊列的size是經過讀寫計數writeCounter-readCounter的方式記錄的,這樣作的好處是能夠作到讀寫分離。運行時size用一個AtomicInteger變量記錄,系統初始化加載隊列時纔用到讀寫計數差。block文件記錄了實際的入隊數據,每一個block必需要有足夠的空間寫入4(len)+data.length+4(EOF)長度的數據,不然寫入一個EOF標記,換一個新的block開始寫入數據,而當讀取到這個EOF時,表示這個block讀取完畢,載入下一個block,若是有的話,釋放並準備刪除當前block。如今規定的block大小32MB,按個人使用場景,每一個block能夠寫入100W數據(PS:protostuff-runtime挺不錯的,我用它作的序列化)。java
最後附上代碼:緩存
public class MFQueuePool {
app
private static final Logger LOGGER = LoggerFactory.getLogger(MFQueuePool.class); private static final BlockingQueue<String> DELETING_QUEUE = new LinkedBlockingQueue<>(); private static MFQueuePool instance = null; private String fileBackupPath; private Map<String, MFQueue> fQueueMap; private ScheduledExecutorService syncService; private MFQueuePool(String fileBackupPath) { this.fileBackupPath = fileBackupPath; File fileBackupDir = new File(fileBackupPath); if (!fileBackupDir.exists() && !fileBackupDir.mkdir()) { throw new IllegalArgumentException("can not create directory"); } this.fQueueMap = scanDir(fileBackupDir); this.syncService = Executors.newSingleThreadScheduledExecutor(); this.syncService.scheduleWithFixedDelay(new Runnable() { @Override public void run() { for (MFQueue MFQueue : fQueueMap.values()) { MFQueue.sync(); } deleteBlockFile(); } }, 10L, 10L, TimeUnit.SECONDS); } private void deleteBlockFile() { String blockFilePath = DELETING_QUEUE.poll(); if (StringUtils.isNotBlank(blockFilePath)) { File delFile = new File(blockFilePath); try { if (!delFile.delete()) { LOGGER.warn("block file:{} delete failed", blockFilePath); } } catch (SecurityException e) { LOGGER.error("security manager exists, delete denied"); } } } private static void toClear(String filePath) { DELETING_QUEUE.add(filePath); } private Map<String, MFQueue> scanDir(File fileBackupDir) { if (!fileBackupDir.isDirectory()) { throw new IllegalArgumentException("it is not a directory"); } Map<String, MFQueue> exitsFQueues = new HashMap<>(); File[] indexFiles = fileBackupDir.listFiles(new FilenameFilter() { @Override public boolean accept(File dir, String name) { return MFQueueIndex.isIndexFile(name); } }); if (ArrayUtils.isNotEmpty(indexFiles)) { for (File indexFile : indexFiles) { String queueName = MFQueueIndex.parseQueueName(indexFile.getName()); exitsFQueues.put(queueName, new MFQueue(queueName, fileBackupPath)); } } return exitsFQueues; } public synchronized static void init(String deployPath) { if (instance == null) { instance = new MFQueuePool(deployPath); } } private void disposal() { this.syncService.shutdown(); for (MFQueue MFQueue : fQueueMap.values()) { MFQueue.close(); } while (!DELETING_QUEUE.isEmpty()) { deleteBlockFile(); } } public synchronized static void destory() { if (instance != null) { instance.disposal(); instance = null; } } private MFQueue getQueueFromPool(String queueName) { if (fQueueMap.containsKey(queueName)) { return fQueueMap.get(queueName); } MFQueue MFQueue = new MFQueue(queueName, fileBackupPath); fQueueMap.put(queueName, MFQueue); return MFQueue; } public synchronized static MFQueue getFQueue(String queueName) { if (StringUtils.isBlank(queueName)) { throw new IllegalArgumentException("empty queue name"); } return instance.getQueueFromPool(queueName); } public static class MFQueue extends AbstractQueue<byte[]> { private String queueName; private String fileBackupDir; private MFQueueIndex index; private MFQueueBlock readBlock; private MFQueueBlock writeBlock; private ReentrantLock readLock; private ReentrantLock writeLock; private AtomicInteger size; public MFQueue(String queueName, String fileBackupDir) { this.queueName = queueName; this.fileBackupDir = fileBackupDir; this.readLock = new ReentrantLock(); this.writeLock = new ReentrantLock(); this.index = new MFQueueIndex(MFQueueIndex.formatIndexFilePath(queueName, fileBackupDir)); this.size = new AtomicInteger(index.getWriteCounter() - index.getReadCounter()); this.writeBlock = new MFQueueBlock(index, MFQueueBlock.formatBlockFilePath(queueName, index.getWriteNum(), fileBackupDir)); if (index.getReadNum() == index.getWriteNum()) { this.readBlock = this.writeBlock.duplicate(); } else { this.readBlock = new MFQueueBlock(index, MFQueueBlock.formatBlockFilePath(queueName, index.getReadNum(), fileBackupDir)); } } @Override public Iterator<byte[]> iterator() { throw new UnsupportedOperationException(); } @Override public int size() { return this.size.get(); } private void rotateNextWriteBlock() { int nextWriteBlockNum = index.getWriteNum() + 1; nextWriteBlockNum = (nextWriteBlockNum < 0) ? 0 : nextWriteBlockNum; writeBlock.putEOF(); if (index.getReadNum() == index.getWriteNum()) { writeBlock.sync(); } else { writeBlock.close(); } writeBlock = new MFQueueBlock(index, MFQueueBlock.formatBlockFilePath(queueName, nextWriteBlockNum, fileBackupDir)); index.putWriteNum(nextWriteBlockNum); index.putWritePosition(0); } @Override public boolean offer(byte[] bytes) { if (ArrayUtils.isEmpty(bytes)) { return true; } writeLock.lock(); try { if (!writeBlock.isSpaceAvailable(bytes.length)) { rotateNextWriteBlock(); } writeBlock.write(bytes); size.incrementAndGet(); return true; } finally { writeLock.unlock(); } } private void rotateNextReadBlock() { if (index.getReadNum() == index.getWriteNum()) { // 讀緩存塊的滑動必須發生在寫緩存塊滑動以後 return; } int nextReadBlockNum = index.getReadNum() + 1; nextReadBlockNum = (nextReadBlockNum < 0) ? 0 : nextReadBlockNum; readBlock.close(); String blockPath = readBlock.getBlockFilePath(); if (nextReadBlockNum == index.getWriteNum()) { readBlock = writeBlock.duplicate(); } else { readBlock = new MFQueueBlock(index, MFQueueBlock.formatBlockFilePath(queueName, nextReadBlockNum, fileBackupDir)); } index.putReadNum(nextReadBlockNum); index.putReadPosition(0); MFQueuePool.toClear(blockPath); } @Override public byte[] poll() { readLock.lock(); try { if (readBlock.eof()) { rotateNextReadBlock(); } byte[] bytes = readBlock.read(); if (bytes != null) { size.decrementAndGet(); } return bytes; } finally { readLock.unlock(); } } @Override public byte[] peek() { throw new UnsupportedOperationException(); } public void sync() { index.sync(); // read block只讀,不用同步 writeBlock.sync(); } public void close() { writeBlock.close(); if (index.getReadNum() != index.getWriteNum()) { readBlock.close(); } index.reset(); index.close(); } } @SuppressWarnings("UnusedDeclaration") private static class MFQueueIndex { private static final String MAGIC = "v.1.0000"; private static final String INDEX_FILE_SUFFIX = ".idx"; private static final int INDEX_SIZE = 32; private static final int READ_NUM_OFFSET = 8; private static final int READ_POS_OFFSET = 12; private static final int READ_CNT_OFFSET = 16; private static final int WRITE_NUM_OFFSET = 20; private static final int WRITE_POS_OFFSET = 24; private static final int WRITE_CNT_OFFSET = 28; private int p11, p12, p13, p14, p15, p16, p17, p18; // 緩存行填充 32B private volatile int readPosition; // 12 讀索引位置 private volatile int readNum; // 8 讀索引文件號 private volatile int readCounter; // 16 總讀取數量 private int p21, p22, p23, p24, p25, p26, p27, p28; // 緩存行填充 32B private volatile int writePosition; // 24 寫索引位置 private volatile int writeNum; // 20 寫索引文件號 private volatile int writeCounter; // 28 總寫入數量 private int p31, p32, p33, p34, p35, p36, p37, p38; // 緩存行填充 32B private RandomAccessFile indexFile; private FileChannel fileChannel; // 讀寫分離 private MappedByteBuffer writeIndex; private MappedByteBuffer readIndex; public MFQueueIndex(String indexFilePath) { File file = new File(indexFilePath); try { if (file.exists()) { this.indexFile = new RandomAccessFile(file, "rw"); byte[] bytes = new byte[8]; this.indexFile.read(bytes, 0, 8); if (!MAGIC.equals(new String(bytes))) { throw new IllegalArgumentException("version mismatch"); } this.readNum = indexFile.readInt(); this.readPosition = indexFile.readInt(); this.readCounter = indexFile.readInt(); this.writeNum = indexFile.readInt(); this.writePosition = indexFile.readInt(); this.writeCounter = indexFile.readInt(); this.fileChannel = indexFile.getChannel(); this.writeIndex = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, INDEX_SIZE); this.writeIndex = writeIndex.load(); this.readIndex = (MappedByteBuffer) writeIndex.duplicate(); } else { this.indexFile = new RandomAccessFile(file, "rw"); this.fileChannel = indexFile.getChannel(); this.writeIndex = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, INDEX_SIZE); this.readIndex = (MappedByteBuffer) writeIndex.duplicate(); putMagic(); putReadNum(0); putReadPosition(0); putReadCounter(0); putWriteNum(0); putWritePosition(0); putWriteCounter(0); } } catch (Exception e) { throw new IllegalArgumentException(e); } } public static boolean isIndexFile(String fileName) { return fileName.endsWith(INDEX_FILE_SUFFIX); } public static String parseQueueName(String indexFileName) { String fileName = indexFileName.substring(0, indexFileName.lastIndexOf('.')); return fileName.split("_")[1]; } public static String formatIndexFilePath(String queueName, String fileBackupDir) { return fileBackupDir + File.separator + String.format("findex_%s%s", queueName, INDEX_FILE_SUFFIX); } public int getReadNum() { return this.readNum; } public int getReadPosition() { return this.readPosition; } public int getReadCounter() { return this.readCounter; } public int getWriteNum() { return this.writeNum; } public int getWritePosition() { return this.writePosition; } public int getWriteCounter() { return this.writeCounter; } public void putMagic() { this.writeIndex.position(0); this.writeIndex.put(MAGIC.getBytes()); } public void putWritePosition(int writePosition) { this.writeIndex.position(WRITE_POS_OFFSET); this.writeIndex.putInt(writePosition); this.writePosition = writePosition; } public void putWriteNum(int writeNum) { this.writeIndex.position(WRITE_NUM_OFFSET); this.writeIndex.putInt(writeNum); this.writeNum = writeNum; } public void putWriteCounter(int writeCounter) { this.writeIndex.position(WRITE_CNT_OFFSET); this.writeIndex.putInt(writeCounter); this.writeCounter = writeCounter; } public void putReadNum(int readNum) { this.readIndex.position(READ_NUM_OFFSET); this.readIndex.putInt(readNum); this.readNum = readNum; } public void putReadPosition(int readPosition) { this.readIndex.position(READ_POS_OFFSET); this.readIndex.putInt(readPosition); this.readPosition = readPosition; } public void putReadCounter(int readCounter) { this.readIndex.position(READ_CNT_OFFSET); this.readIndex.putInt(readCounter); this.readCounter = readCounter; } public void reset() { int size = writeCounter - readCounter; putReadCounter(0); putWriteCounter(size); if (size == 0 && readNum == writeNum) { putReadPosition(0); putWritePosition(0); } } public void sync() { if (writeIndex != null) { writeIndex.force(); } } public void close() { try { if (writeIndex == null) { return; } sync(); AccessController.doPrivileged(new PrivilegedAction<Object>() { public Object run() { try { Method getCleanerMethod = writeIndex.getClass().getMethod("cleaner"); getCleanerMethod.setAccessible(true); sun.misc.Cleaner cleaner = (sun.misc.Cleaner) getCleanerMethod.invoke(writeIndex); cleaner.clean(); } catch (Exception e) { LOGGER.error("close fqueue index file failed", e); } return null; } }); writeIndex = null; readIndex = null; fileChannel.close(); indexFile.close(); } catch (IOException e) { LOGGER.error("close fqueue index file failed", e); } } } private static class MFQueueBlock { private static final String BLOCK_FILE_SUFFIX = ".blk"; // 數據文件 private static final int BLOCK_SIZE = 32 * 1024 * 1024; // 32MB private final int EOF = -1; private String blockFilePath; private MFQueueIndex index; private RandomAccessFile blockFile; private FileChannel fileChannel; private ByteBuffer byteBuffer; private MappedByteBuffer mappedBlock; public MFQueueBlock(String blockFilePath, MFQueueIndex index, RandomAccessFile blockFile, FileChannel fileChannel, ByteBuffer byteBuffer, MappedByteBuffer mappedBlock) { this.blockFilePath = blockFilePath; this.index = index; this.blockFile = blockFile; this.fileChannel = fileChannel; this.byteBuffer = byteBuffer; this.mappedBlock = mappedBlock; } public MFQueueBlock(MFQueueIndex index, String blockFilePath) { this.index = index; this.blockFilePath = blockFilePath; try { File file = new File(blockFilePath); this.blockFile = new RandomAccessFile(file, "rw"); this.fileChannel = blockFile.getChannel(); this.mappedBlock = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, BLOCK_SIZE); this.byteBuffer = mappedBlock.load(); } catch (Exception e) { throw new IllegalArgumentException(e); } } public MFQueueBlock duplicate() { return new MFQueueBlock(this.blockFilePath, this.index, this.blockFile, this.fileChannel, this.byteBuffer.duplicate(), this.mappedBlock); } public static String formatBlockFilePath(String queueName, int fileNum, String fileBackupDir) { return fileBackupDir + File.separator + String.format("fblock_%s_%d%s", queueName, fileNum, BLOCK_FILE_SUFFIX); } public String getBlockFilePath() { return blockFilePath; } public void putEOF() { this.byteBuffer.position(index.getWritePosition()); this.byteBuffer.putInt(EOF); } public boolean isSpaceAvailable(int len) { int increment = len + 4; int writePosition = index.getWritePosition(); return BLOCK_SIZE >= increment + writePosition + 4; // 保證最後有4字節的空間能夠寫入EOF } public boolean eof() { int readPosition = index.getReadPosition(); return readPosition > 0 && byteBuffer.getInt(readPosition) == EOF; } public int write(byte[] bytes) { int len = bytes.length; int increment = len + 4; int writePosition = index.getWritePosition(); byteBuffer.position(writePosition); byteBuffer.putInt(len); byteBuffer.put(bytes); index.putWritePosition(increment + writePosition); index.putWriteCounter(index.getWriteCounter() + 1); return increment; } public byte[] read() { byte[] bytes; int readNum = index.getReadNum(); int readPosition = index.getReadPosition(); int writeNum = index.getWriteNum(); int writePosition = index.getWritePosition(); if (readNum == writeNum && readPosition >= writePosition) { return null; } byteBuffer.position(readPosition); int dataLength = byteBuffer.getInt(); if (dataLength <= 0) { return null; } bytes = new byte[dataLength]; byteBuffer.get(bytes); index.putReadPosition(readPosition + bytes.length + 4); index.putReadCounter(index.getReadCounter() + 1); return bytes; } public void sync() { if (mappedBlock != null) { mappedBlock.force(); } } public void close() { try { if (mappedBlock == null) { return; } sync(); AccessController.doPrivileged(new PrivilegedAction<Object>() { public Object run() { try { Method getCleanerMethod = mappedBlock.getClass().getMethod("cleaner"); getCleanerMethod.setAccessible(true); sun.misc.Cleaner cleaner = (sun.misc.Cleaner) getCleanerMethod.invoke(mappedBlock); cleaner.clean(); } catch (Exception e) { LOGGER.error("close fqueue block file failed", e); } return null; } }); mappedBlock = null; byteBuffer = null; fileChannel.close(); blockFile.close(); } catch (IOException e) { LOGGER.error("close fqueue block file failed", e); } } }}