天不生仲尼,萬古如長夜。在計算機科學中,也有一個劃時代的發明,B樹(多路平衡查找樹)及其變體(B樹,b*樹,b+樹);html
由德國科學家(魯道夫·拜爾 Rudolf Bayer),美國科學家(愛德華·M·麥克特 Edward Meyers McCreight)於1970年共同發明;node
B樹這種數據結構特別適合用於數據庫與文件系統設計中,是人類精神財富的精華部分,B樹不誕生,計算機在處理大數據量計算時會變得很是困難。mysql
基本上都是軟件產品最底層的,最核心的功能。程序員
如:各類操做系統(windows,Linux,Mac)的文件系統索引,各類數據庫(sqlserver、oracle、mysql、MongoDB、等等),算法
基本上大部分與大數據量讀取有關的事務,多少都與B樹家族有關,由於B樹的優勢太明顯,特別是讀取磁盤數據效率很是的高效,sql
查找效率O(log n),甚至在B+樹中查詢速度恆定,不管多少存儲多少數據,查詢任何一個速度都同樣。簡直就是天才的發明。數據庫
在上世紀時期,計算機內存儲器都很是的小,以KB爲單位,比起如今動不動以G計算,簡直小的可憐。windows
計算機運算數據時,數據是在內存中進行操做的,好比一些加減乘除、正刪改查等。數組
舉個簡單的栗子:從一個數組 int a[1,2,3,4,5,6,7,8,9]中找出3,那很是簡單;大概步驟以下:數據結構
一、在內存中初始化這個數組
二、獲取數組指針遍歷這個數組,查到3就完成
可是這個數組很大,好比包含1億個數字怎麼辦?若是數組容量大大超過內存大小,那這種比較就不現實了。如今的作法都是把文件
數據存放在外存儲器,好比磁盤,U盤,光盤;而後把文件分屢次的拷貝數據至內存進行操做。可是讀取外存儲器效率對比讀取內存,
差距是很是大的,通常是百萬級別的差距,差6個數量級,因此這個問題不解決一切都是空談。
好在操做系統在設計之初,就對讀取外存儲器進行了必定的優化,引入了「邏輯塊」概念,當作操做文件的最小單元,而B樹合理地利用這個「邏輯塊」
功能開發的高效存儲數據結構;在介紹B樹特性以前,先來了解一下磁盤的基本工做原理。
1)磁盤結構介紹
網上引用的兩張圖,將就看看,基本結構是:磁盤 > 盤面 > 磁道 > 扇區
左邊是物理圖,這個你們應該都是常常見到了,通常圓形的那部分有不少層,每一層叫盤片;右邊的是示意圖,表明左圖的一個盤面。
每一個盤面有跟多環形的磁道,每一個磁道有若干段扇區組成,扇區是磁盤的最小組成單元,若干段扇區組成簇(也叫磁盤塊、邏輯塊等)
先看看我電腦的磁盤簇與扇區大小
能夠看到個人E盤每一個扇區512個字節,每一個簇4096字節,這個先記下來,後邊有用到
扇區是磁盤組成的最小單元,簇是虛擬出來的,主要是爲了操做系統方便讀寫磁盤;因爲扇區比較小,數量很是多,
在尋址比較麻煩,操做系統就將相鄰的幾個扇區組合在一塊兒,造成簇,再以簇爲每次操做文件的最小單元。好比加載一個磁盤文件內容,
操做系統是分批次讀取,每次只拷貝一個簇的單位數據,個人電腦就是一次拷貝4096字節,知道文件所有拷貝完成。
2)讀寫速度
磁盤讀取時間是毫秒級別的通常幾毫秒到十幾毫秒之間,這個跟磁盤轉速有點關係,還有就是數據所在磁道遠近有關係;
CPU處理時間是納秒級別,毫秒:納秒 = 1:1000000,因此在程序設計中,讀取文件是時間成本很是高的,應該儘可能合理設計;
B樹(英語:B-tree)是一種自平衡的樹,可以保持數據有序。這種數據結構可以讓查找數據、順序訪問、插入數據及刪除的動做,
都在對數時間內完成。B樹,歸納來講是一個通常化的二叉查找樹(binary search tree)一個節點能夠擁有最少2個子節點。
與自平衡二叉查找樹不一樣,B樹適用於讀寫相對大的數據塊的存儲系統,例如磁盤。B樹減小定位記錄時所經歷的中間過程,從而加快存取速度。
B樹這種數據結構能夠用來描述外部存儲。這種數據結構常被應用在數據庫和文件系統的實現上。
一個 m 階的B樹是一個有如下特性:
好吧,上邊這一段看了等於沒看的定義能夠不看,這裏有個重要的B樹特性須要瞭解,就是B樹的階,對於階的定義國內外是有分歧的,有的定義爲度。
階指的是節點的最大孩子數,度指的是節點的最小孩子數,我查閱了不少資料,基本上能夠理解爲:
1度 = 2階,好比說3度B樹,能夠理解爲6階B樹。這點有些疑問,有更好的說法的能夠留言討論一下。
1)內部節點:
內部節點是除葉子節點和根節點以外的全部節點。每一個內部節點擁有最多 U 個,最少 L 個子節點。元素的數量老是比子節點指針的數量少1。
U 必須等於 2L 或者 2L-1。這個L通常是度數。
2)根節點:根節點擁有的子節點數量的上限和內部節點相同,可是沒有下限。
3)葉子節點:葉子節點對元素的數量有相同的限制,可是沒有子節點,也沒有指向子節點的指針。
4)爲了分析方便舉例3階3層B樹
圖1
從上圖中能夠得出如下幾個信息:
數據檢索分析:
依據上圖分析,由於整棵樹已經在內存中,至關於一個變量,數據檢索首先是從根節點開始;
1)若是要查詢9,首先從根節點比較,那比較一次就獲得結果,
2)若是要查詢第二層的三、4,首先判斷根節點鍵值,沒有匹配到,可是能夠判斷要檢索的鍵值比根節點小,
因此接下來是從左孩子樹繼續檢索,十二、15也是相似,總共須要2次比較就獲得結果
3)若是查詢葉子節點鍵值,相似2),只須要3次比較就能獲得結果。
4)對比普通的數組遍歷查詢,B樹檢索的時間成本沒有隨數據量增長而線性增長,效率大大提升。
前面已經提到,若是樹已經在內存中,那固然好辦,直接遍歷就行了。若是B樹僅僅如此,那也和數組差異不大,一樣受限於內存大小;
因此,在內存中建立整棵B樹是不現實的,這不是B樹的正確打開方式。
前面也已經提到,操做系統加載磁盤文件的時候,若是文件超過簇大小(即4096個字節),那會分屢次的讀取磁盤,直到拷貝數據完成。
這裏看似一個加載動做,其實這個動做包含了N次磁盤尋址,而咱們已經知道,每次磁盤尋址直至拷貝數據開銷是很是大的;是CPU指令耗時百萬倍以上;
這種操做應該儘可能少地執行,而B樹這種數據結構就是爲了解決磁盤讀取瓶頸這個問題而產生的。
實際應用中,B樹會持久化到磁盤,而後只在內存保留一個根節點的指針。已上圖1爲例:
每一個節點大小恰好等於簇大小,這樣只需一次磁盤IO就能夠獲取到一整個節點的全部鍵值,及其全部子樹的指針。
好比,查詢鍵值8:
1)第一步,讀取根節點獲得鍵值9,以及2個子樹指針,分別指向左右孩子節點,由於9 > 8,因此下一步加載左孩子節點
2)第二部,加載節點2,獲得鍵值三、6,以及3個子樹指針,由於三、6 < 8,因此下一步要加載節點2的右孩子節點
3)第三部,加載節點6,獲得鍵值七、8,由於是葉子節點因此沒有子樹指針,遍歷鍵值匹配到8,返回。
總結:
在這個3階3層的B樹中,不管查找哪個鍵值,最多隻須要3次磁盤操做,就算平均每次耗時10毫秒,總共須要耗時30毫秒(CPU運算耗時能夠忽略);
以此類推,3階4層的B樹,須要讀取4次磁盤,耗時40毫秒,5層50毫秒,6層60毫秒,7層,8層,,,,
這樣一看貌似也沒什麼,幾十毫秒已經不能說快了,可是別忘了咱們這顆樹只有3階,即一個節點保存2個鍵值。一個簇最多能有4096/4=1024個鍵值;
若是建立一個1024階的B樹,分別控制在三、四、5層的話,根據B樹高度公式:,H爲層數,T爲1024,n爲數據總數
耗時以下:
3階3層:能容納2147483648(20億)個鍵值,檢索耗時也將30毫秒內
3階4層:能容納2147483648(20億) ~ 2199023255552(2兆億)個鍵值,檢索耗時也將40毫秒內,固然這已經超出鍵值表達範圍了
3階5層:難以想象。。。
固然實際運用當中達不到1024階,由於樹持久化到磁盤時,索引結構體通常都是超過4個字節,好比12個字節,那一個簇最多能有4096/12=341個鍵值。
若是階數按341來算:
3階3層:能容納79303642(7千萬)個鍵值,檢索耗時也將30毫秒內
3階4層:能容納79303642(7千萬) ~ 27042541922(200億)個鍵值,檢索耗時也將40毫秒內
也是很是多了。。
1)首先,咱們把B樹基本信息定義出來
1 public class Consts 2 { 3 public const int M = 3; // B樹的最小度數 4 public const int KeyMax = 2 * M - 1; // 節點包含關鍵字的最大個數 5 public const int KeyMin = M - 1; // 非根節點包含關鍵字的最小個數 6 public const int ChildMax = KeyMax + 1; // 孩子節點的最大個數 7 public const int ChildMin = KeyMin + 1; // 孩子節點的最小個數 8 }
先寫個簡單的demo,由於最小度數爲3,那就是6階。先實現幾個簡單的方法,新增,拆分,其他的合併,刪除比較複雜之後有機會再看看
2)定義BTreeNode,B樹節點
1 public class BTreeNode 2 { 3 private bool leaf; 4 public int[] keys; 5 public int keyNumber; 6 public BTreeNode[] children; 7 public int blockIndex; 8 public int dataIndex; 9 10 public BTreeNode(bool leaf) 11 { 12 this.leaf = leaf; 13 keys = new int[Consts.KeyMax]; 14 children = new BTreeNode[Consts.ChildMax]; 15 } 16 17 /// <summary>在未滿的節點中插入鍵值</summary> 18 /// <param name="key">鍵值</param> 19 public void InsertNonFull(int key) 20 { 21 var index = keyNumber - 1; 22 23 if (leaf == true) 24 { 25 // 找到合適位置,而且移動節點鍵值騰出位置 26 while (index >= 0 && keys[index] > key) 27 { 28 keys[index + 1] = keys[index]; 29 index--; 30 } 31 32 // 在index後邊新增鍵值 33 keys[index + 1] = key; 34 keyNumber = keyNumber + 1; 35 } 36 else 37 { 38 // 找到合適的子孩子索引 39 while (index >= 0 && keys[index] > key) index--; 40 41 // 若是孩子節點已滿 42 if (children[index + 1].keyNumber == Consts.KeyMax) 43 { 44 // 分裂該孩子節點 45 SplitChild(index + 1, children[index + 1]); 46 47 // 分裂後中間節點上跳父節點 48 // 孩子節點已經分裂成2個節點,找到合適的一個 49 if (keys[index + 1] < key) index++; 50 } 51 52 // 插入鍵值 53 children[index + 1].InsertNonFull(key); 54 } 55 } 56 57 /// <summary>分裂節點</summary> 58 /// <param name="childIndex">孩子節點索引</param> 59 /// <param name="waitSplitNode">待分裂節點</param> 60 public void SplitChild(int childIndex, BTreeNode waitSplitNode) 61 { 62 var newNode = new BTreeNode(waitSplitNode.leaf); 63 newNode.keyNumber = Consts.KeyMin; 64 65 // 把待分裂的節點中的通常節點搬到新節點 66 for (var j = 0; j < Consts.KeyMin; j++) 67 { 68 newNode.keys[j] = waitSplitNode.keys[j + Consts.ChildMin]; 69 70 // 清0 71 waitSplitNode.keys[j + Consts.ChildMin] = 0; 72 } 73 74 // 若是待分裂節點不是也只節點 75 if (waitSplitNode.leaf == false) 76 { 77 for (var j = 0; j < Consts.ChildMin; j++) 78 { 79 // 把孩子節點也搬過去 80 newNode.children[j] = waitSplitNode.children[j + Consts.ChildMin]; 81 82 // 清0 83 waitSplitNode.children[j + Consts.ChildMin] = null; 84 } 85 } 86 87 waitSplitNode.keyNumber = Consts.KeyMin; 88 89 // 拷貝通常鍵值到新節點 90 for (var j = keyNumber; j >= childIndex + 1; j--) 91 children[j + 1] = children[j]; 92 93 children[childIndex + 1] = newNode; 94 for (var j = keyNumber - 1; j >= childIndex; j--) 95 keys[j + 1] = keys[j]; 96 97 // 把中間鍵值上跳至父節點 98 keys[childIndex] = waitSplitNode.keys[Consts.KeyMin]; 99 100 // 清0 101 waitSplitNode.keys[Consts.KeyMin] = 0; 102 103 // 根節點鍵值數自加 104 keyNumber = keyNumber + 1; 105 } 106 107 /// <summary>根據節點索引順序打印節點鍵值</summary> 108 public void PrintByIndex() 109 { 110 int index; 111 for (index = 0; index < keyNumber; index++) 112 { 113 // 若是不是葉子節點, 先打印葉子子節點. 114 if (leaf == false) children[index].PrintByIndex(); 115 116 Console.Write("{0} ", keys[index]); 117 } 118 119 // 打印孩子節點 120 if (leaf == false) children[index].PrintByIndex(); 121 } 122 123 /// <summary>查找某鍵值是否已經存在樹中</summary> 124 /// <param name="key">鍵值</param> 125 /// <returns></returns> 126 public BTreeNode Find(int key) 127 { 128 int index = 0; 129 while (index < keyNumber && key > keys[index]) index++; 130 131 // 該key已經存在, 返回該索引位置節點 132 if (keys[index] == key) return this; 133 134 // key 不存在,而且節點是葉子節點 135 if (leaf == true) return null; 136 137 // 遞歸在孩子節點中查找 138 return children[index].Find(key); 139 } 140 }
3)B樹模型
1 public class BTree 2 { 3 public BTreeNode Root { get; private set; } 4 5 public BTree() { } 6 7 /// <summary>根據節點索引順序打印節點鍵值</summary> 8 public void PrintByIndex() 9 { 10 if (Root == null) 11 { 12 Console.WriteLine("空樹"); 13 return; 14 } 15 16 Root.PrintByIndex(); 17 } 18 19 /// <summary>查找某鍵值是否已經存在樹中</summary> 20 /// <param name="key">鍵值</param> 21 /// <returns></returns> 22 public BTreeNode Find(int key) 23 { 24 if (Root == null) return null; 25 26 return Root.Find(key); 27 } 28 29 /// <summary>新增B樹節點鍵值</summary> 30 /// <param name="key">鍵值</param> 31 public void Insert(int key) 32 { 33 if (Root == null) 34 { 35 Root = new BTreeNode(true); 36 Root.keys[0] = key; 37 Root.keyNumber = 1; 38 return; 39 } 40 41 if (Root.keyNumber == Consts.KeyMax) 42 { 43 var newNode = new BTreeNode(false); 44 45 newNode.children[0] = Root; 46 newNode.SplitChild(0, Root); 47 48 var index = 0; 49 if (newNode.keys[0] < key) index++; 50 51 newNode.children[index].InsertNonFull(key); 52 Root = newNode; 53 } 54 else 55 { 56 Root.InsertNonFull(key); 57 } 58 } 59 }
4)新增20個無序鍵值,測試一下
1 var bTree = new BTree(); 2 3 bTree.Insert(4); 4 bTree.Insert(5); 5 bTree.Insert(6); 6 bTree.Insert(1); 7 bTree.Insert(2); 8 bTree.Insert(3); 9 bTree.Insert(10); 10 bTree.Insert(11); 11 bTree.Insert(12); 12 bTree.Insert(7); 13 bTree.Insert(8); 14 bTree.Insert(9); 15 bTree.Insert(13); 16 bTree.Insert(14); 17 bTree.Insert(18); 18 bTree.Insert(19); 19 bTree.Insert(20); 20 bTree.Insert(15); 21 bTree.Insert(16); 22 bTree.Insert(17); 23 24 Console.WriteLine("輸出排序後鍵值"); 25 bTree.PrintByIndex();
5)運行
上文提到,B數不可能只存在內存而沒法落地,那樣沒有意義。因此就須要將整棵樹持久化到磁盤文件,而且還要支持快速地從磁盤文件中檢索到鍵值;
要持久化就要考慮不少問題,像上邊的簡單示例是沒有實際意義的,由於節點不可能只有鍵值與孩子樹,還得有數據指針,存儲位置等等,大概有如下一些問題:
問題比較多,很是麻煩。具體的過程就不列舉了,如下展現如下修改後的B樹模型。
一、先定義一個結構體
1 [StructLayout(LayoutKind.Sequential, CharSet = CharSet.Ansi, Pack = 1)] 2 public struct BlockItem 3 { 4 public int ChildBlockIndex; 5 public int Key; 6 public int DataIndex; 7 8 public BlockItem(int key, int dataIndex) 9 { 10 ChildBlockIndex = -1; 11 Key = key; 12 DataIndex = dataIndex; 13 } 14 }
結構體總共12字節,爲了可以持久化整棵B樹到磁盤,加入了ChildBlockIndex子孩子節點塊索引,根據這個塊索引在下一次重建子孩子樹層級關係時就知道從
文件的那個位置開始讀取;Key鍵值,DataIndex數據索引,數據索引也是一個文件位置記錄,跟ChildBlockIndex差很少,這樣檢索到key後就知道從
文件哪一個位置獲取真正的數據。爲了更形象瞭解B樹應用,我畫了一個結構體的示意圖:
0、總共3個節點,每一個節點由N個結構體組成,最末尾只有孩子指針,沒有數據與鍵值
一、黃色爲子樹塊索引,即ChildBlockIndex,指向這個子孩子樹全部數據在文件中的位置
二、紅色爲鍵值,即Key,鍵值通常是惟一的,不容許重複
三、藍色爲數據塊索引,即DataIndex,指向鍵值對應的數據在文件中的什麼位置開始,而後讀取一個結構體的長度便可
四、底下綠色的一塊是數據指針指向的具體數據塊
二、數據結構體
1 [StructLayout(LayoutKind.Sequential, CharSet = CharSet.Ansi, Pack = 1)] 2 public struct SDataTest 3 { 4 public int Idx; 5 public int Age; 6 public byte Sex; 7 8 [MarshalAs(UnmanagedType.ByValArray, SizeConst = 20)] 9 public byte[] Name; 10 11 public byte Valid; 12 };
三、B樹節點類修改改一下,這個就不解釋了,複習一下程序員基本功,啃代碼。
1 public class BTreeNode 2 { 3 private BTree tree; 4 private bool leaf; 5 6 public int keyNumber; 7 public BlockItem[] keys; 8 public BTreeNode[] children; 9 10 public int blockIndex; 11 public int findIndex; 12 13 public BTreeNode(BTree tree, bool leaf) 14 { 15 this.tree = tree; 16 this.leaf = leaf; 17 keys = new BlockItem[Consts.KeyMax]; 18 children = new BTreeNode[Consts.ChildMax]; 19 blockIndex = Consts.BlockIndex++; 20 } 21 22 /// <summary>在未滿的節點中插入鍵值</summary> 23 /// <param name="key">鍵值</param> 24 public void InsertNonFull(BlockItem item) 25 { 26 var index = keyNumber - 1; 27 28 if (leaf == true) 29 { 30 // 找到合適位置,而且移動節點鍵值騰出位置 31 while (index >= 0 && keys[index].Key > item.Key) 32 { 33 keys[index + 1] = keys[index]; 34 index--; 35 } 36 37 // 在index後邊新增鍵值 38 keys[index + 1] = item; 39 keyNumber = keyNumber + 1; 40 } 41 else 42 { 43 // 找到合適的子孩子索引 44 while (index >= 0 && keys[index].Key > item.Key) index--; 45 46 // 若是孩子節點已滿 47 if (children[index + 1].keyNumber == Consts.KeyMax) 48 { 49 // 分裂該孩子節點 50 SplitChild(index + 1, children[index + 1]); 51 52 // 分裂後中間節點上跳父節點 53 // 孩子節點已經分裂成2個節點,找到合適的一個 54 if (keys[index + 1].Key < item.Key) index++; 55 } 56 57 // 插入鍵值 58 children[index + 1].InsertNonFull(item); 59 } 60 } 61 62 /// <summary>分裂節點</summary> 63 /// <param name="childIndex">孩子節點索引</param> 64 /// <param name="waitSplitNode">待分裂節點</param> 65 public void SplitChild(int childIndex, BTreeNode waitSplitNode) 66 { 67 var newNode = new BTreeNode(tree, waitSplitNode.leaf); 68 newNode.keyNumber = Consts.KeyMin; 69 70 // 把待分裂的節點中的通常節點搬到新節點 71 for (var j = 0; j < Consts.KeyMin; j++) 72 { 73 newNode.keys[j] = waitSplitNode.keys[j + Consts.ChildMin]; 74 75 // 清0 76 waitSplitNode.keys[j + Consts.ChildMin] = default(BlockItem); 77 } 78 79 // 若是待分裂節點不是也只節點 80 if (waitSplitNode.leaf == false) 81 { 82 for (var j = 0; j < Consts.ChildMin; j++) 83 { 84 // 把孩子節點也搬過去 85 newNode.children[j] = waitSplitNode.children[j + Consts.ChildMin]; 86 87 // 清0 88 waitSplitNode.children[j + Consts.ChildMin] = null; 89 } 90 } 91 92 waitSplitNode.keyNumber = Consts.KeyMin; 93 94 for (var j = keyNumber; j >= childIndex + 1; j--) 95 children[j + 1] = children[j]; 96 97 children[childIndex + 1] = newNode; 98 99 for (var j = keyNumber - 1; j >= childIndex; j--) 100 keys[j + 1] = keys[j]; 101 102 // 把中間鍵值上跳至父節點 103 keys[childIndex] = waitSplitNode.keys[Consts.KeyMin]; 104 105 // 清0 106 waitSplitNode.keys[Consts.KeyMin] = default(BlockItem); 107 108 // 根節點鍵值數自加 109 keyNumber = keyNumber + 1; 110 } 111 112 /// <summary>根據節點索引順序打印節點鍵值</summary> 113 public void PrintByIndex() 114 { 115 int index; 116 for (index = 0; index < keyNumber; index++) 117 { 118 // 若是不是葉子節點, 先打印葉子子節點. 119 if (leaf == false) children[index].PrintByIndex(); 120 121 Console.Write("{0} ", keys[index].Key); 122 } 123 124 // 打印孩子節點 125 if (leaf == false) children[index].PrintByIndex(); 126 } 127 128 /// <summary>查找某鍵值是否已經存在樹中</summary> 129 /// <param name="item">鍵值</param> 130 /// <returns></returns> 131 public BTreeNode Find(BlockItem item) 132 { 133 findIndex = 0; 134 int index = 0; 135 while (index < keyNumber && item.Key > keys[index].Key) index++; 136 137 // 遍歷所有都未找到,索引計數減1 138 if (index > 0 && index == keyNumber) index--; 139 140 // 該key已經存在, 返回該索引位置節點 141 if (keys[index].Key == item.Key) 142 { 143 findIndex = index; 144 return this; 145 } 146 147 // key 不存在,而且節點是葉子節點 148 if (leaf == true) return null; 149 150 // 重建children[index]數據結構 151 var childBlockIndex = keys[index].ChildBlockIndex; 152 tree.LoadNodeByBlock(ref children[index], childBlockIndex); 153 154 // 遞歸在孩子節點中查找 155 if (children[index] == null) return null; 156 return children[index].Find(item); 157 } 158 }
四、B樹模型也要修改一下 ,不解釋
1 public class BTree 2 { 3 private FileStream rwFS; 4 5 public BTreeNode Root; 6 7 public BTree(string fullName) 8 { 9 rwFS = new FileStream(fullName, FileMode.OpenOrCreate, FileAccess.ReadWrite); 10 11 // 建立10M的空間,用作索引存儲 12 if (rwFS.Length == 0) 13 { 14 rwFS.SetLength(Consts.IndexTotalSize); 15 } 16 17 // 從數據文件重建根節點,內存只保存根節點 18 LoadNodeByBlock(ref Root, 0); 19 } 20 21 public void LoadNodeByBlock(ref BTreeNode node, int blockIndex) 22 { 23 var items = Helper.Read(rwFS,blockIndex); 24 if (items.Count > 0) 25 { 26 var isLeaf = items[0].ChildBlockIndex == Consts.NoChild; 27 28 node = new BTreeNode(this, isLeaf); 29 node.blockIndex = blockIndex; 30 node.keys = items.ToArray(); 31 node.keyNumber = items.Count; 32 } 33 } 34 35 /// <summary>根據節點索引順序打印節點鍵值</summary> 36 public void PrintByIndex() 37 { 38 if (Root == null) 39 { 40 Console.WriteLine("空樹"); 41 return; 42 } 43 44 Root.PrintByIndex(); 45 } 46 47 /// <summary>查找某鍵值是否已經存在樹中</summary> 48 /// <param name="item">鍵值</param> 49 /// <returns></returns> 50 public BTreeNode Find(BlockItem item) 51 { 52 if (Root == null) return null; 53 54 return Root.Find(item); 55 } 56 public BTreeNode Find(int key) 57 { 58 return Find(new BlockItem() { Key = key }); 59 } 60 61 /// <summary>新增B樹節點鍵值</summary> 62 /// <param name="item">鍵值</param> 63 private void Insert(BlockItem item) 64 { 65 if (Root == null) 66 { 67 Root = new BTreeNode(this, true); 68 Root.keys[0] = item; 69 Root.keyNumber = 1; 70 } 71 else 72 { 73 if (Root.keyNumber == Consts.KeyMax) 74 { 75 var newNode = new BTreeNode(this, false); 76 77 newNode.children[0] = Root; 78 newNode.SplitChild(0, Root); 79 80 var index = 0; 81 if (newNode.keys[0].Key < item.Key) index++; 82 83 newNode.children[index].InsertNonFull(item); 84 Root = newNode; 85 } 86 else 87 { 88 Root.InsertNonFull(item); 89 } 90 } 91 } 92 93 public void Insert(SDataTest data) 94 { 95 var item = new BlockItem() 96 { 97 Key = data.Idx 98 }; 99 100 var node = Find(item); 101 if (node != null) 102 { 103 Console.WriteLine("鍵值已經存在,info:{0}", item.Key); 104 return; 105 } 106 107 // 保存數據 108 item.DataIndex = Helper.InsertData(rwFS, data); 109 110 // 保存索引 111 if (item.DataIndex >= 0) 112 Insert(item); 113 } 114 115 /// <summary>持久化整棵樹</summary> 116 public void SaveIndexAll() 117 { 118 SaveIndex(Root); 119 } 120 121 /// <summary>持久化某節點如下的樹枝</summary> 122 /// <param name="node">某節點</param> 123 public void SaveIndex(BTreeNode node) 124 { 125 var bw = new BinaryWriter(rwFS); 126 var keyItem = default(BlockItem); 127 128 // 第一層 129 var nodeL1 = node; 130 if (nodeL1 == null) return; 131 132 for (var i = 0; i <= nodeL1.keyNumber; i++) 133 { 134 keyItem = default(BlockItem); 135 if (i < nodeL1.keyNumber) keyItem = nodeL1.keys[i]; 136 137 SaveIndex(bw, 0, i, nodeL1.children[i], keyItem); 138 139 // 第二層 140 var nodeL2 = nodeL1.children[i]; 141 if (nodeL2 == null) continue; 142 143 for (var j = 0; j <= nodeL2.keyNumber; j++) 144 { 145 keyItem = default(BlockItem); 146 if (j < nodeL2.keyNumber) keyItem = nodeL2.keys[j]; 147 148 SaveIndex(bw, nodeL2.blockIndex, j, nodeL2.children[j], keyItem); 149 150 // 第三層 151 var nodeL3 = nodeL2.children[j]; 152 if (nodeL3 == null) continue; 153 154 for (var k = 0; k <= nodeL3.keyNumber; k++) 155 { 156 keyItem = default(BlockItem); 157 if (k < nodeL3.keyNumber) keyItem = nodeL3.keys[k]; 158 159 SaveIndex(bw, nodeL3.blockIndex, k, nodeL3.children[k], keyItem); 160 161 // 第四層 162 var nodeL4 = nodeL3.children[k]; 163 if (nodeL4 == null) continue; 164 165 for (var l = 0; l <= nodeL4.keyNumber; l++) 166 { 167 keyItem = default(BlockItem); 168 if (l < nodeL4.keyNumber) keyItem = nodeL4.keys[l]; 169 170 SaveIndex(bw, nodeL4.blockIndex, l, nodeL4.children[l], keyItem); 171 172 // 第五層 173 var nodeL5 = nodeL4.children[l]; 174 if (nodeL5 == null) continue; 175 176 for (var z = 0; z <= nodeL5.keyNumber; z++) 177 { 178 keyItem = default(BlockItem); 179 if (z < nodeL5.keyNumber) keyItem = nodeL5.keys[z]; 180 181 SaveIndex(bw, nodeL5.blockIndex, z, nodeL5.children[z], keyItem); 182 } 183 } 184 } 185 } 186 } 187 } 188 private void SaveIndex(BinaryWriter bw, int blockIndex, int num, BTreeNode node, BlockItem item) 189 { 190 bw.Seek((blockIndex * Consts.BlockSize) + (num * Consts.IndexSize), SeekOrigin.Begin); 191 bw.Write(node == null ? Consts.NoChild : node.blockIndex); 192 bw.Write(item.Key); 193 bw.Write(item.DataIndex); 194 bw.Flush(); 195 } 196 197 public SDataTest LoadData(int dataIndex) 198 { 199 return Helper.Load(rwFS, dataIndex); 200 } 201 }
五、寫測試
1 private static void InsertTest(ref BTree bTree) 2 { 3 // 新增測試數據 4 for (int i = 1; i <= Consts.TotalKeyNumber; i++) 5 { 6 bTree.Insert(new SDataTest() 7 { 8 Idx = i, 9 Age = i, 10 Sex = 1, 11 Name = Helper.Copy("Name(" + i.ToString() + ")", 20), 12 Valid = 1 13 }); 14 } 15 16 Console.WriteLine("測試數據添加完畢,共新增{0}條數據", Consts.TotalKeyNumber); 17 }
六、讀測試
1 private static void FindTest(ref BTree bTree) 2 { 3 var count = 0; 4 5 // 校驗數據查找 6 for (int i = 1; i <= Consts.TotalKeyNumber; i++) 7 { 8 var node = bTree.Find(i); 9 if (node == null) 10 { 11 //Console.WriteLine("未找到{0}", i); 12 continue; 13 } 14 15 //Console.WriteLine("findIndex:{0},key:{1},dataIndex:{2}", node.findIndex, node.keys[node.findIndex].Key, node.keys[node.findIndex].DataIndex); 16 17 count++; 18 if (count % 10000 == 0) 19 { 20 var data = bTree.LoadData(node.keys[node.findIndex].DataIndex); 21 var name = Encoding.Default.GetString(data.Name).TrimEnd('\0'); 22 Console.WriteLine("Idx:{0},Age:{1},Sex:{2},Name:{3},Valid:{4}", data.Idx, data.Age, data.Sex, name, data.Valid); 23 } 24 } 25 26 Console.WriteLine("有效數據個數:{0}", count); 27 }
七、最後測試一下
八、測試查詢時間
1 private static void CheckLoadTime(ref BTree bTree, int key) 2 { 3 var start = DateTime.Now; 4 var node = bTree.Find(key); 5 if (node == null) return; 6 7 Console.WriteLine("查找{0},耗時:{1}", key.ToString(), (DateTime.Now - start).TotalMilliseconds.ToString()); 8 9 var data = bTree.LoadData(node.keys[node.findIndex].DataIndex); 10 var name = Encoding.Default.GetString(data.Name).TrimEnd('\0'); 11 Console.WriteLine("Idx:{0},Age:{1},Sex:{2},Name:{3},Valid:{4}", data.Idx, data.Age, data.Sex, name, data.Valid); 12 Console.WriteLine(); 13 }
1 CheckLoadTime(ref bTree, 1000); 2 CheckLoadTime(ref bTree, 10000); 3 CheckLoadTime(ref bTree, 50000); 4 CheckLoadTime(ref bTree, 100000);
九、從新生成10000000條數據,測試查詢效率
1 CheckLoadTime(ref bTree, 100000); 2 CheckLoadTime(ref bTree, 1000000); 3 CheckLoadTime(ref bTree, 3000000); 4 CheckLoadTime(ref bTree, 5000000); 5 CheckLoadTime(ref bTree, 8000000); 6 CheckLoadTime(ref bTree, 10000000);
全是1毫秒內返回,數據檢索效率很是高,
實際上最初在學校潦草學了一遍【數據結構】以後,工做那麼多年都用不着這方面的知識點,早就忘得一乾二淨了。
從新引發我興趣的是2017年下半年,當時一個項目須要用到共享內存做爲快速讀寫數據的底層核心功能。在設計共享內存存儲關係時,
就遇到了索引的快速檢索要求,第一次是順序檢索,當數據量達到5萬以上時系統就崩了,檢索速度太慢;後來改成二分查找法,輕鬆達到20萬數據;
達到20萬後就差很少到了單機處理性能瓶頸了,由於CPU不夠用,除了檢索還須要作其餘的業務計算;
那時候就一直在搜索快速查找的各類算法,什麼快速排序算法、堆排序算法、歸併排序、二分查找算法、DFS(深度優先搜索)、BFS(廣度優先搜索),
基本上都瞭解了一遍,可是看得頭疼,沒去實踐。最後看到樹結構,引發我很大興趣,就是園友nullzx的這篇:B+樹在磁盤存儲中的應用,
這讓我瞭解到原來數據庫是這樣讀寫的,這頗有意思,得造個輪子本身試一次。
粗陋倉促寫成,恐怕有不少地方有漏洞,因此若是文中有錯誤的地方,歡迎留言討論,可是拒絕一波流的吐槽,我但是會刪低級評論的。