計算機程序的思惟邏輯 (61) - 內存映射文件及其應用 - 實現一個簡單的消息隊列

本系列文章經補充和完善,已修訂整理成書《Java編程的邏輯》(馬俊昌著),由機械工業出版社華章分社出版,於2018年1月上市熱銷,讀者好評如潮!各大網店和書店有售,歡迎購買:京東自營連接 html

本節介紹內存映射文件,內存映射文件不是Java引入的概念,而是操做系統提供的一種功能,大部分操做系統都支持。java

咱們先來介紹內存映射文件的基本概念,它是什麼,能解決什麼問題,而後咱們介紹如何在Java中使用,咱們會設計和實現一個簡單的、持久化的、跨程序的消息隊列來演示內存映射文件的應用。數據庫

基本概念

所謂內存映射文件,就是將文件映射到內存,文件對應於內存中的一個字節數組,對文件的操做變爲對這個字節數組的操做,而字節數組的操做直接映射到文件上。這種映射能夠是映射文件所有區域,也能夠是隻映射一部分區域。編程

不過,這種映射是操做系統提供的一種假象,文件通常不會立刻加載到內存,操做系統只是記錄下了這回事,當實際發生讀寫時,纔會按需加載。操做系統通常是按頁加載的,頁能夠理解爲就是一塊,頁的大小與操做系統和硬件相關,典型的配置多是4K, 8K等,當操做系統發現讀寫區域不在內存時,就會加載該區域對應的一個頁到內存。數組

這種按需加載的方式,使得內存映射文件能夠方便處理很是大的文件,內存放不下整個文件也沒關係,操做系統會自動進行處理,將須要的內容讀到內存,將修改的內容保存到硬盤,將再也不使用的內存釋放。微信

在應用程序寫的時候,它寫的是內存中的字節數組,這個內容何時同步到文件上呢?這個時機是不肯定的,由操做系統決定,不過,只要操做系統不崩潰,操做系統會保證同步到文件上,即便映射這個文件的應用程序已經退出了。併發

在通常的文件讀寫中,會有兩次數據拷貝,一次是從硬盤拷貝到操做系統內核,另外一次是從操做系統內核拷貝到用戶態的應用程序。而在內存映射文件中,通常狀況下,只有一次拷貝,且內存分配在操做系統內核,應用程序訪問的就是操做系統的內核內存空間,這顯然要比普通的讀寫效率更高app

內存映射文件的另外一個重要特色是,它能夠被多個不一樣的應用程序共享,多個程序能夠映射同一個文件,映射到同一塊內存區域,一個程序對內存的修改,可讓其餘程序也看到,這使得它特別適合用於不一樣應用程序之間的通訊dom

操做系統自身在加載可執行文件的時候,通常都利用了內存映射文件,好比:spa

  • 按需加載代碼,只有當前運行的代碼在內存,其餘暫時用不到的代碼還在硬盤
  • 同時啓動屢次同一個可執行文件,文件代碼在內存也只有一份
  • 不一樣應用程序共享的動態連接庫代碼在內存也只有一份

內存映射文件也有侷限性,好比,它不太適合處理小文件,它是按頁分配內存的,對於小文件,會浪費空間,另外,映射文件要消耗必定的操做系統資源,初始化比較慢。

簡單總結下,對於通常的文件讀寫不須要使用內存映射文件,但若是處理的是大文件,要求極高的讀寫效率,好比數據庫系統,或者須要在不一樣程序間進行共享和通訊,那就能夠考慮內存映射文件。

理解了內存映射文件的基本概念,接下來,咱們看怎麼在Java中使用它。

用法

映射文件

內存映射文件須要經過FileInputStream/FileOutputStream或RandomAccessFile,它們都有一個方法:

public FileChannel getChannel() 複製代碼

FileChannel有以下方法:

public MappedByteBuffer map(MapMode mode, long position, long size) throws IOException 複製代碼

map方法將當前文件映射到內存,映射的結果就是一個MappedByteBuffer對象,它表明內存中的字節數組,待會咱們再來詳細看它。map有三個參數,mode表示映射模式,positon表示映射的起始位置,size表示長度。

mode有三個取值:

  • MapMode.READ_ONLY:只讀
  • MapMode.READ_WRITE:既讀也寫
  • MapMode.PRIVATE:私有模式,更改不反映到文件,也不被其餘程序看到

這個模式受限於背後的流或RandomAccessFile,好比,對於FileInputStream,或者RandomAccessFile但打開模式是"r",那mode就不能設爲MapMode.READ_WRITE,不然會拋出異常。

若是映射的區域超過了現有文件的範圍,則文件會自動擴展,擴展出的區域字節內容爲0。

映射完成後,文件就能夠關閉了,後續對文件的讀寫能夠經過MappedByteBuffer。

看段代碼,好比以讀寫模式映射文件"abc.dat",代碼能夠爲:

RandomAccessFile file = new RandomAccessFile("abc.dat","rw");
try {
    MappedByteBuffer buf = file.getChannel().map(MapMode.READ_WRITE, 0, file.length());
    //使用buf...
} catch (IOException e) {
    e.printStackTrace();
}finally{
    file.close();
}
複製代碼

MappedByteBuffer

怎麼來使用MappedByteBuffer呢?它是ByteBuffer的子類,而ByteBuffer是Buffer的子類。ByteBuffer和Buffer不僅是給內存映射文件提供的,它們是Java NIO中操做數據的一種方式,用於不少地方,方法也比較多,咱們只介紹一些主要相關的。

ByteBuffer能夠簡單理解爲就是封裝了一個字節數組,這個字節數組的長度是不可變的,在內存映射文件中,這個長度由map方法中的參數size決定。

ByteBuffer有一個基本屬性position,表示當前讀寫位置,這個位置能夠改變,相關方法是:

//獲取當前讀寫位置
public final int position() //修改當前讀寫位置 public final Buffer position(int newPosition) 複製代碼

ByteBuffer中有不少基於當前位置讀寫數據的方法,如:

//從當前位置獲取一個字節
public abstract byte get();
//從當前位置拷貝dst.length長度的字節到dst
public ByteBuffer get(byte[] dst) //從當前位置讀取一個int public abstract int getInt();
//從當前位置讀取一個double
public abstract double getDouble();
//將字節數組src寫入當前位置
public final ByteBuffer put(byte[] src) //將long類型的value寫入當前位置 public abstract ByteBuffer putLong(long value);
複製代碼

這些方法在讀寫後,都會自動增長position。

與這些方法相對應的,還有一組方法,能夠在參數中直接指定position,好比:

//從index處讀取一個int
public abstract int getInt(int index);
//從index處讀取一個double
public abstract double getDouble(int index);
//在index處寫入一個double
public abstract ByteBuffer putDouble(int index, double value);
//在index處寫入一個long
public abstract ByteBuffer putLong(int index, long value);
複製代碼

這些方法在讀寫時,不會改變當前讀寫位置position。

MappedByteBuffer本身還定義了一些方法:

//檢查文件內容是否真實加載到了內存,這個值是一個參考值,不必定精確
public final boolean isLoaded() //儘可能將文件內容加載到內存 public final MappedByteBuffer load() //將對內存的修改強制同步到硬盤上 public final MappedByteBuffer force() 複製代碼

消息隊列

瞭解了內存映射文件的用法,接下來,咱們來看怎麼用它設計和實現一個簡單的消息隊列,咱們稱之爲BasicQueue

功能

BasicQueue是一個先進先出的循環隊列,長度固定,接口主要是出隊和入隊,與以前介紹的容器類的區別是:

  • 消息持久化保存在文件中,重啓程序消息不會丟失
  • 能夠供不一樣的程序進行協做,典型場景是,有兩個不一樣的程序,一個是生產者,另外一個是消費者,生成者只將消息放入隊列,而消費者只從隊列中取消息,兩個程序經過隊列進行協做,這種協做方式更靈活,相互依賴性小,是一種常見的協做方式。

BasicQueue的構造方法是:

public BasicQueue(String path, String queueName) throws IOException 複製代碼

path表示隊列所在的目錄,必須已存在,queueName表示隊列名,BasicQueue會使用以queueName開頭的兩個文件來保存隊列信息,一個後綴是.data,保存實際的消息,另外一個後綴是.meta,保存元數據信息,若是這兩個文件存在,則會使用已有的隊列,不然會創建新隊列。

BasicQueue主要提供兩個方法,出隊和入隊,以下所示:

//入隊
public void enqueue(byte[] data) throws IOException //出隊 public byte[] dequeue() throws IOException 複製代碼

與上節介紹的BasicDB相似,消息格式也是byte數組。BasicQueue的隊列長度是有限的,若是滿了,調用enqueue會拋出異常,消息的最大長度也是有限的,不能超過1020,若是超了,也會拋出異常。若是隊列爲空,dequeue返回null。

用法示例

BasicQueue的典型用法是生產者和消費者之間的協做,咱們來看下簡單的示例代碼。生產者程序向隊列上放消息,每放一條,就隨機休息一下子,代碼爲:

public class Producer {
    public static void main(String[] args) throws InterruptedException {
        try {
            BasicQueue queue = new BasicQueue("./", "task");
            int i = 0;
            Random rnd = new Random();
            while (true) {
                String msg = new String("task " + (i++));
                queue.enqueue(msg.getBytes("UTF-8"));
                System.out.println("produce: " + msg);
                Thread.sleep(rnd.nextInt(1000));
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
複製代碼

消費者程序從隊列中取消息,若是隊列爲空,也隨機睡一下子,代碼爲:

public class Consumer {
    public static void main(String[] args) throws InterruptedException {
        try {
            BasicQueue queue = new BasicQueue("./", "task");
            Random rnd = new Random();
            while (true) {
                byte[] bytes = queue.dequeue();
                if (bytes == null) {
                    Thread.sleep(rnd.nextInt(1000));
                    continue;
                }
                System.out.println("consume: " + new String(bytes, "UTF-8"));
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
複製代碼

假定這兩個程序的當前目錄同樣,它們會使用一樣的隊列"task"。同時運行這兩個程序,會看到它們的輸出交替出現。

設計

咱們採用以下簡單方式來設計BasicQueue:

  • 使用兩個文件來保存消息隊列,一個爲數據文件,後綴爲.data,一個是元數據文件.meta。
  • 在.data文件中使用固定長度存儲每條信息,長度爲1024,前4個字節爲實際長度,後面是實際內容,每條消息的最大長度不能超過1020。
  • 在.meta文件中保存隊列頭和尾,指向.data文件中的位置,初始都是0,入隊增長尾,出隊增長頭,到結尾時,再從0開始,模擬循環隊列。
  • 爲了區分隊列滿和空的狀態,始終留一個位置不保存數據,當隊列頭和尾同樣的時候表示隊列爲空,當隊列尾的下一個位置是隊列頭的時候表示隊列滿。

基本設計以下圖所示:

爲簡化起見,咱們暫不考慮因爲併發訪問等引發的一致性問題。

實現消息隊列

下面來看BasicQueue的具體實現代碼。

常量定義

BasicQueue中定義了以下常量,名稱和含義以下:

// 隊列最多消息個數,實際個數還會減1
private static final int MAX_MSG_NUM = 1020*1024;
// 消息體最大長度
private static final int MAX_MSG_BODY_SIZE = 1020;
// 每條消息佔用的空間
private static final int MSG_SIZE = MAX_MSG_BODY_SIZE + 4;
// 隊列消息體數據文件大小
private static final int DATA_FILE_SIZE = MAX_MSG_NUM * MSG_SIZE;
// 隊列元數據文件大小 (head + tail)
private static final int META_SIZE = 8;
複製代碼

內部組成

BasicQueue的內部成員主要就是兩個MappedByteBuffer,分別表示數據和元數據:

private MappedByteBuffer dataBuf;
private MappedByteBuffer metaBuf; 
複製代碼

構造方法

BasicQueue的構造方法代碼是:

public BasicQueue(String path, String queueName) throws IOException {
    if (path.endsWith(File.separator)) {
        path += File.separator;
    }
    RandomAccessFile dataFile = null;
    RandomAccessFile metaFile = null;
    try {
        dataFile = new RandomAccessFile(path + queueName + ".data", "rw");
        metaFile = new RandomAccessFile(path + queueName + ".meta", "rw");

        dataBuf = dataFile.getChannel().map(MapMode.READ_WRITE, 0,
                DATA_FILE_SIZE);
        metaBuf = metaFile.getChannel().map(MapMode.READ_WRITE, 0,
                META_SIZE);
    } finally {
        if (dataFile != null) {
            dataFile.close();
        }
        if (metaFile != null) {
            metaFile.close();
        }
    }
}
複製代碼

輔助方法

爲了方便訪問和修改隊列頭尾指針,咱們有以下方法:

private int head() {
    return metaBuf.getInt(0);
}

private void head(int newHead) {
    metaBuf.putInt(0, newHead);
}

private int tail() {
    return metaBuf.getInt(4);
}

private void tail(int newTail) {
    metaBuf.putInt(4, newTail);
}
複製代碼

爲了便於判斷隊列是空仍是滿,咱們有以下方法:

private boolean isEmpty(){
    return head() == tail();
}

private boolean isFull(){
    return ((tail() + MSG_SIZE) % DATA_FILE_SIZE) == head();
}
複製代碼

入隊

代碼爲:

public void enqueue(byte[] data) throws IOException {
    if (data.length > MAX_MSG_BODY_SIZE) {
        throw new IllegalArgumentException("msg size is " + data.length
                + ", while maximum allowed length is " + MAX_MSG_BODY_SIZE);
    }
    if (isFull()) {
        throw new IllegalStateException("queue is full");
    }
    int tail = tail();
    dataBuf.position(tail);
    dataBuf.putInt(data.length);
    dataBuf.put(data);

    if (tail + MSG_SIZE >= DATA_FILE_SIZE) {
        tail(0);
    } else {
        tail(tail + MSG_SIZE);
    }
}
複製代碼

基本邏輯是:

  1. 若是消息太長或隊列滿,拋出異常。
  2. 找到隊列尾,定位到隊列尾,寫消息長度,寫實際數據。
  3. 更新隊列尾指針,若是已到文件尾,再從頭開始。

出隊

代碼爲:

public byte[] dequeue() throws IOException {
    if (isEmpty()) {
        return null;
    }
    int head = head();
    dataBuf.position(head);
    int length = dataBuf.getInt();
    byte[] data = new byte[length];
    dataBuf.get(data);

    if (head + MSG_SIZE >= DATA_FILE_SIZE) {
        head(0);
    } else {
        head(head + MSG_SIZE);
    }
    return data;
}
複製代碼

基本邏輯是:

  1. 若是隊列爲空,返回null。
  2. 找到隊列頭,定位到隊列頭,讀消息長度,讀實際數據。
  3. 更新隊列頭指針,若是已到文件尾,再從頭開始。
  4. 最後返回實際數據

小結

本節介紹了內存映射文件的基本概念及在Java中的的用法,在平常普通的文件讀寫中,咱們用到的比較少,但在一些系統程序中,它倒是常常被用到的一把利器,能夠高效的讀寫大文件,且能實現不一樣程序間的共享和通訊。

利用內存映射文件,咱們設計和實現了一個簡單的消息隊列,消息能夠持久化,能夠實現跨程序的生產者/消費者通訊,咱們演示了這個消息隊列的功能、用法、設計和實現代碼。

前面幾節,咱們屢次提到過序列化的概念,它究竟是什麼呢?


未完待續,查看最新文章,敬請關注微信公衆號「老馬說編程」(掃描下方二維碼),深刻淺出,老馬和你一塊兒探索Java編程及計算機技術的本質。用心原創,保留全部版權。

相關文章
相關標籤/搜索