本節介紹內存映射文件,內存映射文件不是Java引入的概念,而是操作系統提供的一種功能,大部分操作系統都支持。
我們先來介紹內存映射文件的基本概念,它是什麼,能解決什麼問題,然後我們介紹如何在Java中使用,我們會設計和實現一個簡單的、持久化的、跨程序的消息隊列來演示內存映射文件的應用。
基本概念
所謂內存映射文件,就是將文件映射到內存,文件對應於內存中的一個字節數組,對文件的操作變為對這個字節數組的操作,而字節數組的操作直接映射到文件上。這種映射可以是映射文件全部區域,也可以是只映射一部分區域。
不過,這種映射是操作系統提供的一種假象,文件一般不會馬上加載到內存,操作系統只是記錄下了這回事,當實際發生讀寫時,才會按需加載。操作系統一般是按頁加載的,頁可以理解為就是一塊,頁的大小與操作系統和硬件相關,典型的配置可能是4K, 8K等,當操作系統發現讀寫區域不在內存時,就會加載該區域對應的一個頁到內存。
這種按需加載的方式,使得內存映射文件可以方便處理非常大的文件,內存放不下整個文件也不要緊,操作系統會自動進行處理,將需要的內容讀到內存,將修改的內容保存到硬盤,將不再使用的內存釋放。
在應用程序寫的時候,它寫的是內存中的字節數組,這個內容什麼時候同步到文件上呢?這個時機是不確定的,由操作系統決定,不過,只要操作系統不崩潰,操作系統會保證同步到文件上,即使映射這個文件的應用程序已經退出了。
在一般的文件讀寫中,會有兩次數據拷貝,一次是從硬盤拷貝到操作系統內核,另一次是從操作系統內核拷貝到用戶態的應用程序。而在內存映射文件中,一般情況下,只有一次拷貝,且內存分配在操作系統內核,應用程序訪問的就是操作系統的內核內存空間,這顯然要比普通的讀寫效率更高。
內存映射文件的另一個重要特點是,它可以被多個不同的應用程序共享,多個程序可以映射同一個文件,映射到同一塊內存區域,一個程序對內存的修改,可以讓其他程序也看到,這使得它特別適合用於不同應用程序之間的通信。
操作系統自身在加載可執行文件的時候,一般都利用了內存映射文件,比如:
內存映射文件也有局限性,比如,它不太適合處理小文件,它是按頁分配內存的,對於小文件,會浪費空間,另外,映射文件要消耗一定的操作系統資源,初始化比較慢。
簡單總結下,對於一般的文件讀寫不需要使用內存映射文件,但如果處理的是大文件,要求極高的讀寫效率,比如數據庫系統,或者需要在不同程序間進行共享和通信,那就可以考慮內存映射文件。
理解了內存映射文件的基本概念,接下來,我們看怎麼在Java中使用它。
用法
映射文件
內存映射文件需要通過FileInputStream/FileOutputStream或RandomAccessFile,它們都有一個方法:
public FileChannel getChannel()
FileChannel有如下方法:
public MappedByteBuffer map(MapMode mode, long position, long size) throws IOException
map方法將當前文件映射到內存,映射的結果就是一個MappedByteBuffer對象,它代表內存中的字節數組,待會我們再來詳細看它。map有三個參數,mode表示映射模式,positon表示映射的起始位置,size表示長度。
mode有三個取值:
這個模式受限於背後的流或RandomAccessFile,比如,對於FileInputStream,或者RandomAccessFile但打開模式是"r",那mode就不能設為MapMode.READ_WRITE,否則會拋出異常。
如果映射的區域超過了現有文件的范圍,則文件會自動擴展,擴展出的區域字節內容為0。
映射完成後,文件就可以關閉了,後續對文件的讀寫可以通過MappedByteBuffer。
看段代碼,比如以讀寫模式映射文件"abc.dat",代碼可以為:
RandomAccessFile file = new RandomAccessFile("abc.dat","rw"); try { MappedByteBuffer buf = file.getChannel().map(MapMode.READ_WRITE, 0, file.length()); //使用buf... } catch (IOException e) { e.printStackTrace(); }finally{ file.close(); }
MappedByteBuffer
怎麼來使用MappedByteBuffer呢?它是ByteBuffer的子類,而ByteBuffer是Buffer的子類。ByteBuffer和Buffer不只是給內存映射文件提供的,它們是Java NIO中操作數據的一種方式,用於很多地方,方法也比較多,我們只介紹一些主要相關的。
ByteBuffer可以簡單理解為就是封裝了一個字節數組,這個字節數組的長度是不可變的,在內存映射文件中,這個長度由map方法中的參數size決定。
ByteBuffer有一個基本屬性position,表示當前讀寫位置,這個位置可以改變,相關方法是:
//獲取當前讀寫位置 public final int position() //修改當前讀寫位置 public final Buffer position(int newPosition)
ByteBuffer中有很多基於當前位置讀寫數據的方法,如:
//從當前位置獲取一個字節 public abstract byte get(); //從當前位置拷貝dst.length長度的字節到dst public ByteBuffer get(byte[] dst) //從當前位置讀取一個int public abstract int getInt(); //從當前位置讀取一個double public abstract double getDouble(); //將字節數組src寫入當前位置 public final ByteBuffer put(byte[] src) //將long類型的value寫入當前位置 public abstract ByteBuffer putLong(long value);
這些方法在讀寫後,都會自動增加position。
與這些方法相對應的,還有一組方法,可以在參數中直接指定position,比如:
//從index處讀取一個int public abstract int getInt(int index); //從index處讀取一個double public abstract double getDouble(int index); //在index處寫入一個double public abstract ByteBuffer putDouble(int index, double value); //在index處寫入一個long public abstract ByteBuffer putLong(int index, long value);
這些方法在讀寫時,不會改變當前讀寫位置position。
MappedByteBuffer自己還定義了一些方法:
//檢查文件內容是否真實加載到了內存,這個值是一個參考值,不一定精確 public final boolean isLoaded() //盡量將文件內容加載到內存 public final MappedByteBuffer load() //將對內存的修改強制同步到硬盤上 public final MappedByteBuffer force()
消息隊列
了解了內存映射文件的用法,接下來,我們來看怎麼用它設計和實現一個簡單的消息隊列,我們稱之為BasicQueue。
功能
BasicQueue是一個先進先出的循環隊列,長度固定,接口主要是出隊和入隊,與之前介紹的容器類的區別是:
BasicQueue的構造方法是:
public BasicQueue(String path, String queueName) throws IOException
path表示隊列所在的目錄,必須已存在,queueName表示隊列名,BasicQueue會使用以queueName開頭的兩個文件來保存隊列信息,一個後綴是.data,保存實際的消息,另一個後綴是.meta,保存元數據信息,如果這兩個文件存在,則會使用已有的隊列,否則會建立新隊列。
BasicQueue主要提供兩個方法,出隊和入隊,如下所示:
//入隊 public void enqueue(byte[] data) throws IOException //出隊 public byte[] dequeue() throws IOException
與上節介紹的BasicDB類似,消息格式也是byte數組。BasicQueue的隊列長度是有限的,如果滿了,調用enqueue會拋出異常,消息的最大長度也是有限的,不能超過1020,如果超了,也會拋出異常。如果隊列為空,dequeue返回null。
用法示例
BasicQueue的典型用法是生產者和消費者之間的協作,我們來看下簡單的示例代碼。生產者程序向隊列上放消息,每放一條,就隨機休息一會兒,代碼為:
public class Producer { public static void main(String[] args) throws InterruptedException { try { BasicQueue queue = new BasicQueue("./", "task"); int i = 0; Random rnd = new Random(); while (true) { String msg = new String("task " + (i++)); queue.enqueue(msg.getBytes("UTF-8")); System.out.println("produce: " + msg); Thread.sleep(rnd.nextInt(1000)); } } catch (IOException e) { e.printStackTrace(); } } }
消費者程序從隊列中取消息,如果隊列為空,也隨機睡一會兒,代碼為:
public class Consumer { public static void main(String[] args) throws InterruptedException { try { BasicQueue queue = new BasicQueue("./", "task"); Random rnd = new Random(); while (true) { byte[] bytes = queue.dequeue(); if (bytes == null) { Thread.sleep(rnd.nextInt(1000)); continue; } System.out.println("consume: " + new String(bytes, "UTF-8")); } } catch (IOException e) { e.printStackTrace(); } } }
假定這兩個程序的當前目錄一樣,它們會使用同樣的隊列"task"。同時運行這兩個程序,會看到它們的輸出交替出現。
設計
我們采用如下簡單方式來設計BasicQueue:
基本設計如下圖所示:
為簡化起見,我們暫不考慮由於並發訪問等引起的一致性問題。
實現消息隊列
下面來看BasicQueue的具體實現代碼。
常量定義
BasicQueue中定義了如下常量,名稱和含義如下:
// 隊列最多消息個數,實際個數還會減1 private static final int MAX_MSG_NUM = 1020*1024; // 消息體最大長度 private static final int MAX_MSG_BODY_SIZE = 1020; // 每條消息占用的空間 private static final int MSG_SIZE = MAX_MSG_BODY_SIZE + 4; // 隊列消息體數據文件大小 private static final int DATA_FILE_SIZE = MAX_MSG_NUM * MSG_SIZE; // 隊列元數據文件大小 (head + tail) private static final int META_SIZE = 8;
內部組成
BasicQueue的內部成員主要就是兩個MappedByteBuffer,分別表示數據和元數據:
private MappedByteBuffer dataBuf; private MappedByteBuffer metaBuf;
構造方法
BasicQueue的構造方法代碼是:
public BasicQueue(String path, String queueName) throws IOException { if (path.endsWith(File.separator)) { path += File.separator; } RandomAccessFile dataFile = null; RandomAccessFile metaFile = null; try { dataFile = new RandomAccessFile(path + queueName + ".data", "rw"); metaFile = new RandomAccessFile(path + queueName + ".meta", "rw"); dataBuf = dataFile.getChannel().map(MapMode.READ_WRITE, 0, DATA_FILE_SIZE); metaBuf = metaFile.getChannel().map(MapMode.READ_WRITE, 0, META_SIZE); } finally { if (dataFile != null) { dataFile.close(); } if (metaFile != null) { metaFile.close(); } } }
輔助方法
為了方便訪問和修改隊列頭尾指針,我們有如下方法:
private int head() { return metaBuf.getInt(0); } private void head(int newHead) { metaBuf.putInt(0, newHead); } private int tail() { return metaBuf.getInt(4); } private void tail(int newTail) { metaBuf.putInt(4, newTail); }
為了便於判斷隊列是空還是滿,我們有如下方法:
private boolean isEmpty(){ return head() == tail(); } private boolean isFull(){ return ((tail() + MSG_SIZE) % DATA_FILE_SIZE) == head(); }
入隊
代碼為:
public void enqueue(byte[] data) throws IOException { if (data.length > MAX_MSG_BODY_SIZE) { throw new IllegalArgumentException("msg size is " + data.length + ", while maximum allowed length is " + MAX_MSG_BODY_SIZE); } if (isFull()) { throw new IllegalStateException("queue is full"); } int tail = tail(); dataBuf.position(tail); dataBuf.putInt(data.length); dataBuf.put(data); if (tail + MSG_SIZE >= DATA_FILE_SIZE) { tail(0); } else { tail(tail + MSG_SIZE); } }
基本邏輯是:
出隊
代碼為:
public byte[] dequeue() throws IOException { if (isEmpty()) { return null; } int head = head(); dataBuf.position(head); int length = dataBuf.getInt(); byte[] data = new byte[length]; dataBuf.get(data); if (head + MSG_SIZE >= DATA_FILE_SIZE) { head(0); } else { head(head + MSG_SIZE); } return data; }
基本邏輯是:
小結
本節介紹了內存映射文件的基本概念及在Java中的的用法,在日常普通的文件讀寫中,我們用到的比較少,但在一些系統程序中,它卻是經常被用到的一把利器,可以高效的讀寫大文件,且能實現不同程序間的共享和通信。
利用內存映射文件,我們設計和實現了一個簡單的消息隊列,消息可以持久化,可以實現跨程序的生產者/消費者通信,我們演示了這個消息隊列的功能、用法、設計和實現代碼。
前面幾節,我們多次提到過序列化的概念,它到底是什麼呢?
----------------
未完待續,查看最新文章,敬請關注微信公眾號“老馬說編程”(掃描下方二維碼),從入門到高級,深入淺出,老馬和你一起探索Java編程及計算機技術的本質。用心原創,保留所有版權。