此係列文章會詳細解讀NIO的功能逐步豐滿的路程,爲Reactor-Netty 庫的講解鋪平道路。html
關於Java編程方法論-Reactor與Webflux的視頻分享,已經完成了Rxjava 與 Reactor,b站地址以下:java
Rxjava源碼解讀與分享:www.bilibili.com/video/av345…node
Reactor源碼解讀與分享:www.bilibili.com/video/av353…linux
本系列相關視頻分享: www.bilibili.com/video/av432…c++
本系列源碼解讀基於JDK11 api細節可能與其餘版本有所差異,請自行解決jdk版本問題。git
本系列前幾篇:github
BIO到NIO源碼的一些事兒之NIO 中apache
BIO到NIO源碼的一些事兒之NIO 下 之 Selector
BIO到NIO源碼的一些事兒之NIO 下 Buffer解讀 上
在瞭解NIO
中DirectByteBuffer
操做Buffer
以前,咱們有必要了解操做系統的相關知識,這樣也就能理解程序爲何要這麼設計實現了。
這裏,咱們以Linux操做系統爲例,首先來看一張圖:
圖1
如上圖所示,從宏觀上看,Linux
操做系統的架構體系分爲內核態和用戶態。操做系統本質上是運行在硬件資源上的軟件。而硬件資源,拿Intel x86
架構CPU
來講,在CPU
的全部指令中,有一些指令是很是危險的,若是用錯,將致使整個系統崩潰。如:清理內存、設置時鐘等。若是全部的程序都能使用這些指令,那麼你的系統一天到晚也就在崩潰中度過了。因此,CPU
將指令分爲特權指令和非特權指令,對於那些危險的指令,只容許操做系統及其相關模塊使用,普通的應用程序只能使用那些不會形成災難的指令。由此,Intel
的CPU
將特權級別分爲4
個級別:RING0
、RING1
、RING2
、RING3
。
對應操做系統,RING0
實際就是內核態,擁有最高權限。而通常應用程序處於RING3
狀態--用戶態。在權限約束上,使用的是高特權等級狀態能夠讀低等級狀態的數據,例如進程上下文、代碼、數據等等,可是反之則不可。即RING0
最高能夠讀取RING0-3
全部的內容,RING1
能夠讀RING1-3
的,RING2
以此類推,RING3
只能讀本身的數據。也就是Ring3
狀態不能訪問Ring0
的地址空間,包括代碼和數據。
咱們知道,在32
位機器上Linux
操做系統中的進程的地址空間大小是4G
,其中0-3G
對應用戶空間,3G-4G
對應內核空間。假如咱們物理機的內存只有2G大小呢?因此,這個4G的地址空間其實就是咱們所說的虛擬地址內存空間(因此,當在32位操做系統下,如windows
,咱們會遇到在物理內存大於8
個G
的狀況下,只識別4
個G
內存)。
那虛擬地址內存空間是什麼呢,它與實際物理內存空間又是怎樣對應的呢?
進程使用虛擬地址內存中的地址,由操做系統協助相關硬件,把它「轉換」成真正的物理地址。虛擬地址經過頁表(Page Table
)映射到物理內存,頁表由操做系統維護並被處理器引用。內核空間在頁表中擁有最高特權級,所以用戶態程序試圖訪問這些頁時會致使一個頁錯誤(page fault
)。在Linux
中,內核空間是持續存在的,而且在全部進程中都映射到一樣的物理內存。內核代碼和數據老是可尋址,隨時準備處理中斷和系統調用。與此相反,用戶模式地址空間的映射隨進程切換的發生而不斷變化。
Linux進程在虛擬內存中的標準內存段佈局以下圖所示:
圖2
注意這裏是32位內核地址空間劃分,64位內核地址空間劃分是不一樣的。
由上圖,咱們從左側Kernel Space
能夠看到在x86
結構中,內核空間分三種類型的區域:
ZONE_DMA 內存開始的16MB
ZONE_NORMAL 16MB~896MB
ZONE_HIGHMEM 896MB ~ 結束
當內核模塊代碼或進程訪問內存時,代碼中所指向的內存地址都爲邏輯地址,而對應到真正的物理內存地址,須要地址一對一的映射,如邏輯地址0xc0000003
對應的物理地址爲0×3
,0xc0000004
對應的物理地址爲0×4
,… …,邏輯地址與物理地址對應的關係爲:
物理地址 = 邏輯地址 – 0xC0000000
邏輯地址 | 物理內存地址 |
---|---|
0xc0000000 | 0×0 |
0xc0000001 | 0×1 |
0xc0000002 | 0×2 |
0xc0000003 | 0×3 |
… | … |
0xe0000000 | 0×20000000 |
… | … |
0xffffffff | **0×40000000 ** |
假 設按照上述簡單的地址映射關係,那麼由圖2
可知,內核邏輯地址空間訪問爲0xc0000000 ~ 0xffffffff
,那麼對應的物理內存範圍就爲0×0 ~ 0×40000000
,即只能訪問1G
物理內存。若機器中安裝8G
物理內存,那麼內核就只能訪問前1G
物理內存,後面7G
物理內存將會沒法訪問,由於內核 的地址空間已經所有映射到物理內存地址範圍0×0 ~ 0×40000000
。即便安裝了8G物理內存,那麼物理地址爲0×40000001
的內存,內核該怎麼去訪問呢?代碼中必需要有內存邏輯地址 的,0xc0000000 ~ 0xffffffff
的地址空間已經被用完了,因此沒法訪問物理地址0×40000000
之後的內存。
顯 然不能將內核地址空間0xc0000000 ~ 0xfffffff
所有用來簡單的地址映射。所以x86架構中將內核地址空間劃分三部分:ZONE_DMA
、ZONE_NORMAL
和 ZONE_HIGHMEM
。ZONE_HIGHMEM
即爲高位內存,這就是高位內存概念的由來。
那麼如內核是如何經過藉助128MB高位內存地址空間達到能夠訪問全部物理內存的目的?
當內核想訪問高於896MB
物理地址內存時,從0xF8000000 ~ 0xFFFFFFFF
地址空間範圍內找一段相應大小空閒的邏輯地址空間,借用一會。借用這段邏輯地址空間,創建映射到想訪問的那段物理內存(即填充內核PTE
頁面表),臨時用一會,用完後歸還。這樣別人也能夠借用這段地址空間訪問其餘物理內存,實現了使用有限的地址空間,訪問全部全部物理內存。
例如內核想訪問2G
開始的一段大小爲1MB
的物理內存,即物理地址範圍爲0×80000000 ~ 0x800FFFFF
。訪問以前先找到一段1MB
大小的空閒地址空間,假設找到的空閒地址空間爲0xF8700000 ~ 0xF87FFFFF
,用這1MB
的邏輯地址空間映射到物理地址空間0×80000000 ~ 0x800FFFFF
的內存。映射關係以下:
邏輯地址 | 物理內存地址 |
---|---|
0xF8700000 | 0×80000000 |
0xF8700001 | 0×80000001 |
0xF8700002 | 0×80000002 |
… | … |
0xF87FFFFF | 0x800FFFFF |
當內核訪問完0×80000000 ~ 0x800FFFFF
物理內存後,就將0xF8700000 ~ 0xF87FFFFF
內核線性空間釋放。這樣其餘進程或代碼也可使用0xF8700000 ~ 0xF87FFFFF
這段地址訪問其餘物理內存。
從上面的描述,咱們能夠知道高位內存的最基本思想:借一段地址空間,創建臨時地址映射,用完後釋放,達到這段地址空間能夠循環使用,訪問全部物理內存。
看到這裏,不由有人會問:萬一有內核進程或模塊一直佔用某段邏輯地址空間不釋放,怎麼辦?若真的出現的這種狀況,則內核的高位內存地址空間愈來愈緊張,若都被佔用不釋放,則沒有可創建映射到物理內存的高位地址空間,也就沒法訪問對應的物理內存了。
簡單的說,進程在使用內存的時候,都不是直接訪問內存物理地址的,進程訪問的都是虛擬內存地址,而後虛擬內存地址再轉化爲內存物理地址。 進程看到的全部地址組成的空間,就是虛擬空間。虛擬空間是某個進程對分配給它的全部物理地址(已經分配的和將會分配的)的從新映射。
這裏能夠認爲虛擬空間都被映射到了硬盤空間中,而且由頁表記錄映射位置,當訪問到某個地址的時候,經過頁表中的有效位,能夠得知此數據是否在內存中,若是不是,則經過缺頁異常,將硬盤對應的數據拷貝到內存中,若是沒有空閒內存,則選擇犧牲頁面,替換其餘頁面(即覆蓋老頁面)。
此處想進一步深刻可參考linux 進程的虛擬內存
咱們回到內核態與用戶態這兩個概念。操做系統的內核態是用來控制計算機的硬件資源,並提供上層應用程序運行的環境。用戶態即上層應用程序的活動空間,應用程序的執行必須依託於內核提供的資源,包括CPU
資源、存儲資源、I/O
資源等。爲了使上層應用可以訪問到這些資源,內核必須爲上層應用提供訪問的接口:即系統調用。
系統調用是操做系統的最小功能單位,這些系統調用根據不一樣的應用場景能夠進行擴展和裁剪,如今各類版本的Unix
實現都提供了不一樣數量的系統調用,如Linux的不一樣版本提供了240-260
個系統調用,FreeBSD
大約提供了320
個。咱們能夠把系統調用當作是一種不能再化簡的操做(相似於原子操做,可是不一樣概念),有人把它比做一個漢字的一個「筆畫」,而一個「漢字」就表明一個上層應用。
用戶空間的應用程序,經過系統調用,進入內核空間。這個時候用戶空間的進程要傳遞不少變量、參數的值給內核,內核態運行的時候也要保存用戶進程的一些寄存器值、變量等。所謂的「進程上下文」,能夠看做是用戶進程傳遞給內核的這些參數以及內核要保存的那一整套的變量和寄存器值和當時的環境等。
#### 系統IO調用
那咱們來看一下通常的IO調用。在傳統的文件IO操做中,都是調用操做系統提供的底層標準IO系統調用函數 read()、write() ,此時調用此函數的進程(在JAVA中即java進程)由當前的用戶態切換到內核態,而後OS的內核代碼負責將相應的文件數據讀取到內核的IO緩衝區,而後再把數據從內核IO緩衝區拷貝到進程的私有地址空間中去,這樣便完成了一次IO操做。以下圖所示。
圖3
此處,咱們經過一個Demo來捋一下這個過程:
byte[] b = new byte[1024];
while((read = inputStream.read(b))>=0) {
total = total + read;
// other code....
}
複製代碼
咱們經過new byte[1024]
來建立一個緩衝區,因爲JVM處於用戶態進程中,因此,此處建立的這個緩衝區爲用戶緩衝區。而後在一個while循環裏面調用read()方法讀數據來觸發syscall read
系統調用。 咱們着重來分析下inputStream.read
調用時所發生的細節:
這裏的用戶緩衝區就是咱們代碼中所
new
的字節數組。整個過程請對照圖3所示內容進行理解。對於操做系統而言,JVM處於用戶態空間中。而處於用戶態空間的進程是不能直接操做底層的硬件的。而IO操做就須要操做底層的硬件,好比硬盤。所以,IO操做必須得藉助內核的幫助才能完成(中斷,trap),即:會有用戶態到內核態的切換。
咱們寫代碼 new byte[] 數組時,通常是都是「隨意」 建立一個「任意大小」的數組。好比,new byte[128]、new byte[1024]、new byte[4096]....
可是,對於硬盤塊的讀取而言,每次訪問硬盤讀數據時,並非讀任意大小的數據的,而是:每次讀一個硬盤塊或者若干個硬盤塊(這是由於訪問硬盤操做代價是很大的) 所以,就須要有一個「中間緩衝區」--即內核緩衝區。先把數據從硬盤讀到內核緩衝區中,而後再把數據從內核緩衝區搬到用戶緩衝區。
這也是爲何咱們總感受到第一次read操做很慢,然後續的read操做卻很快的緣由。對於後續的read操做而言,它所須要讀的數據極可能已經在內核緩衝區了,此時只需將內核緩衝區中的數據拷貝到用戶緩衝區便可,並未涉及到底層的讀取硬盤操做,固然就快了。
而當數據不可用,這個處理進程將會被掛起,並等待內核從硬盤上把數據取到內核緩衝區中。
DMA---用來在設備內存與主存RAM之間直接進行數據交換,這個過程無需CPU干預,對於系統中有大量數據交換的設備而言,若是可以充分利用DMA特性,能夠大大提升系統性能。可參考Linux內核中DMA分析
DMA讀取數據這種操做涉及到底層的硬件,硬件通常是不能直接訪問用戶態空間的,也就是DMA不能直接訪問用戶緩衝區,普通IO操做須要將數據來回地在 用戶緩衝區 和 內核緩衝區移動,這在必定程序上影響了IO的速度。那有沒有相應的解決方案呢?
這裏就涉及到了咱們想要說起的核心內容:直接內存映射IO。
虛擬地址空間有一塊區域,在內存映射文件的時候將某一段的虛擬地址和文件對象的某一部分創建起映射關係,此時並無拷貝數據到內存中去,而是當進程代碼第一次引用這段代碼內的虛擬地址時,觸發了缺頁異常,這時候OS根據映射關係直接將文件的相關部分數據拷貝到進程的用戶私有空間中去,以下圖所示。
圖4
從圖4能夠看出:內核空間的 buffer 與 用戶空間的 buffer 都映射到同一塊 物理內存區域。
它的主要特色以下:
在通過了上面的層層鋪墊以後,咱們再來回顧下ByteBuffer
。ByteBuffer
做爲一個抽象類,其實現分爲兩類:HeapByteBuffer
與DirectByteBuffer
。HeapByteBuffer
是堆內ByteBuffer
,基於用戶態的實現,使用byte[]
存儲數據,咱們前面已經接觸過。DirectByteBuffer
是堆外ByteBuffer
,直接使用堆外內存空間存儲數據,使用直接內存映射IO,這也是NIO
高性能的核心所在之一。那麼咱們一塊兒來分析一下DirectByteBuffer
的相關實現。
咱們可使用java.nio.ByteBuffer#allocateDirect
方法來實例化一個DirectByteBuffer
。
//java.nio.ByteBuffer#allocateDirect
public static ByteBuffer allocateDirect(int capacity) {
return new DirectByteBuffer(capacity);
}
DirectByteBuffer(int cap) { // package-private
// 初始化Buffer四個核心屬性
super(-1, 0, cap, cap);
// 判斷是否須要頁面對齊,經過參數-XX:+PageAlignDirectMemory控制,默認爲false
boolean pa = VM.isDirectMemoryPageAligned();
// 獲取每頁內存大小
int ps = Bits.pageSize();
// 分配內存的大小,若是是按頁對齊方式,須要再加一頁內存的容量
long size = Math.max(1L, (long)cap + (pa ? ps : 0));
// 用Bits類保存總分配內存(按頁分配)的大小和實際內存的大小
Bits.reserveMemory(size, cap);
long base = 0;
try {
// 調用unsafe方法分配內存
base = UNSAFE.allocateMemory(size);
} catch (OutOfMemoryError x) {
// 分配失敗,釋放內存
Bits.unreserveMemory(size, cap);
throw x;
}
// 初始化分配內存空間,指定內存大小,該空間中每一個位置值爲0
UNSAFE.setMemory(base, size, (byte) 0);
// 設置內存起始地址,若是須要頁面對齊,
// 則判斷base是否有對齊,有且不是一個頁的起始位置則經過計算進行地址對齊操做
if (pa && (base % ps != 0)) {
// Round up to page boundary
address = base + ps - (base & (ps - 1));
} else {
address = base;
}
// 建立一個cleaner,最後會調用Deallocator.run來釋放內存
cleaner = Cleaner.create(this, new Deallocator(base, size, cap));
att = null;
}
複製代碼
首先,經過VM.isDirectMemoryPageAligned()
判斷是否須要頁面對齊,關於對齊,咱們這裏來接觸下內在理論。
在現代計算架構中,從內存中讀取數據,基本上都是按2^N
個字節來從主存加載CPU
中。這個值,基本是cache line
的大小。也就是說,若是所讀數據在同一塊cache line
以內是最快的。目前來講,多數PC的cache line
值是128個字節。 對於首地址也是同樣的。在32位機器上,若是有4個字節的內存塊,跨2個cache line
,那麼被加載到CPU的時候,須要2次內存缺失中斷。
好了,言歸正傳。對於任何一種小內存請求,都不會按實際大小分配,首先會按照必定規則進行對齊。這種對齊的規則比較複雜,通常會依照系統頁大小,機器字大小,和系統特性來定製。一般來講,會在不一樣區間採用不一樣的步長。舉個例子:
序號 | 大小區間 | 字節對齊 |
---|---|---|
0 | [0--16] | 8 |
1 | (16 , 128] | 16 |
2 | (128 , 256] | 32 |
3 | (256 , 512] | 64 |
因爲每一個區間的步長不同,又被劃分紅更多的區間。好比(256 , 320]之間長度請求,實際被分配應該是320個字節,而不是512。而1個字節的請求,老是被分配8個字節。
簡單點說,其實就是效率問題,現代計算機讀取內存的時候,通常只能在偶數邊界上開始讀,什麼意思呢,打個比方,在32
位的機器上,一個int
變量變量佔用4
字節,假如這個變量的真實物理內存地址是0x400005
,那計算機在取數的時候會先從0x400004
取4
個字節,再從0x400008
取4
個字節,而後這個變量的值就是前4
個字節的後三位和後4
個字節的第一位,也就是說若是一個變量的地址從奇數開始,就可能要多讀一次內存,而若是從偶數開始,特別是計算機位數/8
的倍數開始,效率就高了!
當須要按頁對齊的時候,內核老是會把vmalloc
函數的參數size
調整到頁對齊,並在調整後的數值上再加一個頁面的大小.內核之因此加一個頁面大小,是爲了防止可能出現的越界訪問。頁是可傳輸到IO設備的最小內存塊。所以,將數據與頁面大小對齊,並使用頁面大小做爲分配單元,以此在寫入硬盤/網絡設備時對交互產生影響。這樣,經過多分配一頁空間,能夠在數據超出一頁大小時,相似於上一段所描述的場景,多讀一次內存,以及要多佔用一頁空間。
// -- Processor and memory-system properties --
private static int PAGE_SIZE = -1;
// java.nio.Bits#pageSize
static int pageSize() {
if (PAGE_SIZE == -1)
PAGE_SIZE = UNSAFE.pageSize();
return PAGE_SIZE;
}
/** * Reports the size in bytes of a native memory page (whatever that is). * This value will always be a power of two. */
public native int pageSize();
複製代碼
由上面DirectByteBuffer(int cap)
這個構造器代碼中給的中文註釋可知,申請分配內存前會調用java.nio.Bits#reserveMemory
判斷是否有足夠的空間可供申請:
//java.nio.Bits#tryReserveMemory
private static boolean tryReserveMemory(long size, int cap) {
// -XX:MaxDirectMemorySize limits the total capacity rather than the
// actual memory usage, which will differ when buffers are page
// aligned.
//經過-XX:MaxDirectMemorySize來判斷用戶申請的大小是否合理,
long totalCap;
//可以使用最大空間減去已使用空間,剩餘可用空間知足需求分配的空間的話設定相關參數,並返回true
while (cap <= MAX_MEMORY - (totalCap = TOTAL_CAPACITY.get())) {
if (TOTAL_CAPACITY.compareAndSet(totalCap, totalCap + cap)) {
RESERVED_MEMORY.addAndGet(size);
COUNT.incrementAndGet();
return true;
}
}
return false;
}
// java.nio.Bits#reserveMemory
// size:根據是否按頁對齊,獲得的真實須要申請的內存大小
// cap:用戶指定須要的內存大小(<=size)
static void reserveMemory(long size, int cap) {
// 獲取最大能夠申請的對外內存大小
// 可經過參數-XX:MaxDirectMemorySize=<size>設置這個大小
if (!MEMORY_LIMIT_SET && VM.initLevel() >= 1) {
MAX_MEMORY = VM.maxDirectMemory();
MEMORY_LIMIT_SET = true;
}
// optimist!
// 有足夠空間可供分配,則直接return,不然,繼續執行下面邏輯,嘗試從新分配
if (tryReserveMemory(size, cap)) {
return;
}
final JavaLangRefAccess jlra = SharedSecrets.getJavaLangRefAccess();
boolean interrupted = false;
try {
// Retry allocation until success or there are no more
// references (including Cleaners that might free direct
// buffer memory) to process and allocation still fails.
boolean refprocActive;
do {
//這個do while循環中,若沒有更多引用(包括可能釋放直接緩衝區內存的Cleaners)進行處理,接着就從新嘗 //試判斷所申請內存空間是否知足條件,若是這個過程發生異常,則interrupted設定爲true,同時在最後的 //finally代碼塊中打斷當前所在線程。
try {
refprocActive = jlra.waitForReferenceProcessing();
} catch (InterruptedException e) {
// Defer interrupts and keep trying.
interrupted = true;
refprocActive = true;
}
if (tryReserveMemory(size, cap)) {
return;
}
} while (refprocActive);
// trigger VM's Reference processing
System.gc();
long sleepTime = 1;
int sleeps = 0;
while (true) {
if (tryReserveMemory(size, cap)) {
return;
}
if (sleeps >= MAX_SLEEPS) {
break;
}
try {
if (!jlra.waitForReferenceProcessing()) {
Thread.sleep(sleepTime);
sleepTime <<= 1;
sleeps++;
}
} catch (InterruptedException e) {
interrupted = true;
}
}
// no luck
throw new OutOfMemoryError("Direct buffer memory");
} finally {
if (interrupted) {
// don't swallow interrupts
Thread.currentThread().interrupt();
}
}
}
複製代碼
該方法主要用於判斷申請的堆外內存是否超過了用例指定的最大值,若是還有足夠空間能夠申請,則更新對應的變量,若是已經沒有空間能夠申請,則拋出OutOfMemoryError
。
上文提到了DirectByteBuffer
申請內存前會判斷是否有足夠的空間可供申請。用戶能夠經過設定-XX:MaxDirectMemorySize=<size>
來控制能夠申請最大的DirectByteBuffer
內存。可是默認狀況下這個大小是多少呢?
由上面代碼可知,DirectByteBuffer
經過sun.misc.VM#maxDirectMemory
來獲取這個值,咱們來看一下對應的代碼:
// A user-settable upper limit on the maximum amount of allocatable direct
// buffer memory. This value may be changed during VM initialization if
// "java" is launched with "-XX:MaxDirectMemorySize=<size>".
//
// The initial value of this field is arbitrary; during JRE initialization
// it will be reset to the value specified on the command line, if any,
// otherwise to Runtime.getRuntime().maxMemory().
//
private static long directMemory = 64 * 1024 * 1024;
// Returns the maximum amount of allocatable direct buffer memory.
// The directMemory variable is initialized during system initialization
// in the saveAndRemoveProperties method.
//
public static long maxDirectMemory() {
return directMemory;
}
複製代碼
這裏directMemory
賦值爲64MB,那堆外內存默認最大是64MB嗎?答案是否認的,咱們來看註釋,能夠知道,這個值會在JRE
初始化啓動的過程當中被從新設置爲用戶指定的值,若是用戶沒有指定,則會設置爲Runtime.getRuntime().maxMemory()
。
/** * Returns the maximum amount of memory that the Java virtual machine * will attempt to use. If there is no inherent limit then the value * {@link java.lang.Long#MAX_VALUE} will be returned. * * @return the maximum amount of memory that the virtual machine will * attempt to use, measured in bytes * @since 1.4 */
public native long maxMemory();
//src\java.base\share\native\libjava\Runtime.c
JNIEXPORT jlong JNICALL Java_java_lang_Runtime_maxMemory(JNIEnv *env, jobject this) {
return JVM_MaxMemory();
}
//src\hotspot\share\include\jvm.h
JNIEXPORT jlong JNICALL JVM_MaxMemory(void);
//src\hotspot\share\prims\jvm.cpp
JVM_ENTRY_NO_ENV(jlong, JVM_MaxMemory(void))
JVMWrapper("JVM_MaxMemory");
size_t n = Universe::heap()->max_capacity();
return convert_size_t_to_jlong(n);
JVM_END
複製代碼
咱們來看JRE
相關的初始化啓動源碼:
/** * java.lang.System#initPhase1 * Initialize the system class. Called after thread initialization. */
private static void initPhase1() {
// VM might invoke JNU_NewStringPlatform() to set those encoding
// sensitive properties (user.home, user.name, boot.class.path, etc.)
// during "props" initialization, in which it may need access, via
// System.getProperty(), to the related system encoding property that
// have been initialized (put into "props") at early stage of the
// initialization. So make sure the "props" is available at the
// very beginning of the initialization and all system properties to
// be put into it directly.
props = new Properties(84);
initProperties(props); // initialized by the VM
// There are certain system configurations that may be controlled by
// VM options such as the maximum amount of direct memory and
// Integer cache size used to support the object identity semantics
// of autoboxing. Typically, the library will obtain these values
// from the properties set by the VM. If the properties are for
// internal implementation use only, these properties should be
// removed from the system properties.
//
// See java.lang.Integer.IntegerCache and the
// VM.saveAndRemoveProperties method for example.
//
// Save a private copy of the system properties object that
// can only be accessed by the internal implementation. Remove
// certain system properties that are not intended for public access.
// 咱們關注此處便可
VM.saveAndRemoveProperties(props);
lineSeparator = props.getProperty("line.separator");
StaticProperty.javaHome(); // Load StaticProperty to cache the property values
VersionProps.init();
FileInputStream fdIn = new FileInputStream(FileDescriptor.in);
FileOutputStream fdOut = new FileOutputStream(FileDescriptor.out);
FileOutputStream fdErr = new FileOutputStream(FileDescriptor.err);
setIn0(new BufferedInputStream(fdIn));
setOut0(newPrintStream(fdOut, props.getProperty("sun.stdout.encoding")));
setErr0(newPrintStream(fdErr, props.getProperty("sun.stderr.encoding")));
// Setup Java signal handlers for HUP, TERM, and INT (where available).
Terminator.setup();
// Initialize any miscellaneous operating system settings that need to be
// set for the class libraries. Currently this is no-op everywhere except
// for Windows where the process-wide error mode is set before the java.io
// classes are used.
VM.initializeOSEnvironment();
// The main thread is not added to its thread group in the same
// way as other threads; we must do it ourselves here.
Thread current = Thread.currentThread();
current.getThreadGroup().add(current);
// register shared secrets
setJavaLangAccess();
// Subsystems that are invoked during initialization can invoke
// VM.isBooted() in order to avoid doing things that should
// wait until the VM is fully initialized. The initialization level
// is incremented from 0 to 1 here to indicate the first phase of
// initialization has completed.
// IMPORTANT: Ensure that this remains the last initialization action!
VM.initLevel(1);
}
複製代碼
上述源碼中的中文註釋部分表示即爲咱們關心的相關過程,即對directMemory
賦值發生在sun.misc.VM#saveAndRemoveProperties
函數中:
// Save a private copy of the system properties and remove
// the system properties that are not intended for public access.
//
// This method can only be invoked during system initialization.
public static void saveAndRemoveProperties(Properties props) {
if (initLevel() != 0)
throw new IllegalStateException("Wrong init level");
@SuppressWarnings({"rawtypes", "unchecked"})
Map<String, String> sp =
Map.ofEntries(props.entrySet().toArray(new Map.Entry[0]));
// only main thread is running at this time, so savedProps and
// its content will be correctly published to threads started later
savedProps = sp;
// Set the maximum amount of direct memory. This value is controlled
// by the vm option -XX:MaxDirectMemorySize=<size>.
// The maximum amount of allocatable direct buffer memory (in bytes)
// from the system property sun.nio.MaxDirectMemorySize set by the VM.
// The system property will be removed.
String s = (String)props.remove("sun.nio.MaxDirectMemorySize");
if (s != null) {
if (s.equals("-1")) {
// -XX:MaxDirectMemorySize not given, take default
directMemory = Runtime.getRuntime().maxMemory();
} else {
long l = Long.parseLong(s);
if (l > -1)
directMemory = l;
}
}
// Check if direct buffers should be page aligned
s = (String)props.remove("sun.nio.PageAlignDirectMemory");
if ("true".equals(s))
pageAlignDirectMemory = true;
// Remove other private system properties
// used by java.lang.Integer.IntegerCache
props.remove("java.lang.Integer.IntegerCache.high");
// used by sun.launcher.LauncherHelper
props.remove("sun.java.launcher.diag");
// used by jdk.internal.loader.ClassLoaders
props.remove("jdk.boot.class.path.append");
}
複製代碼
因此默認狀況下,DirectByteBuffer
堆外內存默認最大爲Runtime.getRuntime().maxMemory()
,而這個值等於可用的最大Java堆大小,也就是咱們-Xmx
參數指定的值。
同時,咱們在此處也看到了代碼內有主動調用System.gc()
,以清理已分配DirectMemory
中的不用的對象引用,騰出空間。這裏主動調用System.gc()
的目的也是爲了想觸發一次full gc
,此時,咱們要看它所處的位置,若是堆外內存申請不到足夠的空間,則堆外內存會超過其閾值,此時,jdk會經過System.gc()
的內在機制觸發一次full gc
,來進行回收。調用System.gc()
自己就是執行一段相應的邏輯,那咱們來探索下其中的細節。
//java.lang.System#gc
public static void gc() {
Runtime.getRuntime().gc();
}
//java.lang.Runtime#gc
public native void gc();
複製代碼
JNIEXPORT void JNICALL Java_java_lang_Runtime_gc(JNIEnv *env, jobject this) {
JVM_GC();
}
複製代碼
能夠看到直接調用了JVM_GC()方法,這個方法的實如今jvm.cpp中
//src\hotspot\share\prims\jvm.cpp
JVM_ENTRY_NO_ENV(void, JVM_GC(void))
JVMWrapper("JVM_GC");
if (!DisableExplicitGC) {
Universe::heap()->collect(GCCause::_java_lang_system_gc);
}
JVM_END
//src\hotspot\share\runtime\interfaceSupport.inline.hpp
#define JVM_ENTRY_NO_ENV(result_type, header) \
extern "C" { \
result_type JNICALL header { \
JavaThread* thread = JavaThread::current(); \
ThreadInVMfromNative __tiv(thread); \
debug_only(VMNativeEntryWrapper __vew;) \
VM_ENTRY_BASE(result_type, header, thread)
...
#define JVM_END } }
#define VM_ENTRY_BASE(result_type, header, thread) \
TRACE_CALL(result_type, header) \
HandleMarkCleaner __hm(thread); \
Thread* THREAD = thread; \
os::verify_stack_alignment(); \
/* begin of body */
複製代碼
此處#define JVM_ENTRY_NO_ENV
屬於宏定義,這裏可能你們不是很瞭解,就簡單說下。
宏定義分類:
不帶參數的宏定義
#define TRUE 1
帶參數的宏: #define 宏名 ( 參數表) [宏體]
宏定義做用:
方便程序的修改
#define TRUE 1
就是一個實例提升程序的運行效率
加強可讀性
字符串拼接
例如:
#define CAT(a,b,c) a##b##c
main()
{
printf("%d\n" CAT(1,2,3));
printf("%s\n", CAT('a', 'b', 'c');
}
複製代碼
程序的輸出會是:
123
abc
複製代碼
示例:
#defind CAT(n) "abc"#n
main()
{
printf("%s\n", CAT(15));
}
複製代碼
輸出的結果會是
abc15
複製代碼
#define PR(...) printf(_ _VA_ARGS_ _)
複製代碼
其實有點像解釋器模式,簡單點說,咱們彼此約定,我喊 1,你就說:天生我材必有用。接下來咱們進行以下定義:
#define a abcdefg(也能夠是很長一段代碼一個函數)
複製代碼
同理宏就至關於你和編譯器之間的約定,你告訴它 ,當我寫 a ,其實就是指後面那段內容。那麼,預編譯的時候, 編譯器一看 a是這個,這時候它就會把全部的a都替換成了後面那個字符串。
想要繼續深刻,能夠參考[C++宏定義詳解](www.cnblogs.com/fnlingnzb-l…)。
參考咱們在前面列出的jvm.cpp
中JVM_GC()
相關的部分代碼,能夠知道,interfaceSupport.inline.hpp
內定義了JVM_ENTRY_NO_ENV
的宏邏輯,而下面這段代碼則定義了JVM_GC
的相關邏輯,而後JVM_GC
做爲子邏輯在JVM_ENTRY_NO_ENV
的宏邏輯中執行。
JVM_ENTRY_NO_ENV(void, JVM_GC(void))
JVMWrapper("JVM_GC");
if (!DisableExplicitGC) {
Universe::heap()->collect(GCCause::_java_lang_system_gc);
}
JVM_END
複製代碼
咱們這裏再接觸個JDK
中咱們常見的AccessController.doPrivileged
方法,它是在jvm.cpp
中對應的實現爲:
JVM_ENTRY(jobject, JVM_DoPrivileged(JNIEnv *env, jclass cls, jobject action, jobject context, jboolean wrapException))
JVMWrapper("JVM_DoPrivileged");
# 省略的方法體
JVM_END
複製代碼
JVM_ENTRY
也是是一個宏定義,位於interfaceSupport.hpp
中:
#define JVM_ENTRY(result_type, header) \ extern "C" { \ result_type JNICALL header { \ JavaThread* thread=JavaThread::thread_from_jni_environment(env); \ ThreadInVMfromNative __tiv(thread); \ debug_only(VMNativeEntryWrapper __vew;) \ VM_ENTRY_BASE(result_type, header, thread)
複製代碼
而後轉換後,獲得結果以下:
extern "C" { \
jobject JNICALL JVM_DoPrivileged(JNIEnv *env, jclass cls, jobject action, jobject context, jboolean wrapException) { \
JavaThread* thread=JavaThread::thread_from_jni_environment(env); \
ThreadInVMfromNative __tiv(thread); \
debug_only(VMNativeEntryWrapper __vew;) \
....
}
}
複製代碼
關於interfaceSupport.inline.hpp
內定義的JVM_ENTRY_NO_ENV
宏邏輯中的extern "C"
就是下面代碼以 C 語言方式進行編譯,C++能夠嵌套 C 代碼。
源碼中特別常見的 JNICALL
就是一個空的宏定義,只是爲了告訴人這是一個 JNI 調用,宏定義以下:
#define JNICALL
複製代碼
關於JNI,咱們能夠參考https://docs.oracle.com/javase/8/docs/technotes/guides/jni/spec/jniTOC.html文檔來深刻。
參考前面給出的相關源碼,咱們能夠知道,最終調用的是heap的collect方法,GCCause
爲_java_lang_system_gc
,即由於什麼緣由而產生的gc
。咱們能夠經過其相關源碼來看到形成GC的各類情況定義。
//
// This class exposes implementation details of the various
// collector(s), and we need to be very careful with it. If
// use of this class grows, we should split it into public
// and implementation-private "causes".
//
// The definitions in the SA code should be kept in sync
// with the definitions here.
//
// src\hotspot\share\gc\shared\gcCause.hpp
class GCCause : public AllStatic {
public:
enum Cause {
/* public */
_java_lang_system_gc,
_full_gc_alot,
_scavenge_alot,
_allocation_profiler,
_jvmti_force_gc,
_gc_locker,
_heap_inspection,
_heap_dump,
_wb_young_gc,
_wb_conc_mark,
_wb_full_gc,
/* implementation independent, but reserved for GC use */
_no_gc,
_no_cause_specified,
_allocation_failure,
/* implementation specific */
_tenured_generation_full,
_metadata_GC_threshold,
_metadata_GC_clear_soft_refs,
_cms_generation_full,
_cms_initial_mark,
_cms_final_remark,
_cms_concurrent_mark,
_old_generation_expanded_on_last_scavenge,
_old_generation_too_full_to_scavenge,
_adaptive_size_policy,
_g1_inc_collection_pause,
_g1_humongous_allocation,
_dcmd_gc_run,
_z_timer,
_z_warmup,
_z_allocation_rate,
_z_allocation_stall,
_z_proactive,
_last_gc_cause
};
複製代碼
咱們接着回到JVM_GC
定義中,這裏須要注意的是DisableExplicitGC
,若是爲true
就不會執行collect
方法,也就使得System.gc()
無效,DisableExplicitGC
這個參數對應配置爲-XX:+DisableExplicitGC
,默認是false
,可自行配置爲true
。
當DisableExplicitGC
爲默認值的時候,會進入Universe::heap()->collect(GCCause::_java_lang_system_gc);
代碼邏輯,此時,咱們能夠看到,這是一個函數表達式,傳入的參數爲Universe::heap()
:
// The particular choice of collected heap.
static CollectedHeap* heap() { return _collectedHeap; }
CollectedHeap* Universe::_collectedHeap = NULL;
CollectedHeap* Universe::create_heap() {
assert(_collectedHeap == NULL, "Heap already created");
return GCConfig::arguments()->create_heap();
}
複製代碼
如上圖所示,heap
有好幾種,具體是哪一種heap
,須要看咱們所選擇使用的GC
算法,這裏以經常使用的CMS GC
爲例,其對應的heap
是CMSHeap
,因此咱們再看看cmsHeap.hpp
對應的collect
方法:
//src\hotspot\share\gc\cms\cmsHeap.hpp
class CMSHeap : public GenCollectedHeap {
public:
CMSHeap(GenCollectorPolicy *policy);
...
void CMSHeap::collect(GCCause::Cause cause) {
if (should_do_concurrent_full_gc(cause)) {
// Mostly concurrent full collection.
collect_mostly_concurrent(cause);
} else {
GenCollectedHeap::collect(cause);
}
}
...
}
//src\hotspot\share\gc\shared\genCollectedHeap.cpp
void GenCollectedHeap::collect(GCCause::Cause cause) {
if (cause == GCCause::_wb_young_gc) {
// Young collection for the WhiteBox API.
collect(cause, YoungGen);
} else {
#ifdef ASSERT
if (cause == GCCause::_scavenge_alot) {
// Young collection only.
collect(cause, YoungGen);
} else {
// Stop-the-world full collection.
collect(cause, OldGen);
}
#else
// Stop-the-world full collection.
collect(cause, OldGen);
#endif
}
}
複製代碼
首先經過should_do_concurrent_full_gc
方法判斷是否須要進行一次並行Full GC
,若是是則調用collect_mostly_concurrent
方法,進行並行Full GC
;若是不是則通常會走到 collect(cause, OldGen)
這段邏輯,進行Stop-the-world full collection
,咱們通常稱之爲全局暫停(STW)Full GC
。
咱們先看看should_do_concurrent_full_gc
到底有哪些條件:
bool CMSHeap::should_do_concurrent_full_gc(GCCause::Cause cause) {
switch (cause) {
case GCCause::_gc_locker: return GCLockerInvokesConcurrent;
case GCCause::_java_lang_system_gc:
case GCCause::_dcmd_gc_run: return ExplicitGCInvokesConcurrent;
default: return false;
}
}
複製代碼
若是是_java_lang_system_gc
而且ExplicitGCInvokesConcurrent
爲true
則進行並行Full GC
,這裏又引出了另外一個參數ExplicitGCInvokesConcurrent
,若是配置-XX:+ExplicitGCInvokesConcurrent
爲true
,進行並行Full GC
,默認爲false
。
咱們先來看collect_mostly_concurrent
,是如何進行並行Full GC
。
//src\hotspot\share\gc\cms\cmsHeap.cpp
void CMSHeap::collect_mostly_concurrent(GCCause::Cause cause) {
assert(!Heap_lock->owned_by_self(), "Should not own Heap_lock");
MutexLocker ml(Heap_lock);
// Read the GC counts while holding the Heap_lock
unsigned int full_gc_count_before = total_full_collections();
unsigned int gc_count_before = total_collections();
{
MutexUnlocker mu(Heap_lock);
VM_GenCollectFullConcurrent op(gc_count_before, full_gc_count_before, cause);
VMThread::execute(&op);
}
}
複製代碼
最終經過VMThread
來進行VM_GenCollectFullConcurrent
中的void VM_GenCollectFullConcurrent::doit()
方法來進行回收(相關英文註釋很明確,就再也不解釋了):
// VM operation to invoke a concurrent collection of a
// GenCollectedHeap heap.
void VM_GenCollectFullConcurrent::doit() {
assert(Thread::current()->is_VM_thread(), "Should be VM thread");
assert(GCLockerInvokesConcurrent || ExplicitGCInvokesConcurrent, "Unexpected");
CMSHeap* heap = CMSHeap::heap();
if (_gc_count_before == heap->total_collections()) {
// The "full" of do_full_collection call below "forces"
// a collection; the second arg, 0, below ensures that
// only the young gen is collected. XXX In the future,
// we'll probably need to have something in this interface
// to say do this only if we are sure we will not bail
// out to a full collection in this attempt, but that's
// for the future.
assert(SafepointSynchronize::is_at_safepoint(),
"We can only be executing this arm of if at a safepoint");
GCCauseSetter gccs(heap, _gc_cause);
heap->do_full_collection(heap->must_clear_all_soft_refs(), GenCollectedHeap::YoungGen);
} // Else no need for a foreground young gc
assert((_gc_count_before < heap->total_collections()) ||
(GCLocker::is_active() /* gc may have been skipped */
&& (_gc_count_before == heap->total_collections())),
"total_collections() should be monotonically increasing");
MutexLockerEx x(FullGCCount_lock, Mutex::_no_safepoint_check_flag);
assert(_full_gc_count_before <= heap->total_full_collections(), "Error");
if (heap->total_full_collections() == _full_gc_count_before) {
// Nudge the CMS thread to start a concurrent collection.
CMSCollector::request_full_gc(_full_gc_count_before, _gc_cause);
} else {
assert(_full_gc_count_before < heap->total_full_collections(), "Error");
FullGCCount_lock->notify_all(); // Inform the Java thread its work is done
}
}
複製代碼
簡單的說,這裏執行了一次Young GC
來回收Young
區,接着咱們來關注CMSCollector::request_full_gc
這個方法:
//src\hotspot\share\gc\cms\concurrentMarkSweepGeneration.cpp
void CMSCollector::request_full_gc(unsigned int full_gc_count, GCCause::Cause cause) {
CMSHeap* heap = CMSHeap::heap();
unsigned int gc_count = heap->total_full_collections();
if (gc_count == full_gc_count) {
MutexLockerEx y(CGC_lock, Mutex::_no_safepoint_check_flag);
_full_gc_requested = true;
_full_gc_cause = cause;
CGC_lock->notify(); // nudge CMS thread
} else {
assert(gc_count > full_gc_count, "Error: causal loop");
}
}
複製代碼
這裏主要關注在gc_count == full_gc_count
的狀況下,_full_gc_requested
被設置成true
以及喚醒CMS
回收線程。 這裏須要說起一下,CMS GC
有個後臺線程一直在掃描,以肯定是否進行一次CMS GC
,這個線程默認2s
進行一次掃描,其中有個_full_gc_requested
是否爲true
的判斷條件,若是爲true
,進行一次CMS GC
,對Old
和Perm
區進行一次回收。
正常Full GC會執行下面的邏輯:
void GenCollectedHeap::collect(GCCause::Cause cause, GenerationType max_generation) {
// The caller doesn't have the Heap_lock
assert(!Heap_lock->owned_by_self(), "this thread should not own the Heap_lock");
MutexLocker ml(Heap_lock);
collect_locked(cause, max_generation);
}
// this is the private collection interface
// The Heap_lock is expected to be held on entry.
//src\hotspot\share\gc\shared\genCollectedHeap.cpp
void GenCollectedHeap::collect_locked(GCCause::Cause cause, GenerationType max_generation) {
// Read the GC count while holding the Heap_lock
unsigned int gc_count_before = total_collections();
unsigned int full_gc_count_before = total_full_collections();
{
MutexUnlocker mu(Heap_lock); // give up heap lock, execute gets it back
VM_GenCollectFull op(gc_count_before, full_gc_count_before, cause, max_generation);
VMThread::execute(&op);
}
}
複製代碼
經過VMThread
調用VM_GenCollectFull
中的void VM_GenCollectFull::doit()
方法來進行回收。
//src\hotspot\share\gc\shared\vmGCOperations.cpp
void VM_GenCollectFull::doit() {
SvcGCMarker sgcm(SvcGCMarker::FULL);
GenCollectedHeap* gch = GenCollectedHeap::heap();
GCCauseSetter gccs(gch, _gc_cause);
gch->do_full_collection(gch->must_clear_all_soft_refs(), _max_generation);
}
//src\hotspot\share\gc\shared\genCollectedHeap.cpp
void GenCollectedHeap::do_full_collection(bool clear_all_soft_refs,
GenerationType last_generation) {
GenerationType local_last_generation;
if (!incremental_collection_will_fail(false /* don't consult_young */) &&
gc_cause() == GCCause::_gc_locker) {
local_last_generation = YoungGen;
} else {
local_last_generation = last_generation;
}
do_collection(true, // full
clear_all_soft_refs, // clear_all_soft_refs
0, // size
false, // is_tlab
local_last_generation); // last_generation
// Hack XXX FIX ME !!!
// A scavenge may not have been attempted, or may have
// been attempted and failed, because the old gen was too full
if (local_last_generation == YoungGen && gc_cause() == GCCause::_gc_locker &&
incremental_collection_will_fail(false /* don't consult_young */)) {
log_debug(gc, jni)("GC locker: Trying a full collection because scavenge failed");
// This time allow the old gen to be collected as well
do_collection(true, // full
clear_all_soft_refs, // clear_all_soft_refs
0, // size
false, // is_tlab
OldGen); // last_generation
}
}
複製代碼
這裏最終會經過GenCollectedHeap
的do_full_collection
方法(此方法代碼量比較多,就不展開分析了)進行一次Full GC
,將回收Young
、Old
、Perm
區,而且即便Old
區使用的是CMS GC
,也會對Old
區進行compact
,也就是MSC,標記-清除-壓縮。
stop the world
咱們前面有提到VMThread
,在JVM
中經過這個線程不斷輪詢它的隊列,該隊列裏主要是存一些VM_operation
的動做,好比最多見的就是內存分配失敗,並要求作GC操做的請求等,在對GC
這些操做執行的時候會先將其餘業務線程都進入到安全點,也就是這些線程今後再也不執行任何字節碼指令,只有當出了安全點的時候才讓他們繼續執行原來的指令,所以這其實就是咱們說的stop the world(STW)
,整個進程至關於靜止了。
CMS GC
CMS GC
咱們可分爲background
和foreground
兩種模式,顧名思義,其中background
是在後臺作的,也就是能夠不影響正常的業務線程跑,觸發條件好比在old
的內存佔比超過多少的時候就可能觸發一次background
的CMS GC
,這個過程會經歷CMS GC
的全部階段,該暫停的暫停,該並行的並行,效率相對來講還比較高,畢竟有和業務線程並行的GC
階段;而foreground
則否則,它發生的場景好比業務線程請求分配內存,可是內存不夠了,因而可能觸發一次CMS GC
,這個過程就必須是要等內存分配到了線程才能繼續往下面走的,所以整個過程必須是STW
的,此時的CMS GC
整個過程都是暫停應用的,可是爲了提升效率,它並非每一個階段都會走的,只走其中一些階段,跳過的階段主要是並行階段,即Precleaning
、AbortablePreclean
,Resizing
這幾個階段都不會經歷,其中sweep
階段是同步的,但無論怎麼說若是走了相似foreground
的cms gc
,那麼整個過程業務線程都是不可用的,效率會影響挺大。
正常Full GC
實際上是整個GC
過程是真正意義上的Full GC
,還有些場景雖然調用Full GC
的接口,可是並不會都作,有些時候只作Young GC
,有些時候只作cms gc
。並且由前面的代碼可知,最終都是由VMThread
來執行的,所以整個時間是Young GC+CMS GC
的時間之和,其中CMS GC
是上面提到的foreground
式的,所以整個過程會比較長,也是咱們要避免的。
並行Full GC也通樣會作YGC
和CMS GC
,可是效率高就高在CMS GC
是走的background
的,整個暫停的過程主要是YGC+CMS_initMark+CMS_remark
幾個階段。
GenCollectedHeap::collect
這個方法中有一句註釋The caller doesn't have the Heap_lock
,即調用者並不持有Heap_lock
,也就能理解foreground
了。
System.gc()
會觸發Full GC
,能夠經過-XX:+DisableExplicitGC
參數屏蔽System.gc()
,在使用CMS GC
的前提下,也可使用-XX:+ExplicitGCInvokesConcurrent
參數來進行並行Full GC
,提高性能。 不過,通常不推薦使用System.gc()
,由於Full GC
耗時比較長,對應用影響較大。一樣也不建議設置-XX:+DisableExplicitGC
,特別是在有使用堆外內存的狀況下,若是堆外內存申請不到足夠的空間,jdk
會觸發一次System.gc()
,來進行回收,若是屏蔽了,申請不到內存,天然就OOME
了。
參考博客 :
sun.misc.Unsafe
提供了一組方法來進行內存的分配,從新分配,以及釋放。它們和C
的malloc/free
方法很像:
java.lang.OutOfMemoryError
的異常。它會返回一個非零的內存地址。address
指向的地方)中拷貝到的新分配的內存塊中。若是地址等於0,這個方法和allocateMemory
的效果是同樣的。它返回的是新的內存緩衝區的地址。address
爲0
則什麼也不作。//jdk.internal.misc.Unsafe#allocateMemory
public long allocateMemory(long bytes) {
allocateMemoryChecks(bytes);
if (bytes == 0) {
return 0;
}
long p = allocateMemory0(bytes);
if (p == 0) {
throw new OutOfMemoryError();
}
return p;
}
//jdk.internal.misc.Unsafe#allocateMemory0
private native long allocateMemory0(long bytes);
複製代碼
關於allocateMemory0
這個本地方法定義以下:
//src\hotspot\share\prims\unsafe.cpp
UNSAFE_ENTRY(jlong, Unsafe_AllocateMemory0(JNIEnv *env, jobject unsafe, jlong size)) {
size_t sz = (size_t)size;
sz = align_up(sz, HeapWordSize);
void* x = os::malloc(sz, mtOther);
return addr_to_java(x);
} UNSAFE_END
複製代碼
能夠看出sun.misc.Unsafe#allocateMemory
使用malloc
這個C標準庫的函數來申請內存。若是使用的是Linux,多半就是用的Linux
自帶的glibc
裏的ptmalloc
。
在DirectByteBuffer
的構造函數的最後,咱們看到這行代碼:
// 建立一個cleaner,最後會調用Deallocator.run來釋放內存
cleaner = Cleaner.create(this, new Deallocator(base, size, cap));
複製代碼
DirectByteBuffer
自己是一個Java
對象,其是位於堆內存中的,經過JDK
的GC
機制能夠自動幫咱們回收,但其申請的直接內存,不在GC
範圍以內,沒法自動回收。咱們是否是能夠爲DirectByteBuffer
這個堆內存對象註冊一個鉤子函數(這裏能夠經過Runnable
接口的run
方法來實現這個動做),當DirectByteBuffer
對象被GC
回收的時候,會回調這個run
方法,即在這個方法中執行釋放DirectByteBuffer
引用的直接內存,也就是在run
方法中調用Unsafe
的freeMemory
方法。由上面所示代碼可知,註冊是經過sun.misc.Cleaner
類的Create
方法來實現的。
//jdk.internal.ref.Cleaner#create
/** * Creates a new cleaner. * * @param ob the referent object to be cleaned * @param thunk * The cleanup code to be run when the cleaner is invoked. The * cleanup code is run directly from the reference-handler thread, * so it should be as simple and straightforward as possible. * * @return The new cleaner */
public static Cleaner create(Object ob, Runnable thunk) {
if (thunk == null)
return null;
return add(new Cleaner(ob, thunk));
}
//jdk.internal.ref.Cleaner#clean
/** * Runs this cleaner, if it has not been run before. */
public void clean() {
if (!remove(this))
return;
try {
thunk.run();
} catch (final Throwable x) {
AccessController.doPrivileged(new PrivilegedAction<>() {
public Void run() {
if (System.err != null)
new Error("Cleaner terminated abnormally", x)
.printStackTrace();
System.exit(1);
return null;
}});
}
}
複製代碼
由以前代碼和上面代碼註釋可知,其中第一個參數是一個堆內存對象,這裏是指DirectByteBuffer
對象,第二個參數是一個Runnable
任務,其內定義了一個動做,表示這個堆內存對象被回收的時候,須要執行的回調方法。咱們能夠看到在DirectByteBuffer
的最後一行中,傳入的這兩個參數分別是this
,和一個Deallocator
(實現了Runnable
接口),其中this
表示就是當前DirectByteBuffer
實例,也就是當前DirectByteBuffer
被回收的時候,回調Deallocator
的run
方法,清除DirectByteBuffer
引用的直接內存,代碼以下所示:
private static class Deallocator implements Runnable {
private long address;
private long size;
private int capacity;
private Deallocator(long address, long size, int capacity) {
assert (address != 0);
this.address = address;
this.size = size;
this.capacity = capacity;
}
public void run() {
if (address == 0) {
// Paranoia
return;
}
UNSAFE.freeMemory(address);
address = 0;
Bits.unreserveMemory(size, capacity);
}
}
複製代碼
能夠看到run
方法中調用了UNSAFE.freeMemory
方法釋放了直接內存的引用。
由於DirectByteBuffer
申請的內存是在堆外,而DirectByteBuffer
自己也只保存了內存的起始地址,因此DirectByteBuffer
的內存佔用是由堆內的DirectByteBuffer
對象與堆外的對應內存空間共同構成。
按照咱們以前的玩法,Java中能夠利用的特性有finalize
函數,可是finalize
機制是Java官方不推薦的,由於有諸多須要注意的地方,推薦的作法是使用虛引用來處理對象被回收時的後續處理工做。這裏JDK
提供了Cleaner
類來簡化這個操做,Cleaner
是PhantomReference
的子類,那麼就能夠在PhantomReference
被加入ReferenceQueue
時觸發對應的Runnable
回調。
DirectByteBuffer
最終會使用sun.misc.Unsafe#getByte(long)
和sun.misc.Unsafe#putByte(long, byte)
這兩個方法來讀寫堆外內存空間的指定位置的字節數據。無非就是經過地址來讀寫相應內存位置的數據,具體代碼以下所示。
//java.nio.Buffer#nextGetIndex()
final int nextGetIndex() { // package-private
if (position >= limit)
throw new BufferUnderflowException();
return position++;
}
//java.nio.DirectByteBuffer
public long address() {
return address;
}
private long ix(int i) {
return address + ((long)i << 0);
}
public byte get() {
try {
return ((UNSAFE.getByte(ix(nextGetIndex()))));
} finally {
Reference.reachabilityFence(this);
}
}
public byte get(int i) {
try {
return ((UNSAFE.getByte(ix(checkIndex(i)))));
} finally {
Reference.reachabilityFence(this);
}
}
public ByteBuffer put(byte x) {
try {
UNSAFE.putByte(ix(nextPutIndex()), ((x)));
} finally {
Reference.reachabilityFence(this);
}
return this;
}
public ByteBuffer put(int i, byte x) {
try {
UNSAFE.putByte(ix(checkIndex(i)), ((x)));
} finally {
Reference.reachabilityFence(this);
}
return this;
}
複製代碼
MappedByteBuffer
本應該是DirectByteBuffer
的子類,但爲了保持結構規範清晰簡單,而且出於優化目的,反過來更恰當,也是由於DirectByteBuffer
屬於包級別的私有類(即class關鍵字前並無類權限定義),在定義抽象類的時候本就是爲了可擴展,這樣,你們也就能夠明白JDK爲什麼這麼設計了。雖然MappedByteBuffer
在邏輯上應該是DirectByteBuffer的子類,並且MappedByteBuffer
的內存的GC和DirectByteBuffer
的GC相似(和堆GC不一樣),可是分配的MappedByteBuffer
的大小不受-XX:MaxDirectMemorySize
參數影響。 由於要基於系統級別的IO操做,因此須要給其設定一個FileDescriptor
來映射buffer
的操做,若是並未映射到buffer
,那這個FileDescriptor
爲null
。
MappedByteBuffer
封裝的是內存映射文件操做,也就是隻能進行文件IO操做。MappedByteBuffer
是根據mmap
產生的映射緩衝區,這部分緩衝區被映射到對應的文件頁上,經過MappedByteBuffer
能夠直接操做映射緩衝區,而這部分緩衝區又被映射到文件頁上,操做系統經過對應內存頁的調入和調出完成文件的寫入和寫出。
咱們能夠經過java.nio.channels.FileChannel#map(MapMode mode,long position, long size)
獲得MappedByteBuffer
,咱們來看sun.nio.ch.FileChannelImpl
對它的實現:
private static final int MAP_RO = 0;
private static final int MAP_RW = 1;
private static final int MAP_PV = 2;
//sun.nio.ch.FileChannelImpl#map
public MappedByteBuffer map(MapMode mode, long position, long size) throws IOException {
ensureOpen();
if (mode == null)
throw new NullPointerException("Mode is null");
if (position < 0L)
throw new IllegalArgumentException("Negative position");
if (size < 0L)
throw new IllegalArgumentException("Negative size");
if (position + size < 0)
throw new IllegalArgumentException("Position + size overflow");
//最大2G
if (size > Integer.MAX_VALUE)
throw new IllegalArgumentException("Size exceeds Integer.MAX_VALUE");
int imode = -1;
if (mode == MapMode.READ_ONLY)
imode = MAP_RO;
else if (mode == MapMode.READ_WRITE)
imode = MAP_RW;
else if (mode == MapMode.PRIVATE)
imode = MAP_PV;
assert (imode >= 0);
if ((mode != MapMode.READ_ONLY) && !writable)
throw new NonWritableChannelException();
if (!readable)
throw new NonReadableChannelException();
long addr = -1;
int ti = -1;
try {
beginBlocking();
ti = threads.add();
if (!isOpen())
return null;
long mapSize;
int pagePosition;
synchronized (positionLock) {
long filesize;
do {
//nd.size()返回實際的文件大小
filesize = nd.size(fd);
} while ((filesize == IOStatus.INTERRUPTED) && isOpen());
if (!isOpen())
return null;
//若是實際文件大小 小於所需求文件大小,則增大文件的大小,
//文件的大小被改變,文件增大的部分默認設置爲0。
if (filesize < position + size) { // Extend file size
if (!writable) {
throw new IOException("Channel not open for writing " +
"- cannot extend file to required size");
}
int rv;
do {
//增大文件的大小
rv = nd.truncate(fd, position + size);
} while ((rv == IOStatus.INTERRUPTED) && isOpen());
if (!isOpen())
return null;
}
//若是要求映射的文件大小爲0,則不調用操做系統的mmap調用,
//只是生成一個空間容量爲0的DirectByteBuffer並返回
if (size == 0) {
addr = 0;
// a valid file descriptor is not required
FileDescriptor dummy = new FileDescriptor();
if ((!writable) || (imode == MAP_RO))
return Util.newMappedByteBufferR(0, 0, dummy, null);
else
return Util.newMappedByteBuffer(0, 0, dummy, null);
}
//allocationGranularity爲所映射的緩衝區分配內存大小,pagePosition爲第多少頁
pagePosition = (int)(position % allocationGranularity);
//獲得映射的位置,即從mapPosition開始映射
long mapPosition = position - pagePosition;
//從頁的最開始映射加pagePosition,以此增大映射空間
mapSize = size + pagePosition;
try {
//後面會進行解讀
// If map0 did not throw an exception, the address is valid
addr = map0(imode, mapPosition, mapSize);
} catch (OutOfMemoryError x) {
// An OutOfMemoryError may indicate that we've exhausted
// memory so force gc and re-attempt map
System.gc();
try {
Thread.sleep(100);
} catch (InterruptedException y) {
Thread.currentThread().interrupt();
}
try {
addr = map0(imode, mapPosition, mapSize);
} catch (OutOfMemoryError y) {
// After a second OOME, fail
throw new IOException("Map failed", y);
}
}
} // synchronized
// On Windows, and potentially other platforms, we need an open
// file descriptor for some mapping operations.
FileDescriptor mfd;
try {
mfd = nd.duplicateForMapping(fd);
} catch (IOException ioe) {
unmap0(addr, mapSize);
throw ioe;
}
assert (IOStatus.checkAll(addr));
assert (addr % allocationGranularity == 0);
int isize = (int)size;
Unmapper um = new Unmapper(addr, mapSize, isize, mfd);
if ((!writable) || (imode == MAP_RO)) {
return Util.newMappedByteBufferR(isize,
addr + pagePosition,
mfd,
um);
} else {
return Util.newMappedByteBuffer(isize,
addr + pagePosition,
mfd,
um);
}
} finally {
threads.remove(ti);
endBlocking(IOStatus.checkAll(addr));
}
}
複製代碼
咱們來看sun.nio.ch.FileChannelImpl#map0
的實現:
//src\java.base\unix\native\libnio\ch\FileChannelImpl.c
JNIEXPORT jlong JNICALL Java_sun_nio_ch_FileChannelImpl_map0(JNIEnv *env, jobject this, jint prot, jlong off, jlong len) {
void *mapAddress = 0;
jobject fdo = (*env)->GetObjectField(env, this, chan_fd);
//這裏獲得所操做文件的讀取狀態,即對應的文件描述符的值
jint fd = fdval(env, fdo);
int protections = 0;
int flags = 0;
if (prot == sun_nio_ch_FileChannelImpl_MAP_RO) {
protections = PROT_READ;
flags = MAP_SHARED;
} else if (prot == sun_nio_ch_FileChannelImpl_MAP_RW) {
protections = PROT_WRITE | PROT_READ;
flags = MAP_SHARED;
} else if (prot == sun_nio_ch_FileChannelImpl_MAP_PV) {
protections = PROT_WRITE | PROT_READ;
flags = MAP_PRIVATE;
}
//這裏就是操做系統調用了,mmap64是宏定義,實際最後調用的是mmap
mapAddress = mmap64(
0, /* Let OS decide location */
len, /* Number of bytes to map */
protections, /* File permissions */
flags, /* Changes are shared */
fd, /* File descriptor of mapped file */
off); /* Offset into file */
if (mapAddress == MAP_FAILED) {
if (errno == ENOMEM) {
//若是沒有映射成功,直接拋出OutOfMemoryError
JNU_ThrowOutOfMemoryError(env, "Map failed");
return IOS_THROWN;
}
return handle(env, -1, "Map failed");
}
return ((jlong) (unsigned long) mapAddress);
}
複製代碼
這裏要注意的是,雖然FileChannel.map()
的size
參數是long
,可是size
的大小最大爲Integer.MAX_VALUE
,也就是最大隻能映射最大2G
大小的空間。實際上操做系統提供的mmap
能夠分配更大的空間,可是JAVA
在此處限制在2G
。 這裏咱們來涉及一個生產事故,使用spark
處理較大的數據文件,遇到了分區2G限制的問題,spark
會報以下的日誌:
WARN scheduler.TaskSetManager: Lost task 19.0 in stage 6.0 (TID 120, 10.111.32.47): java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517)
at org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:432)
at org.apache.spark.storage.BlockManager.get(BlockManager.scala:618)
at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:146)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
複製代碼
結合以前的源碼:
//最大2G
if (size > Integer.MAX_VALUE)
throw new IllegalArgumentException("Size exceeds Integer.MAX_VALUE");
複製代碼
咱們是否是能夠很輕易的定位到錯誤的所在,以及爲什麼會產生這樣的錯誤,雖然日誌裏寫的也很清楚,但咱們從本質上有了更深刻的理解。因而咱們就能夠想辦法了,既然改變不了2G這個限制,那麼咱們就把容器數量提升上來就能夠了,也就是手動設置RDD的分區數量。當前使用的Spark
默認RDD
分區是18
個,手動設置爲500
個(具體還須要根據本身生產環境中的實際內存容量考慮),上面這個問題就迎刃而解了。具體操做爲,能夠在RDD
加載後,使用RDD.repartition(numPart:Int)
函數從新設置分區數量。
val data_new = data.repartition(500)
複製代碼
MappedByteBuffer是經過mmap產生獲得的緩衝區,這部分緩衝區是由操做系統直接建立和管理的,最後JVM經過unmmap讓操做系統直接釋放這部份內存。
private static void unmap(MappedByteBuffer bb) {
Cleaner cl = ((DirectBuffer)bb).cleaner();
if (cl != null)
cl.clean();
}
複製代碼
能夠看到,這裏傳入的一個MappedByteBuffer
類型的參數,咱們回到sun.nio.ch.FileChannelImpl#map
方法實現中,爲了方便回收,這裏對所操做的文件描述符進行再次包裝,即mfd = nd.duplicateForMapping(fd)
,而後一樣經過一個Runnable
接口的實現來定義一個釋放內存的行爲(這裏是Unmapper
實現),因而Unmapper um = new Unmapper(addr, mapSize, isize, mfd);
也就不難理解了,最後,由於咱們要返回一個MappedByteBuffer
對象,因此,就有以下代碼實現:
int isize = (int)size;
Unmapper um = new Unmapper(addr, mapSize, isize, mfd);
if ((!writable) || (imode == MAP_RO)) {
return Util.newMappedByteBufferR(isize,
addr + pagePosition,
mfd,
um);
} else {
return Util.newMappedByteBuffer(isize,
addr + pagePosition,
mfd,
um);
}
複製代碼
其實就是建立了一個DirectByteBuffer
對象,這裏的回收策略和咱們以前接觸的java.nio.ByteBuffer#allocateDirect
(也就是java.nio.DirectByteBuffer#DirectByteBuffer(int)
)是不一樣的。這裏是須要最後調用munmap
來進行系統回收的。
protected DirectByteBuffer(int cap, long addr, FileDescriptor fd, Runnable unmapper) {
super(-1, 0, cap, cap, fd);
address = addr;
cleaner = Cleaner.create(this, unmapper);
att = null;
}
// -- Memory-mapped buffers --
//sun.nio.ch.FileChannelImpl.Unmapper
private static class Unmapper implements Runnable {
// may be required to close file
private static final NativeDispatcher nd = new FileDispatcherImpl();
// keep track of mapped buffer usage
static volatile int count;
static volatile long totalSize;
static volatile long totalCapacity;
private volatile long address;
private final long size;
private final int cap;
private final FileDescriptor fd;
private Unmapper(long address, long size, int cap, FileDescriptor fd) {
assert (address != 0);
this.address = address;
this.size = size;
this.cap = cap;
this.fd = fd;
synchronized (Unmapper.class) {
count++;
totalSize += size;
totalCapacity += cap;
}
}
public void run() {
if (address == 0)
return;
unmap0(address, size);
address = 0;
// if this mapping has a valid file descriptor then we close it
if (fd.valid()) {
try {
nd.close(fd);
} catch (IOException ignore) {
// nothing we can do
}
}
synchronized (Unmapper.class) {
count--;
totalSize -= size;
totalCapacity -= cap;
}
}
}
複製代碼
此處涉及的unmap0(address, size)
本地實現以下,能夠看到,它調用了munmap
。
JNIEXPORT jint JNICALL Java_sun_nio_ch_FileChannelImpl_unmap0(JNIEnv *env, jobject this, jlong address, jlong len) {
void *a = (void *)jlong_to_ptr(address);
return handle(env,
munmap(a, (size_t)len),
"Unmap failed");
}
複製代碼
關於FileChannel
的map
方法,簡單的說就是將文件映射爲內存映像文件。也就是經過MappedByteBuffer map(int mode,long position,long size)
能夠把文件的從position
開始的size
大小的區域映射爲內存映像文件,mode
是指可訪問該內存映像文件的方式:READ_ONLY
,READ_WRITE
,PRIVATE
。
READ_ONLY
(MapMode.READ_ONLY
只讀):試圖修改獲得的緩衝區將致使拋出 ReadOnlyBufferException
。READ_WRITE
(MapMode.READ_WRITE
讀/寫):對獲得的緩衝區的更改最終將傳播到文件;該更改對映射到同一文件的其餘程序不必定是可見的。PRIVATE
(MapMode.PRIVATE
專用): 對獲得的緩衝區的更改不會傳播到文件,而且該更改對映射到同一文件的其餘程序也不是可見的;相反,會建立緩衝區已修改部分的專用副本。調用FileChannel
的map()
方法後,便可將文件的某一部分或所有映射到內存中,而由前文可知,映射內存緩衝區是個直接緩衝區,雖繼承自ByteBuffer
,但相對於ByteBuffer
,它有更多的優勢:
簡而言之,就是經過mmap
將文件直接映射到用戶態的內存地址,這樣對文件的操做就再也不是write/read
,而是直接對內存地址的操做。 在c中提供了三個函數來實現 :
mmap
: 進行映射。munmap
: 取消映射。msync
: 進程在映射空間的對共享內容的改變並不直接寫回到硬盤文件中,若是不使用此方法,那就沒法保證在調用munmap
以前寫回更改。首先創建好虛擬內存和硬盤文件之間的映射(mmap
系統調用),當進程訪問頁面時產生一個缺頁中斷,內核將頁面讀入內存(也就是說把硬盤上的文件拷貝到內存中),而且更新頁表指向該頁面。 全部進程共享同一物理內存,物理內存中能夠只存儲一份數據,不一樣的進程只須要把本身的虛擬內存映射過去就能夠了,這種方式很是方便於同一副本的共享,節省內存。 通過內存映射以後,文件內的數據就能夠用內存讀/寫指令來訪問,而不是用Read
和Write
這樣的I/O
系統函數,從而提升了文件存取速度。
這裏,咱們對msync
、munmap
、close(fd)
這三者經過一個小Demo來做下說明,只須要看註釋便可。
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/mman.h>
#include <sys/types.h>
#include <fcntl.h>
int main(int argc, char *argv[]) {
int fd;
char *addr;
char *str = "Hello World";
fd = open("./a",O_CREAT|O_RDWR|O_TRUNC,0666);
if(fd == -1)
{
perror("open file fail:");
exit(1);
}
if(ftruncate(fd,4096)==-1)
{
perror("ftruncate fail:");
close(fd);
exit(1);
}
addr =(char *) mmap(NULL,4096,PROT_READ|PROT_WRITE,MAP_SHARED,fd,0);
if(addr == (char *)MAP_FAILED)
{
perror("mmap fail:");
exit(1);
}
memset(addr,' ',4096);
memcpy(addr,str,strlen(str)); //寫入一個hello world
// 關閉文件依然能夠經過msync將映射空間的內容寫入文件,實現空間和文件的同步。
close(fd);
memcpy(addr+strlen(str),str,strlen(str)); //再寫入一個hello world
//同步到文件中
//MS_ASYNC的做用是,無論映射區是否更新,直接沖洗返回。
//MS_SYNC的做用是,若是映射區更新了,則沖洗返回,
//若是映射區沒有更新,則等待,直到更新完畢,沖洗返回。
//MS_INVALIDATE的做用是,丟棄映射區中和原文件相同的部分。
if(msync(addr,4096,MS_SYNC)==-1)
{
perror("msync fail:");
exit(1);
}
munmap(addr,4096);
return 0;
}
複製代碼
更多的能夠參考MappedByteBuffer以及mmap的底層原理
爲了配合FileChannel
的map
方法,這裏有必要介紹下它的三個配套方法:
force()
:緩衝區是READ_WRITE
模式下,此方法會對緩衝區內容的修改強行寫入文件,即將緩衝區內存更新的內容刷到硬盤中。load()
:將緩衝區的內容載入內存,並返回該緩衝區的引用。isLoaded()
:若是緩衝區的內容在物理內存中,則返回真,不然返回假。 這裏,咱們對sun.nio.ch.FileChannelImpl#force
實現進行下分析,首來看其相關源碼。//sun.nio.ch.FileChannelImpl#force
public void force(boolean metaData) throws IOException {
ensureOpen();
int rv = -1;
int ti = -1;
try {
beginBlocking();
ti = threads.add();
if (!isOpen())
return;
do {
rv = nd.force(fd, metaData);
} while ((rv == IOStatus.INTERRUPTED) && isOpen());
} finally {
threads.remove(ti);
endBlocking(rv > -1);
assert IOStatus.check(rv);
}
}
//sun.nio.ch.FileDispatcherImpl#force
int force(FileDescriptor fd, boolean metaData) throws IOException {
return force0(fd, metaData);
}
static native int force0(FileDescriptor fd, boolean metaData)
throws IOException;
//src\java.base\unix\native\libnio\ch\FileDispatcherImpl.c
JNIEXPORT jint JNICALL
Java_sun_nio_ch_FileDispatcherImpl_force0(JNIEnv *env, jobject this,
jobject fdo, jboolean md)
{
jint fd = fdval(env, fdo);
int result = 0;
#ifdef MACOSX
result = fcntl(fd, F_FULLFSYNC);
if (result == -1 && errno == ENOTSUP) {
/* Try fsync() in case F_FULLSYUNC is not implemented on the file system. */
result = fsync(fd);
}
#else /* end MACOSX, begin not-MACOSX */
if (md == JNI_FALSE) {
result = fdatasync(fd);
} else {
#ifdef _AIX
/* On AIX, calling fsync on a file descriptor that is opened only for
* reading results in an error ("EBADF: The FileDescriptor parameter is
* not a valid file descriptor open for writing.").
* However, at this point it is not possibly anymore to read the
* 'writable' attribute of the corresponding file channel so we have to
* use 'fcntl'.
*/
int getfl = fcntl(fd, F_GETFL);
if (getfl >= 0 && (getfl & O_ACCMODE) == O_RDONLY) {
return 0;
}
#endif /* _AIX */
result = fsync(fd);
}
#endif /* not-MACOSX */
return handle(env, result, "Force failed");
}
複製代碼
咱們跳過針對MACOSX
的實現,只關注針對linux平臺的。發現force
在傳入參數爲false
的狀況下,調用的是fdatasync(fsync)
。 經過查詢Linux
函數手冊(可參考fdatasync),咱們能夠看到:
fsync() transfers ("flushes") all modified in-core data of (i.e., modified buffer cache pages for) the file referred to by the file descriptor fd to the disk device (or other permanent storage device) so that all changed information can be retrieved even after the system crashed or was rebooted. This includes writing through or flushing a disk cache if present. The call blocks until the device reports that the transfer has completed. It also flushes metadata information associated with the file (see stat(2)).
Calling fsync() does not necessarily ensure that the entry in the directory containing the file has also reached disk. For that an explicit fsync() on a file descriptor for the directory is also needed.
fdatasync() is similar to fsync(), but does not flush modified metadata unless that metadata is needed in order to allow a subsequent data retrieval to be correctly handled. For example, changes to st_atime or st_mtime (respectively, time of last access and time of last modification; see stat(2)) do not require flushing because they are not necessary for a subsequent data read to be handled correctly. On the other hand, a change to the file size (st_size, as made by say ftruncate(2)), would require a metadata flush.
The aim of fdatasync() is to reduce disk activity for applications that do not require all metadata to be synchronized with the disk.
複製代碼
簡單描述下,fdatasync
只刷新數據到硬盤。fsync
同時刷新數據和inode
信息到硬盤,例如st_atime
。 由於inode
和數據不是連續存放在硬盤中,因此fsync
須要更多的寫硬盤,可是可讓inode
獲得更新。若是不關注inode
信息的狀況(例如最近一次訪問文件),能夠經過使用fdatasync
提升性能。對於關注inode
信息的狀況,則應該使用fsync
。
須要注意,若是物理硬盤的write cache
是開啓的,那麼fsync
和fdatasync
將不能保證回寫的數據被完整的寫入到硬盤存儲介質中(數據可能依然保存在硬盤的cache中,並無寫入介質),所以可能會出現明明調用了fsync
系統調用可是數據在掉電後依然丟失了或者出現文件系統不一致的狀況。
這裏,爲了保證硬盤上實際文件系統與緩衝區高速緩存中內容的一致性,UNIX
系統提供了sync
、fsync
和fdatasync
三個函數。 sync
函數只是將全部修改過的塊緩衝區排入寫隊列,而後就返回,它並不等待實際寫硬盤操做結束。 一般稱爲update
的系統守護進程會週期性地(通常每隔30秒)調用sync
函數。這就保證了按期沖洗內核的塊緩衝區。命令sync(1)
也調用sync
函數。 fsync
函數只對由文件描述符filedes
指定的單一文件起做用,而且等待寫硬盤操做結束,而後返回。fsync
可用於數據庫這樣的應用程序,這種應用程序須要確保將修改過的塊當即寫到硬盤上。 fdatasync
函數相似於fsync
,但它隻影響文件的數據部分。而除數據外,fsync
還會同步更新文件的屬性。
也就是說,對於fdatasync
而言,會首先寫到page cache
,而後由pdflush
定時刷到硬盤中,那這麼說mmap
只是在進程空間分配一個內存地址,真實的內存仍是使用的pagecache
。因此force
是調用fsync
將dirty page
刷到硬盤中,但mmap
還有共享之類的實現起來應該很複雜。
也就是說,在Linux
中,當FileChannel
中的force
傳入參數爲true
時,調用fsync
,false
調用fdatasync
,fdatasync
只刷數據不刷meta
數據 。即便不調用force
,內核也會按期將dirty page
刷到硬盤,默認是30s
。
最後,咱們給出一個使用的Demo:
FileOutputStream outputStream = new FileOutputStream("/Users/simviso/b.txt");
// 強制文件數據與元數據落盤
outputStream.getChannel().force(true);
// 強制文件數據落盤,不關心元數據是否落盤
outputStream.getChannel().force(false);
複製代碼
使用內存映射緩衝區(Memory-Mapped-Buffer
)來操做文件,它比普通的IO操做讀文件要快得多。由於,使用內存映射緩衝區操做文件時,並無顯式的進行相關係統調用(read
,write
),並且在必定條件下,OS
還會自動緩存一些文件頁(memory page
)。 經過zerocopy
能夠提升IO
密集型的JAVA
應用程序的性能。IO
操做須要數據頻繁地在內核緩衝區和用戶緩衝區之間拷貝,而經過zerocopy
能夠減小這種拷貝的次數,同時也下降了上下文切換(用戶態與內核態之間的切換)的次數。 咱們大多數WEB應用程序執行的一個操做流程就是:接受用戶請求-->從本地硬盤讀數據-->數據進入內核緩衝區-->用戶緩衝區-->內核緩衝區-->用戶緩衝區-->經過socket
發送。 數據每次在內核緩衝區與用戶緩衝區之間的拷貝都會消耗CPU
以及內存的帶寬。而經過zerocopy
能夠有效減小這種拷貝次數。 這裏,咱們來以文件服務器的數據傳輸爲例來分析下整個流程:從服務器硬盤中讀文件,並把文件經過網絡(socket
)發送給客戶端,寫成代碼的話,其實核心就兩句話:
File.read(fileDesc, buffer, len);
Socket.send(socket, buffer, len);
複製代碼
也就兩步操做。第一步:將文件讀入buffer
;第二步:將buffer
中的數據經過socket
發送出去。可是,這兩步操做須要四次上下文切換(也就是用戶態與內核態之間的切換)和四次copy
操做才能完成。整個過程以下圖所示:
第一次上下文切換髮生在 read()
方法執行,表示服務器要去硬盤上讀文件了,這會觸發一個sys_read()
的系統調用。此時由用戶態切換到內核態,完成的動做是:DMA
把硬盤上的數據讀入到內核緩衝區中(第一次拷貝)。
第二次上下文切換髮生在read()
方法的返回(read()
是一個阻塞調用),表示數據已經成功從硬盤上讀到內核緩衝區了。此時,由內核態返回到用戶態,完成的動做是:將內核緩衝區中的數據拷貝到用戶緩衝區(第二次拷貝)。
第三次上下文切換髮生在send()
方法執行,表示服務器準備把數據發送出去。此時,由用戶態切換到內核態,完成的動做是:將用戶緩衝區中的數據拷貝到內核緩衝區(第三次拷貝)
第四次上下文切換髮生在send()
方法的返回,這裏的send()
方法能夠異步返回:線程執行了send()
以後當即從send()
返回,剩下的數據拷貝及發送就交給操做系統底層實現了。此時,由內核態返回到用戶態,完成的動做是:將內核緩衝區中的數據送到NIC Buffer
。(第四次拷貝)
爲何須要內核緩衝區?由於內核緩衝區提升了性能。經過前面的學習可知,正是由於引入了內核緩衝區(中間緩衝區),使得數據來回地拷貝,下降了效率。那爲何又說內核緩衝區提升了性能?
對於讀操做而言,內核緩衝區就至關於一個預讀緩存,當用戶程序一次只須要讀一小部分數據時,首先操做系統會從硬盤上讀一大塊數據到內核緩衝區,用戶程序只取走了一小部分( 好比我只new byte[128]
這樣一個小的字節數組來讀)。當用戶程序下一次再讀數據,就能夠直接從內核緩衝區中取了,操做系統就不須要再次訪問硬盤了!由於用戶要讀的數據已經在內核緩衝區中!這也是前面提到的:爲何後續的讀操做(read()
方法調用)要明顯地比第一次快的緣由。從這個角度而言,內核緩衝區確實提升了讀操做的性能。
再來看寫操做:能夠作到 「異步寫」。所謂的異步,就是在wirte(dest[])
時,用戶程序告訴操做系統,把dest[]
數組中的內容寫到XXX文件
中去,而後write
方法就返回了。操做系統則在後臺默默地將用戶緩衝區中的內容(dest[]
)拷貝到內核緩衝區,再把內核緩衝區中的數據寫入硬盤。那麼,只要內核緩衝區未滿,用戶的write
操做就能夠很快地返回。這就是所謂的異步刷盤策略。
講到copy,在jdk7引入了java.nio.file.Files
這個類,方便了不少文件操做,可是它更多應用於小文件的傳輸,不適合大文件,針對後者,應該使用java.nio.channels.FileChannel
類下的transferTo
,transferFrom
方法。 這裏,咱們來分析下transferTo
方法細節,源碼以下:
public long transferTo(long position, long count, WritableByteChannel target) throws IOException {
ensureOpen();
if (!target.isOpen())
throw new ClosedChannelException();
if (!readable)
throw new NonReadableChannelException();
if (target instanceof FileChannelImpl &&
!((FileChannelImpl)target).writable)
throw new NonWritableChannelException();
if ((position < 0) || (count < 0))
throw new IllegalArgumentException();
long sz = size();
if (position > sz)
return 0;
int icount = (int)Math.min(count, Integer.MAX_VALUE);
if ((sz - position) < icount)
icount = (int)(sz - position);
long n;
// Attempt a direct transfer, if the kernel supports it
if ((n = transferToDirectly(position, icount, target)) >= 0)
return n;
// Attempt a mapped transfer, but only to trusted channel types
if ((n = transferToTrustedChannel(position, icount, target)) >= 0)
return n;
// Slow path for untrusted targets
return transferToArbitraryChannel(position, icount, target);
}
複製代碼
這裏使用了三種不一樣的方式來嘗試去拷貝文件,咱們先來看transferToDirectly
:
//sun.nio.ch.FileChannelImpl#transferToDirectly
private long transferToDirectly(long position, int icount, WritableByteChannel target) throws IOException {
if (!transferSupported)
return IOStatus.UNSUPPORTED;
FileDescriptor targetFD = null;
if (target instanceof FileChannelImpl) {
if (!fileSupported)
return IOStatus.UNSUPPORTED_CASE;
targetFD = ((FileChannelImpl)target).fd;
} else if (target instanceof SelChImpl) {
// Direct transfer to pipe causes EINVAL on some configurations
if ((target instanceof SinkChannelImpl) && !pipeSupported)
return IOStatus.UNSUPPORTED_CASE;
// Platform-specific restrictions. Now there is only one:
// Direct transfer to non-blocking channel could be forbidden
SelectableChannel sc = (SelectableChannel)target;
if (!nd.canTransferToDirectly(sc))
return IOStatus.UNSUPPORTED_CASE;
targetFD = ((SelChImpl)target).getFD();
}
if (targetFD == null)
return IOStatus.UNSUPPORTED;
int thisFDVal = IOUtil.fdVal(fd);
int targetFDVal = IOUtil.fdVal(targetFD);
if (thisFDVal == targetFDVal) // Not supported on some configurations
return IOStatus.UNSUPPORTED;
if (nd.transferToDirectlyNeedsPositionLock()) {
synchronized (positionLock) {
long pos = position();
try {
return transferToDirectlyInternal(position, icount,
target, targetFD);
} finally {
position(pos);
}
}
} else {
return transferToDirectlyInternal(position, icount, target, targetFD);
}
}
複製代碼
這個方法中的不少細節咱們都已經接觸過了,你們能夠借這個方法的細節回顧下前面的知識,這裏,直奔主題,來查看transferToDirectlyInternal
的細節:
//sun.nio.ch.FileChannelImpl#transferToDirectlyInternal
private long transferToDirectlyInternal(long position, int icount, WritableByteChannel target, FileDescriptor targetFD) throws IOException {
assert !nd.transferToDirectlyNeedsPositionLock() ||
Thread.holdsLock(positionLock);
long n = -1;
int ti = -1;
try {
beginBlocking();
ti = threads.add();
if (!isOpen())
return -1;
do {
n = transferTo0(fd, position, icount, targetFD);
} while ((n == IOStatus.INTERRUPTED) && isOpen());
if (n == IOStatus.UNSUPPORTED_CASE) {
if (target instanceof SinkChannelImpl)
pipeSupported = false;
if (target instanceof FileChannelImpl)
fileSupported = false;
return IOStatus.UNSUPPORTED_CASE;
}
if (n == IOStatus.UNSUPPORTED) {
// Don't bother trying again
transferSupported = false;
return IOStatus.UNSUPPORTED;
}
return IOStatus.normalize(n);
} finally {
threads.remove(ti);
end (n > -1);
}
}
複製代碼
能夠看到,transferToDirectlyInternal
最後調用的是transferTo0
,咱們只看其在Linux
下的實現:
Java_sun_nio_ch_FileChannelImpl_transferTo0(JNIEnv *env, jobject this,
jobject srcFDO,
jlong position, jlong count,
jobject dstFDO)
{
jint srcFD = fdval(env, srcFDO);
jint dstFD = fdval(env, dstFDO);
#if defined(__linux__)
off64_t offset = (off64_t)position;
jlong n = sendfile64(dstFD, srcFD, &offset, (size_t)count);
if (n < 0) {
if (errno == EAGAIN)
return IOS_UNAVAILABLE;
if ((errno == EINVAL) && ((ssize_t)count >= 0))
return IOS_UNSUPPORTED_CASE;
if (errno == EINTR) {
return IOS_INTERRUPTED;
}
JNU_ThrowIOExceptionWithLastError(env, "Transfer failed");
return IOS_THROWN;
}
return n;
....
}
複製代碼
這裏咱們能夠看到使用是sendfile
的調用,這裏咱們經過一張圖來解讀這個動做:
在發生sendfile
調用後,數據首先經過DMA
從硬件設備(此處是硬盤)讀取到內核空間,而後將內核空間數據拷貝到socket buffer
,以後socket buffer
數據拷貝到協議引擎(好比咱們經常使用的網卡,也就是以前涉及到的NIC
)寫到服務器端。這裏減去了傳統IO
在內核和用戶之間的拷貝,可是內核裏邊的拷貝仍是存在。 咱們將以前以文件服務器的數據傳輸爲例所畫的四次拷貝操作圖作相應的改進,以下:
咱們對transferTo()
進行總結下,當此方法被調用時,會由用戶態切換到內核態。所進行的動做:DMA將數據從磁盤讀入 Read buffer
中(第一次數據拷貝)。接着,依然在內核空間中,將數據從Read buffer
拷貝到 Socket buffer
(第二次數據拷貝),最終再將數據從Socket buffer
拷貝到NIC buffer
(第三次數據拷貝)。最後,再從內核態返回到用戶態。 上面整個過程涉及到三次數據拷貝和二次上下文切換。直觀上感受也就減小了一次數據拷貝。但這裏已經不涉及用戶空間的緩衝區了。 並且,在這三次數據拷貝中,只有在第2次拷貝時須要到CPU的干預。可是前面的傳統數據拷貝須要四次且有三次拷貝須要CPU的干預。
而在Linux2.4之後的版本又有了改善:
socket buffer
在這裏不是一個緩衝區了,而是一個文件描述符,描述的是數據在內核緩衝區的數據從哪裏開始,長度是多少,裏面基本上不存儲數據,大部分是指針,而後協議引擎protocol engine
(這裏是NIC
)也是經過DMA
拷貝的方式從文件描述符讀取。 也就是說用戶程序執行transferTo()
方法後,致使一次系統調用,從用戶態切換到內核態。內在會經過DMA
將數據從磁盤中拷貝到Read buffer
。用一個文件描述符標記這次待傳輸數據的地址以及長度,DMA
直接把數據從Read buffer
傳輸到NIC buffer
。數據拷貝過程都不用CPU
干預了。這裏一共只有兩次拷貝和兩次上下文切換。
參考文章:Efficient data transfer through zero copy
最後,咱們再來看下sun.nio.ch.FileChannelImpl#transferTo
涉及的其餘兩種拷貝方式transferToTrustedChannel
與transferToArbitraryChannel
,先來看前者的相關源碼:
// Maximum size to map when using a mapped buffer
private static final long MAPPED_TRANSFER_SIZE = 8L*1024L*1024L;
//sun.nio.ch.FileChannelImpl#transferToTrustedChannel
private long transferToTrustedChannel(long position, long count, WritableByteChannel target) throws IOException {
boolean isSelChImpl = (target instanceof SelChImpl);
if (!((target instanceof FileChannelImpl) || isSelChImpl))
return IOStatus.UNSUPPORTED;
// Trusted target: Use a mapped buffer
long remaining = count;
while (remaining > 0L) {
long size = Math.min(remaining, MAPPED_TRANSFER_SIZE);
try {
MappedByteBuffer dbb = map(MapMode.READ_ONLY, position, size);
try {
// ## Bug: Closing this channel will not terminate the write
int n = target.write(dbb);
assert n >= 0;
remaining -= n;
if (isSelChImpl) {
// one attempt to write to selectable channel
break;
}
assert n > 0;
position += n;
} finally {
unmap(dbb);
}
} catch (ClosedByInterruptException e) {
...
} catch (IOException ioe) {
...
}
}
return count - remaining;
}
複製代碼
能夠看到transferToTrustedChannel
是經過mmap
來拷貝數據,每次最大傳輸8m(MappedByteBuffer
緩衝區大小)。而transferToArbitraryChannel
一次分配的DirectBuffer
最大值爲8192:
private static final int TRANSFER_SIZE = 8192;
//sun.nio.ch.FileChannelImpl#transferToArbitraryChannel
private long transferToArbitraryChannel(long position, int icount, WritableByteChannel target) throws IOException {
// Untrusted target: Use a newly-erased buffer
int c = Math.min(icount, TRANSFER_SIZE);
// Util.getTemporaryDirectBuffer獲得的是DirectBuffer
ByteBuffer bb = Util.getTemporaryDirectBuffer(c);
long tw = 0; // Total bytes written
long pos = position;
try {
Util.erase(bb);
while (tw < icount) {
bb.limit(Math.min((int)(icount - tw), TRANSFER_SIZE));
int nr = read(bb, pos);
if (nr <= 0)
break;
bb.flip();
// ## Bug: Will block writing target if this channel
// ## is asynchronously closed
int nw = target.write(bb);
tw += nw;
if (nw != nr)
break;
pos += nw;
bb.clear();
}
return tw;
} catch (IOException x) {
if (tw > 0)
return tw;
throw x;
} finally {
Util.releaseTemporaryDirectBuffer(bb);
}
}
複製代碼
上面所示代碼最重要的邏輯無非就是read(bb, pos)
和target.write(bb)
。這裏,咱們只看前者:
//sun.nio.ch.FileChannelImpl#read(java.nio.ByteBuffer, long)
public int read(ByteBuffer dst, long position) throws IOException {
if (dst == null)
throw new NullPointerException();
if (position < 0)
throw new IllegalArgumentException("Negative position");
if (!readable)
throw new NonReadableChannelException();
if (direct)
Util.checkChannelPositionAligned(position, alignment);
ensureOpen();
if (nd.needsPositionLock()) {
synchronized (positionLock) {
return readInternal(dst, position);
}
} else {
return readInternal(dst, position);
}
}
//sun.nio.ch.FileChannelImpl#readInternal
private int readInternal(ByteBuffer dst, long position) throws IOException {
assert !nd.needsPositionLock() || Thread.holdsLock(positionLock);
int n = 0;
int ti = -1;
try {
beginBlocking();
ti = threads.add();
if (!isOpen())
return -1;
do {
n = IOUtil.read(fd, dst, position, direct, alignment, nd);
} while ((n == IOStatus.INTERRUPTED) && isOpen());
return IOStatus.normalize(n);
} finally {
threads.remove(ti);
endBlocking(n > 0);
assert IOStatus.check(n);
}
}
複製代碼
由上可知,最後調用了IOUtil.read
,再往下追源碼,也就是調用了sun.nio.ch.IOUtil#readIntoNativeBuffer
,最後調用的就是底層的read
和pread
。一樣,target.write(bb)
最後也是pwrite
和write
的系統調用,會佔用cpu
資源的。
最後,咱們來思考下,當須要傳輸的數據遠遠大於內核緩衝區的大小時,內核緩衝區就會成爲瓶頸。此時內核緩衝區已經起不到「緩衝」的功能了,畢竟傳輸的數據量太大了,這也是爲何在進行大文件傳輸時更適合使用零拷貝來進行。
本文從操做系統級別開始講解IO底層實現原理,分析了IO底層實現細節的一些優缺點,同時對Java NIO
中的DirectBufferd
以及MappedByteBuffer
進行了詳細解讀。最後在以前的基礎上結合源碼闡述了zerocopy
技術的實現原理。
本文部分插圖來源互聯網,版權屬於原做者。如有不妥之處,請留言告知,感謝!