用mappedbytebuffer實現一個持久化隊列【轉】

自從前段時間的一個事故讓隊列裏緩存的大量關鍵數據丟失後,一直琢磨着弄一個能持久化到本地文件的隊列,這樣即便系統再次發生意外,我也不至於再苦逼的修數據了。選定使用mappedbytebuffer來實現,但作出來的原型不夠理想。《高性能隊列Fqueue的設計和使用實踐》這篇文章給了我莫大的幫助。 固然只是借鑑了大體的文件系統結構,絕大部分仍是按本身的想法來的。html

    上圖就是隊列的文件系統,index文件記錄了隊列當前的讀寫文件號,讀寫位置和讀寫計數。隊列的size是經過讀寫計數writeCounter-readCounter的方式記錄的,這樣作的好處是能夠作到讀寫分離。運行時size用一個AtomicInteger變量記錄,系統初始化加載隊列時纔用到讀寫計數差。block文件記錄了實際的入隊數據,每一個block必需要有足夠的空間寫入4(len)+data.length+4(EOF)長度的數據,不然寫入一個EOF標記,換一個新的block開始寫入數據,而當讀取到這個EOF時,表示這個block讀取完畢,載入下一個block,若是有的話,釋放並準備刪除當前block。如今規定的block大小32MB,按個人使用場景,每一個block能夠寫入100W數據(PS:protostuff-runtime挺不錯的,我用它作的序列化)。java

    最後附上代碼:緩存

    public class MFQueuePool {app

    private static final Logger LOGGER = LoggerFactory.getLogger(MFQueuePool.class);     private static final BlockingQueue<String> DELETING_QUEUE = new LinkedBlockingQueue<>();     private static MFQueuePool instance = null;     private String fileBackupPath;     private Map<String, MFQueue> fQueueMap;     private ScheduledExecutorService syncService;     private MFQueuePool(String fileBackupPath) {         this.fileBackupPath = fileBackupPath;         File fileBackupDir = new File(fileBackupPath);         if (!fileBackupDir.exists() && !fileBackupDir.mkdir()) {             throw new IllegalArgumentException("can not create directory");         }         this.fQueueMap = scanDir(fileBackupDir);         this.syncService = Executors.newSingleThreadScheduledExecutor();         this.syncService.scheduleWithFixedDelay(new Runnable() {             @Override             public void run() {                 for (MFQueue MFQueue : fQueueMap.values()) {                     MFQueue.sync();                 }                 deleteBlockFile();             }         }, 10L, 10L, TimeUnit.SECONDS);     }     private void deleteBlockFile() {         String blockFilePath = DELETING_QUEUE.poll();         if (StringUtils.isNotBlank(blockFilePath)) {             File delFile = new File(blockFilePath);             try {                 if (!delFile.delete()) {                     LOGGER.warn("block file:{} delete failed", blockFilePath);                 }             } catch (SecurityException e) {                 LOGGER.error("security manager exists, delete denied");             }         }     }     private static void toClear(String filePath) {         DELETING_QUEUE.add(filePath);     }     private Map<String, MFQueue> scanDir(File fileBackupDir) {         if (!fileBackupDir.isDirectory()) {             throw new IllegalArgumentException("it is not a directory");         }         Map<String, MFQueue> exitsFQueues = new HashMap<>();         File[] indexFiles = fileBackupDir.listFiles(new FilenameFilter() {             @Override             public boolean accept(File dir, String name) {                 return MFQueueIndex.isIndexFile(name);             }         });         if (ArrayUtils.isNotEmpty(indexFiles)) {             for (File indexFile : indexFiles) {                 String queueName = MFQueueIndex.parseQueueName(indexFile.getName());                 exitsFQueues.put(queueName, new MFQueue(queueName, fileBackupPath));             }         }         return exitsFQueues;     }     public synchronized static void init(String deployPath) {         if (instance == null) {             instance = new MFQueuePool(deployPath);         }     }     private void disposal() {         this.syncService.shutdown();         for (MFQueue MFQueue : fQueueMap.values()) {             MFQueue.close();         }         while (!DELETING_QUEUE.isEmpty()) {             deleteBlockFile();         }     }     public synchronized static void destory() {         if (instance != null) {             instance.disposal();             instance = null;         }     }     private MFQueue getQueueFromPool(String queueName) {         if (fQueueMap.containsKey(queueName)) {             return fQueueMap.get(queueName);         }         MFQueue MFQueue = new MFQueue(queueName, fileBackupPath);         fQueueMap.put(queueName, MFQueue);         return MFQueue;     }     public synchronized static MFQueue getFQueue(String queueName) {         if (StringUtils.isBlank(queueName)) {             throw new IllegalArgumentException("empty queue name");         }         return instance.getQueueFromPool(queueName);     }     public static class MFQueue extends AbstractQueue<byte[]> {         private String queueName;         private String fileBackupDir;         private MFQueueIndex index;         private MFQueueBlock readBlock;         private MFQueueBlock writeBlock;         private ReentrantLock readLock;         private ReentrantLock writeLock;         private AtomicInteger size;         public MFQueue(String queueName, String fileBackupDir) {             this.queueName = queueName;             this.fileBackupDir = fileBackupDir;             this.readLock = new ReentrantLock();             this.writeLock = new ReentrantLock();             this.index = new MFQueueIndex(MFQueueIndex.formatIndexFilePath(queueName, fileBackupDir));             this.size = new AtomicInteger(index.getWriteCounter() - index.getReadCounter());             this.writeBlock = new MFQueueBlock(index, MFQueueBlock.formatBlockFilePath(queueName,                     index.getWriteNum(), fileBackupDir));             if (index.getReadNum() == index.getWriteNum()) {                 this.readBlock = this.writeBlock.duplicate();             } else {                 this.readBlock = new MFQueueBlock(index, MFQueueBlock.formatBlockFilePath(queueName,                         index.getReadNum(), fileBackupDir));             }         }         @Override         public Iterator<byte[]> iterator() {             throw new UnsupportedOperationException();         }         @Override         public int size() {             return this.size.get();         }         private void rotateNextWriteBlock() {             int nextWriteBlockNum = index.getWriteNum() + 1;             nextWriteBlockNum = (nextWriteBlockNum < 0) ? 0 : nextWriteBlockNum;             writeBlock.putEOF();             if (index.getReadNum() == index.getWriteNum()) {                 writeBlock.sync();             } else {                 writeBlock.close();             }             writeBlock = new MFQueueBlock(index, MFQueueBlock.formatBlockFilePath(queueName,                     nextWriteBlockNum, fileBackupDir));             index.putWriteNum(nextWriteBlockNum);             index.putWritePosition(0);         }         @Override         public boolean offer(byte[] bytes) {             if (ArrayUtils.isEmpty(bytes)) {                 return true;             }             writeLock.lock();             try {                 if (!writeBlock.isSpaceAvailable(bytes.length)) {                     rotateNextWriteBlock();                 }                 writeBlock.write(bytes);                 size.incrementAndGet();                 return true;             } finally {                 writeLock.unlock();             }         }         private void rotateNextReadBlock() {             if (index.getReadNum() == index.getWriteNum()) {                 // 讀緩存塊的滑動必須發生在寫緩存塊滑動以後                 return;             }             int nextReadBlockNum = index.getReadNum() + 1;             nextReadBlockNum = (nextReadBlockNum < 0) ? 0 : nextReadBlockNum;             readBlock.close();             String blockPath = readBlock.getBlockFilePath();             if (nextReadBlockNum == index.getWriteNum()) {                 readBlock = writeBlock.duplicate();             } else {                 readBlock = new MFQueueBlock(index, MFQueueBlock.formatBlockFilePath(queueName,                         nextReadBlockNum, fileBackupDir));             }             index.putReadNum(nextReadBlockNum);             index.putReadPosition(0);             MFQueuePool.toClear(blockPath);         }         @Override         public byte[] poll() {             readLock.lock();             try {                 if (readBlock.eof()) {                     rotateNextReadBlock();                 }                 byte[] bytes = readBlock.read();                 if (bytes != null) {                     size.decrementAndGet();                 }                 return bytes;             } finally {                 readLock.unlock();             }         }         @Override         public byte[] peek() {             throw new UnsupportedOperationException();         }         public void sync() {             index.sync();             // read block只讀,不用同步             writeBlock.sync();         }         public void close() {             writeBlock.close();             if (index.getReadNum() != index.getWriteNum()) {                 readBlock.close();             }             index.reset();             index.close();         }     }     @SuppressWarnings("UnusedDeclaration")     private static class MFQueueIndex {         private static final String MAGIC = "v.1.0000";         private static final String INDEX_FILE_SUFFIX = ".idx";         private static final int INDEX_SIZE = 32;         private static final int READ_NUM_OFFSET = 8;         private static final int READ_POS_OFFSET = 12;         private static final int READ_CNT_OFFSET = 16;         private static final int WRITE_NUM_OFFSET = 20;         private static final int WRITE_POS_OFFSET = 24;         private static final int WRITE_CNT_OFFSET = 28;         private int p11, p12, p13, p14, p15, p16, p17, p18; // 緩存行填充 32B         private volatile int readPosition;   // 12   讀索引位置         private volatile int readNum;        // 8   讀索引文件號         private volatile int readCounter;    // 16   總讀取數量         private int p21, p22, p23, p24, p25, p26, p27, p28; // 緩存行填充 32B         private volatile int writePosition;  // 24  寫索引位置         private volatile int writeNum;       // 20  寫索引文件號         private volatile int writeCounter;   // 28 總寫入數量         private int p31, p32, p33, p34, p35, p36, p37, p38; // 緩存行填充 32B         private RandomAccessFile indexFile;         private FileChannel fileChannel;         // 讀寫分離         private MappedByteBuffer writeIndex;         private MappedByteBuffer readIndex;         public MFQueueIndex(String indexFilePath) {             File file = new File(indexFilePath);             try {                 if (file.exists()) {                     this.indexFile = new RandomAccessFile(file, "rw");                     byte[] bytes = new byte[8];                     this.indexFile.read(bytes, 0, 8);                     if (!MAGIC.equals(new String(bytes))) {                         throw new IllegalArgumentException("version mismatch");                     }                     this.readNum = indexFile.readInt();                     this.readPosition = indexFile.readInt();                     this.readCounter = indexFile.readInt();                     this.writeNum = indexFile.readInt();                     this.writePosition = indexFile.readInt();                     this.writeCounter = indexFile.readInt();                     this.fileChannel = indexFile.getChannel();                     this.writeIndex = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, INDEX_SIZE);                     this.writeIndex = writeIndex.load();                     this.readIndex = (MappedByteBuffer) writeIndex.duplicate();                 } else {                     this.indexFile = new RandomAccessFile(file, "rw");                     this.fileChannel = indexFile.getChannel();                     this.writeIndex = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, INDEX_SIZE);                     this.readIndex = (MappedByteBuffer) writeIndex.duplicate();                     putMagic();                     putReadNum(0);                     putReadPosition(0);                     putReadCounter(0);                     putWriteNum(0);                     putWritePosition(0);                     putWriteCounter(0);                 }             } catch (Exception e) {                 throw new IllegalArgumentException(e);             }         }         public static boolean isIndexFile(String fileName) {             return fileName.endsWith(INDEX_FILE_SUFFIX);         }         public static String parseQueueName(String indexFileName) {             String fileName = indexFileName.substring(0, indexFileName.lastIndexOf('.'));             return fileName.split("_")[1];         }         public static String formatIndexFilePath(String queueName, String fileBackupDir) {             return fileBackupDir + File.separator + String.format("findex_%s%s", queueName, INDEX_FILE_SUFFIX);         }         public int getReadNum() {             return this.readNum;         }         public int getReadPosition() {             return this.readPosition;         }         public int getReadCounter() {             return this.readCounter;         }         public int getWriteNum() {             return this.writeNum;         }         public int getWritePosition() {             return this.writePosition;         }         public int getWriteCounter() {             return this.writeCounter;         }         public void putMagic() {             this.writeIndex.position(0);             this.writeIndex.put(MAGIC.getBytes());         }         public void putWritePosition(int writePosition) {             this.writeIndex.position(WRITE_POS_OFFSET);             this.writeIndex.putInt(writePosition);             this.writePosition = writePosition;         }         public void putWriteNum(int writeNum) {             this.writeIndex.position(WRITE_NUM_OFFSET);             this.writeIndex.putInt(writeNum);             this.writeNum = writeNum;         }         public void putWriteCounter(int writeCounter) {             this.writeIndex.position(WRITE_CNT_OFFSET);             this.writeIndex.putInt(writeCounter);             this.writeCounter = writeCounter;         }         public void putReadNum(int readNum) {             this.readIndex.position(READ_NUM_OFFSET);             this.readIndex.putInt(readNum);             this.readNum = readNum;         }         public void putReadPosition(int readPosition) {             this.readIndex.position(READ_POS_OFFSET);             this.readIndex.putInt(readPosition);             this.readPosition = readPosition;         }         public void putReadCounter(int readCounter) {             this.readIndex.position(READ_CNT_OFFSET);             this.readIndex.putInt(readCounter);             this.readCounter = readCounter;         }         public void reset() {             int size = writeCounter - readCounter;             putReadCounter(0);             putWriteCounter(size);             if (size == 0 && readNum == writeNum) {                 putReadPosition(0);                 putWritePosition(0);             }         }         public void sync() {             if (writeIndex != null) {                 writeIndex.force();             }         }         public void close() {             try {                 if (writeIndex == null) {                     return;                 }                 sync();                 AccessController.doPrivileged(new PrivilegedAction<Object>() {                     public Object run() {                         try {                             Method getCleanerMethod = writeIndex.getClass().getMethod("cleaner");                             getCleanerMethod.setAccessible(true);                             sun.misc.Cleaner cleaner = (sun.misc.Cleaner) getCleanerMethod.invoke(writeIndex);                             cleaner.clean();                         } catch (Exception e) {                             LOGGER.error("close fqueue index file failed", e);                         }                         return null;                     }                 });                 writeIndex = null;                 readIndex = null;                 fileChannel.close();                 indexFile.close();             } catch (IOException e) {                 LOGGER.error("close fqueue index file failed", e);             }         }     }     private static class MFQueueBlock {         private static final String BLOCK_FILE_SUFFIX = ".blk"; // 數據文件         private static final int BLOCK_SIZE = 32 * 1024 * 1024; // 32MB         private final int EOF = -1;         private String blockFilePath;         private MFQueueIndex index;         private RandomAccessFile blockFile;         private FileChannel fileChannel;         private ByteBuffer byteBuffer;         private MappedByteBuffer mappedBlock;         public MFQueueBlock(String blockFilePath, MFQueueIndex index, RandomAccessFile blockFile, FileChannel fileChannel,                             ByteBuffer byteBuffer, MappedByteBuffer mappedBlock) {             this.blockFilePath = blockFilePath;             this.index = index;             this.blockFile = blockFile;             this.fileChannel = fileChannel;             this.byteBuffer = byteBuffer;             this.mappedBlock = mappedBlock;         }         public MFQueueBlock(MFQueueIndex index, String blockFilePath) {             this.index = index;             this.blockFilePath = blockFilePath;             try {                 File file = new File(blockFilePath);                 this.blockFile = new RandomAccessFile(file, "rw");                 this.fileChannel = blockFile.getChannel();                 this.mappedBlock = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, BLOCK_SIZE);                 this.byteBuffer = mappedBlock.load();             } catch (Exception e) {                 throw new IllegalArgumentException(e);             }         }         public MFQueueBlock duplicate() {             return new MFQueueBlock(this.blockFilePath, this.index, this.blockFile, this.fileChannel,                     this.byteBuffer.duplicate(), this.mappedBlock);         }         public static String formatBlockFilePath(String queueName, int fileNum, String fileBackupDir) {             return fileBackupDir + File.separator + String.format("fblock_%s_%d%s", queueName, fileNum, BLOCK_FILE_SUFFIX);         }         public String getBlockFilePath() {             return blockFilePath;         }         public void putEOF() {             this.byteBuffer.position(index.getWritePosition());             this.byteBuffer.putInt(EOF);         }         public boolean isSpaceAvailable(int len) {             int increment = len + 4;             int writePosition = index.getWritePosition();             return BLOCK_SIZE >= increment + writePosition + 4; // 保證最後有4字節的空間能夠寫入EOF         }         public boolean eof() {             int readPosition = index.getReadPosition();             return readPosition > 0 && byteBuffer.getInt(readPosition) == EOF;         }         public int write(byte[] bytes) {             int len = bytes.length;             int increment = len + 4;             int writePosition = index.getWritePosition();             byteBuffer.position(writePosition);             byteBuffer.putInt(len);             byteBuffer.put(bytes);             index.putWritePosition(increment + writePosition);             index.putWriteCounter(index.getWriteCounter() + 1);             return increment;         }         public byte[] read() {             byte[] bytes;             int readNum = index.getReadNum();             int readPosition = index.getReadPosition();             int writeNum = index.getWriteNum();             int writePosition = index.getWritePosition();             if (readNum == writeNum && readPosition >= writePosition) {                 return null;             }             byteBuffer.position(readPosition);             int dataLength = byteBuffer.getInt();             if (dataLength <= 0) {                 return null;             }             bytes = new byte[dataLength];             byteBuffer.get(bytes);             index.putReadPosition(readPosition + bytes.length + 4);             index.putReadCounter(index.getReadCounter() + 1);             return bytes;         }         public void sync() {             if (mappedBlock != null) {                 mappedBlock.force();             }         }         public void close() {             try {                 if (mappedBlock == null) {                     return;                 }                 sync();                 AccessController.doPrivileged(new PrivilegedAction<Object>() {                     public Object run() {                         try {                             Method getCleanerMethod = mappedBlock.getClass().getMethod("cleaner");                             getCleanerMethod.setAccessible(true);                             sun.misc.Cleaner cleaner = (sun.misc.Cleaner) getCleanerMethod.invoke(mappedBlock);                             cleaner.clean();                         } catch (Exception e) {                             LOGGER.error("close fqueue block file failed", e);                         }                         return null;                     }                 });                 mappedBlock = null;                 byteBuffer = null;                 fileChannel.close();                 blockFile.close();             } catch (IOException e) {                 LOGGER.error("close fqueue block file failed", e);             }         }     }}
相關文章
相關標籤/搜索