B樹概述與簡單應用示例(C#)

引言:

  天不生仲尼,萬古如長夜。在計算機科學中,也有一個劃時代的發明,B樹(多路平衡查找樹)及其變體(B樹,b*樹,b+樹);html

由德國科學家(魯道夫·拜爾 Rudolf Bayer),美國科學家(愛德華·M·麥克特 Edward Meyers McCreight)於1970年共同發明;node

B樹這種數據結構特別適合用於數據庫與文件系統設計中,是人類精神財富的精華部分,B樹不誕生,計算機在處理大數據量計算時會變得很是困難。mysql

 

用途:

  基本上都是軟件產品最底層的,最核心的功能。程序員

如:各類操做系統(windows,Linux,Mac)的文件系統索引,各類數據庫(sqlserver、oracle、mysql、MongoDB、等等),算法

基本上大部分與大數據量讀取有關的事務,多少都與B樹家族有關,由於B樹的優勢太明顯,特別是讀取磁盤數據效率很是的高效,sql

查找效率O(log n),甚至在B+樹中查詢速度恆定,不管多少存儲多少數據,查詢任何一個速度都同樣。簡直就是天才的發明。數據庫

 

誕生的緣由:

  在上世紀時期,計算機內存儲器都很是的小,以KB爲單位,比起如今動不動以G計算,簡直小的可憐。windows

計算機運算數據時,數據是在內存中進行操做的,好比一些加減乘除、正刪改查等。數組

舉個簡單的栗子:從一個數組 int a[1,2,3,4,5,6,7,8,9]中找出3,那很是簡單;大概步驟以下:數據結構

  一、在內存中初始化這個數組

  二、獲取數組指針遍歷這個數組,查到3就完成

  可是這個數組很大,好比包含1億個數字怎麼辦?若是數組容量大大超過內存大小,那這種比較就不現實了。如今的作法都是把文件

數據存放在外存儲器,好比磁盤,U盤,光盤;而後把文件分屢次的拷貝數據至內存進行操做。可是讀取外存儲器效率對比讀取內存,

差距是很是大的,通常是百萬級別的差距,差6個數量級,因此這個問題不解決一切都是空談。

  好在操做系統在設計之初,就對讀取外存儲器進行了必定的優化,引入了「邏輯塊」概念,當作操做文件的最小單元,而B樹合理地利用這個「邏輯塊」

功能開發的高效存儲數據結構;在介紹B樹特性以前,先來了解一下磁盤的基本工做原理。

 

磁盤簡單介紹:

1)磁盤結構介紹 

 

  網上引用的兩張圖,將就看看,基本結構是:磁盤 > 盤面 > 磁道 > 扇區

  左邊是物理圖,這個你們應該都是常常見到了,通常圓形的那部分有不少層,每一層叫盤片;右邊的是示意圖,表明左圖的一個盤面。

每一個盤面有跟多環形的磁道,每一個磁道有若干段扇區組成,扇區是磁盤的最小組成單元,若干段扇區組成簇(也叫磁盤塊、邏輯塊等)

先看看我電腦的磁盤簇與扇區大小

  能夠看到個人E盤每一個扇區512個字節,每一個簇4096字節,這個先記下來,後邊有用到

扇區是磁盤組成的最小單元,簇是虛擬出來的,主要是爲了操做系統方便讀寫磁盤;因爲扇區比較小,數量很是多,

在尋址比較麻煩,操做系統就將相鄰的幾個扇區組合在一塊兒,造成簇,再以簇爲每次做文件的最小單元。好比加載一個磁盤文件內容,

操做系統是分批次讀取,每次只拷貝一個簇的單位數據,個人電腦就是一次拷貝4096字節,知道文件所有拷貝完成。

2)讀寫速度

  磁盤讀取時間是毫秒級別的通常幾毫秒到十幾毫秒之間,這個跟磁盤轉速有點關係,還有就是數據所在磁道遠近有關係;

CPU處理時間是納秒級別,毫秒:納秒 = 1:1000000,因此在程序設計中,讀取文件是時間成本很是高的,應該儘可能合理設計;

 

B樹簡介(維基百科):

  B樹(英語:B-tree)是一種自平衡的樹,可以保持數據有序。這種數據結構可以讓查找數據、順序訪問、插入數據及刪除的動做,

都在對數時間內完成。B樹,歸納來講是一個通常化的二叉查找樹(binary search tree)一個節點能夠擁有最少2個子節點。

與自平衡二叉查找樹不一樣,B樹適用於讀寫相對大的數據塊的存儲系統,例如磁盤。B樹減小定位記錄時所經歷的中間過程,從而加快存取速度。

B樹這種數據結構能夠用來描述外部存儲。這種數據結構常被應用在數據庫和文件系統的實現上。

  一個 m 階的B樹是一個有如下特性:

  • 每個節點最多有 m 個子節點
  • 每個非葉子節點(除根節點)最少有 ⌈m/2⌉ 個子節點
  • 若是根節點不是葉子節點,那麼它至少有兩個子節點
  • 有 k 個子節點的非葉子節點擁有 k − 1 個鍵
  • 全部的葉子節點都在同一層

 

  好吧,上邊這一段看了等於沒看的定義能夠不看,這裏有個重要的B樹特性須要瞭解,就是B樹的階,對於階的定義國內外是有分歧的,有的定義爲度

階指的是節點的最大孩子數,度指的是節點的最小孩子數,我查閱了不少資料,基本上能夠理解爲: 

1度 = 2階,好比說3度B樹,能夠理解爲6階B樹。這點有些疑問,有更好的說法的能夠留言討論一下。

 

1)內部節點:

  內部節點是除葉子節點和根節點以外的全部節點。每一個內部節點擁有最多 U 個,最少 L 個子節點。元素的數量老是比子節點指針的數量少1。

U 必須等於 2L 或者 2L-1。這個L通常是度數。

2)根節點:根節點擁有的子節點數量的上限和內部節點相同,可是沒有下限。

3)葉子節點:葉子節點對元素的數量有相同的限制,可是沒有子節點,也沒有指向子節點的指針。

4)爲了分析方便舉例3階3層B樹

 

                        圖1 

從上圖中能夠得出如下幾個信息:

  • 紅色數字標示整個節點(即三、6在同一個節點內,圖中總共9個節點),黑色數字表示每一個節點內的鍵值。
  • 全部數據插入B樹後,都是從左到右順序排列,從根節點開始,節點左邊孩子鍵值都小於節點鍵值,右邊孩子鍵值都大於節點鍵值。
  • 樹的階數指的是每一個節點的最大孩子節點數,圖中最多孩子節點數爲3,即階數=3,鍵值數量最少爲:1,最大爲:階數 -1

 

數據檢索分析:

  依據上圖分析,由於整棵樹已經在內存中,至關於一個變量,數據檢索首先是從根節點開始;

1)若是要查詢9,首先從根節點比較,那比較一次就獲得結果,

2)若是要查詢第二層的三、4,首先判斷根節點鍵值,沒有匹配到,可是能夠判斷要檢索的鍵值比根節點小,

   因此接下來是從左孩子樹繼續檢索,十二、15也是相似,總共須要2次比較就獲得結果

3)若是查詢葉子節點鍵值,相似2),只須要3次比較就能獲得結果。

4)對比普通的數組遍歷查詢,B樹檢索的時間成本沒有隨數據量增長而線性增長,效率大大提升。

 

B樹的應用分析:

  前面已經提到,若是樹已經在內存中,那固然好辦,直接遍歷就行了。若是B樹僅僅如此,那也和數組差異不大,一樣受限於內存大小;

因此,在內存中建立整棵B樹是不現實的,這不是B樹的正確打開方式。

  前面也已經提到,操做系統加載磁盤文件的時候,若是文件超過大小(即4096個字節),那會分屢次的讀取磁盤,直到拷貝數據完成。

這裏看似一個加載動做,其實這個動做包含了N次磁盤尋址,而咱們已經知道,每次磁盤尋址直至拷貝數據開銷是很是大的;是CPU指令耗時百萬倍以上;

這種操做應該儘可能少地執行,而B樹這種數據結構就是爲了解決磁盤讀取瓶頸這個問題而產生的。

  實際應用中,B樹會持久化到磁盤,而後只在內存保留一個根節點的指針。已上圖1爲例:

  每一個節點大小恰好等於大小,這樣只需一次磁盤IO就能夠獲取到一整個節點的全部鍵值,及其全部子樹的指針。

好比,查詢鍵值8:

  1)第一步,讀取根節點獲得鍵值9,以及2個子樹指針,分別指向左右孩子節點,由於9 > 8,因此下一步加載左孩子節點

  2)第二部,加載節點2,獲得鍵值三、6,以及3個子樹指針,由於三、6 < 8,因此下一步要加載節點2的右孩子節點

  3)第三部,加載節點6,獲得鍵值七、8,由於是葉子節點因此沒有子樹指針,遍歷鍵值匹配到8,返回。

 

總結:

  在這個3階3層的B樹中,不管查找哪個鍵值,最多隻須要3次磁盤操做,就算平均每次耗時10毫秒,總共須要耗時30毫秒(CPU運算耗時能夠忽略);

以此類推,3階4層的B樹,須要讀取4次磁盤,耗時40毫秒,5層50毫秒,6層60毫秒,7層,8層,,,,

  這樣一看貌似也沒什麼,幾十毫秒已經不能說快了,可是別忘了咱們這顆樹只有3階,即一個節點保存2個鍵值。一個簇最多能有4096/4=1024個鍵值;

若是建立一個1024階的B樹,分別控制在三、四、5層的話,根據B樹高度公式:,H爲層數,T爲1024,n爲數據總數

耗時以下:

  3階3層:能容納2147483648(20億)個鍵值,檢索耗時也將30毫秒內

  3階4層:能容納2147483648(20億) ~ 2199023255552(2兆億)個鍵值,檢索耗時也將40毫秒內,固然這已經超出鍵值表達範圍了

  3階5層:難以想象。。。

 

  固然實際運用當中達不到1024階,由於樹持久化到磁盤時,索引結構體通常都是超過4個字節,好比12個字節,那一個簇最多能有4096/12=341個鍵值。

若是階數按341來算:

  3階3層:能容納79303642(7千萬)個鍵值,檢索耗時也將30毫秒內

  3階4層:能容納79303642(7千萬) ~ 27042541922(200億)個鍵值,檢索耗時也將40毫秒內

  也是很是多了。。

 

B樹簡單示例:

1)首先,咱們把B樹基本信息定義出來

1 public class Consts
2 {
3     public const int M = 3;                  // B樹的最小度數
4     public const int KeyMax = 2 * M - 1;     // 節點包含關鍵字的最大個數
5     public const int KeyMin = M - 1;         // 非根節點包含關鍵字的最小個數
6     public const int ChildMax = KeyMax + 1;  // 孩子節點的最大個數
7     public const int ChildMin = KeyMin + 1;  // 孩子節點的最小個數
8 }

先寫個簡單的demo,由於最小度數爲3,那就是6階。先實現幾個簡單的方法,新增,拆分,其他的合併,刪除比較複雜之後有機會再看看

2)定義BTreeNode,B樹節點

  1     public class BTreeNode
  2     {
  3         private bool leaf;
  4         public int[] keys;
  5         public int keyNumber;
  6         public BTreeNode[] children;
  7         public int blockIndex;
  8         public int dataIndex;
  9 
 10         public BTreeNode(bool leaf)
 11         {
 12             this.leaf = leaf;
 13             keys = new int[Consts.KeyMax];
 14             children = new BTreeNode[Consts.ChildMax];
 15         }
 16 
 17         /// <summary>在未滿的節點中插入鍵值</summary>
 18         /// <param name="key">鍵值</param>
 19         public void InsertNonFull(int key)
 20         {
 21             var index = keyNumber - 1;
 22 
 23             if (leaf == true)
 24             {
 25                 // 找到合適位置,而且移動節點鍵值騰出位置
 26                 while (index >= 0 && keys[index] > key)
 27                 {
 28                     keys[index + 1] = keys[index];
 29                     index--;
 30                 }
 31 
 32                 // 在index後邊新增鍵值
 33                 keys[index + 1] = key;
 34                 keyNumber = keyNumber + 1;
 35             }
 36             else
 37             {
 38                 // 找到合適的子孩子索引
 39                 while (index >= 0 && keys[index] > key) index--;
 40 
 41                 // 若是孩子節點已滿
 42                 if (children[index + 1].keyNumber == Consts.KeyMax)
 43                 {
 44                     // 分裂該孩子節點
 45                     SplitChild(index + 1, children[index + 1]);
 46 
 47                     // 分裂後中間節點上跳父節點
 48                     // 孩子節點已經分裂成2個節點,找到合適的一個
 49                     if (keys[index + 1] < key) index++;
 50                 }
 51 
 52                 // 插入鍵值
 53                 children[index + 1].InsertNonFull(key);
 54             }
 55         }
 56 
 57         /// <summary>分裂節點</summary>
 58         /// <param name="childIndex">孩子節點索引</param>
 59         /// <param name="waitSplitNode">待分裂節點</param>
 60         public void SplitChild(int childIndex, BTreeNode waitSplitNode)
 61         {
 62             var newNode = new BTreeNode(waitSplitNode.leaf);
 63             newNode.keyNumber = Consts.KeyMin;
 64 
 65             // 把待分裂的節點中的通常節點搬到新節點
 66             for (var j = 0; j < Consts.KeyMin; j++)
 67             {
 68                 newNode.keys[j] = waitSplitNode.keys[j + Consts.ChildMin];
 69 
 70                 // 清0
 71                 waitSplitNode.keys[j + Consts.ChildMin] = 0;
 72             }
 73 
 74             // 若是待分裂節點不是也只節點
 75             if (waitSplitNode.leaf == false)
 76             {
 77                 for (var j = 0; j < Consts.ChildMin; j++)
 78                 {
 79                     // 把孩子節點也搬過去
 80                     newNode.children[j] = waitSplitNode.children[j + Consts.ChildMin];
 81 
 82                     // 清0
 83                     waitSplitNode.children[j + Consts.ChildMin] = null;
 84                 }
 85             }
 86 
 87             waitSplitNode.keyNumber = Consts.KeyMin;
 88 
 89             // 拷貝通常鍵值到新節點
 90             for (var j = keyNumber; j >= childIndex + 1; j--)
 91                 children[j + 1] = children[j];
 92 
 93             children[childIndex + 1] = newNode;
 94             for (var j = keyNumber - 1; j >= childIndex; j--)
 95                 keys[j + 1] = keys[j];
 96 
 97             // 把中間鍵值上跳至父節點
 98             keys[childIndex] = waitSplitNode.keys[Consts.KeyMin];
 99 
100             // 清0
101             waitSplitNode.keys[Consts.KeyMin] = 0;
102 
103             // 根節點鍵值數自加
104             keyNumber = keyNumber + 1;
105         }
106 
107         /// <summary>根據節點索引順序打印節點鍵值</summary>
108         public void PrintByIndex()
109         {
110             int index;
111             for (index = 0; index < keyNumber; index++)
112             {
113                 // 若是不是葉子節點, 先打印葉子子節點. 
114                 if (leaf == false) children[index].PrintByIndex();
115 
116                 Console.Write("{0} ", keys[index]);
117             }
118 
119             // 打印孩子節點
120             if (leaf == false) children[index].PrintByIndex();
121         }
122 
123         /// <summary>查找某鍵值是否已經存在樹中</summary>
124         /// <param name="key">鍵值</param>
125         /// <returns></returns>
126         public BTreeNode Find(int key)
127         {
128             int index = 0;
129             while (index < keyNumber && key > keys[index]) index++;
130 
131             // 該key已經存在, 返回該索引位置節點
132             if (keys[index] == key) return this;
133 
134             // key 不存在,而且節點是葉子節點
135             if (leaf == true) return null;
136 
137             // 遞歸在孩子節點中查找
138             return children[index].Find(key);
139         }
140     }
View Code

3)B樹模型

 1     public class BTree
 2     {
 3         public BTreeNode Root { get; private set; }
 4 
 5         public BTree() { }
 6 
 7         /// <summary>根據節點索引順序打印節點鍵值</summary>
 8         public void PrintByIndex()
 9         {
10             if (Root == null)
11             {
12                 Console.WriteLine("空樹");
13                 return;
14             }
15 
16             Root.PrintByIndex();
17         }
18 
19         /// <summary>查找某鍵值是否已經存在樹中</summary>
20         /// <param name="key">鍵值</param>
21         /// <returns></returns>
22         public BTreeNode Find(int key)
23         {
24             if (Root == null) return null;
25 
26             return Root.Find(key);
27         }
28 
29         /// <summary>新增B樹節點鍵值</summary>
30         /// <param name="key">鍵值</param>
31         public void Insert(int key)
32         {
33             if (Root == null)
34             {
35                 Root = new BTreeNode(true);
36                 Root.keys[0] = key; 
37                 Root.keyNumber = 1;  
38                 return;
39             }
40 
41             if (Root.keyNumber == Consts.KeyMax)
42             {
43                 var newNode = new BTreeNode(false);
44 
45                 newNode.children[0] = Root; 
46                 newNode.SplitChild(0, Root);
47 
48                 var index = 0;
49                 if (newNode.keys[0] < key) index++;
50 
51                 newNode.children[index].InsertNonFull(key);
52                 Root = newNode;
53             }
54             else
55             {
56                 Root.InsertNonFull(key);
57             }
58         }
59     }
View Code

4)新增20個無序鍵值,測試一下

 1             var bTree = new BTree();
 2 
 3             bTree.Insert(4);
 4             bTree.Insert(5);
 5             bTree.Insert(6);
 6             bTree.Insert(1);
 7             bTree.Insert(2);
 8             bTree.Insert(3);
 9             bTree.Insert(10);
10             bTree.Insert(11);
11             bTree.Insert(12);
12             bTree.Insert(7);
13             bTree.Insert(8);
14             bTree.Insert(9);
15             bTree.Insert(13);
16             bTree.Insert(14);
17             bTree.Insert(18);
18             bTree.Insert(19);
19             bTree.Insert(20);
20             bTree.Insert(15);
21             bTree.Insert(16);
22             bTree.Insert(17);
23 
24             Console.WriteLine("輸出排序後鍵值");
25             bTree.PrintByIndex();

5)運行

 

B樹持久化: 

  上文提到,B數不可能只存在內存而沒法落地,那樣沒有意義。因此就須要將整棵樹持久化到磁盤文件,而且還要支持快速地從磁盤文件中檢索到鍵值;

要持久化就要考慮不少問題,像上邊的簡單示例是沒有實際意義的,由於節點不可能只有鍵值與孩子樹,還得有數據指針,存儲位置等等,大概有如下一些問題:

  • 如何保存每一個節點佔有字節數恰好等於一個簇大小(4096字節),由於這樣就符合一次IO操做的數據交換上限?
  • 如何保存每一個節點的全部鍵值,以及這個節點下屬全部子樹關係?
  • 如何保存每一個鍵值對應的數據指針地址,以及指針與鍵值的對應關係如何維持?
  • 如何保證內存與磁盤的數據交換中可以正確地還原樹結構,即重建樹的某部分層級與鍵值和子樹的關係?
  • 等等。。

  問題比較多,很是麻煩。具體的過程就不列舉了,如下展現如下修改後的B樹模型。

 

一、先定義一個結構體

 1 [StructLayout(LayoutKind.Sequential, CharSet = CharSet.Ansi, Pack = 1)]
 2 public struct BlockItem
 3 {
 4     public int ChildBlockIndex;
 5     public int Key;
 6     public int DataIndex;
 7 
 8     public BlockItem(int key, int dataIndex)
 9     {
10         ChildBlockIndex = -1;
11         Key = key;
12         DataIndex = dataIndex;
13     }
14 }

 

  結構體總共12字節,爲了可以持久化整棵B樹到磁盤,加入了ChildBlockIndex子孩子節點塊索引,根據這個塊索引在下一次重建子孩子樹層級關係時就知道從

文件的那個位置開始讀取;Key鍵值,DataIndex數據索引,數據索引也是一個文件位置記錄,跟ChildBlockIndex差很少,這樣檢索到key後就知道從

文件哪一個位置獲取真正的數據。爲了更形象瞭解B樹應用,我畫了一個結構體的示意圖:

0、總共3個節點,每一個節點由N個結構體組成,最末尾只有孩子指針,沒有數據與鍵值

一、黃色爲子樹塊索引,即ChildBlockIndex,指向這個子孩子樹全部數據在文件中的位置

二、紅色爲鍵值,即Key,鍵值通常是惟一的,不容許重複

三、藍色爲數據塊索引,即DataIndex,指向鍵值對應的數據在文件中的什麼位置開始,而後讀取一個結構體的長度便可

四、底下綠色的一塊是數據指針指向的具體數據塊

二、數據結構體

 1 [StructLayout(LayoutKind.Sequential, CharSet = CharSet.Ansi, Pack = 1)]
 2 public struct SDataTest
 3 {
 4     public int Idx;
 5     public int Age;
 6     public byte Sex;
 7 
 8     [MarshalAs(UnmanagedType.ByValArray, SizeConst = 20)]
 9     public byte[] Name;
10 
11     public byte Valid;
12 };

 

三、B樹節點類修改改一下,這個就不解釋了,複習一下程序員基本功,啃代碼。

  1     public class BTreeNode
  2     {
  3         private BTree tree;
  4         private bool leaf;
  5 
  6         public int keyNumber;
  7         public BlockItem[] keys;
  8         public BTreeNode[] children;
  9 
 10         public int blockIndex;
 11         public int findIndex;
 12 
 13         public BTreeNode(BTree tree, bool leaf)
 14         {
 15             this.tree = tree;
 16             this.leaf = leaf;
 17             keys = new BlockItem[Consts.KeyMax];
 18             children = new BTreeNode[Consts.ChildMax];
 19             blockIndex = Consts.BlockIndex++;
 20         }
 21 
 22         /// <summary>在未滿的節點中插入鍵值</summary>
 23         /// <param name="key">鍵值</param>
 24         public void InsertNonFull(BlockItem item)
 25         {
 26             var index = keyNumber - 1;
 27 
 28             if (leaf == true)
 29             {
 30                 // 找到合適位置,而且移動節點鍵值騰出位置
 31                 while (index >= 0 && keys[index].Key > item.Key)
 32                 {
 33                     keys[index + 1] = keys[index];
 34                     index--;
 35                 }
 36 
 37                 // 在index後邊新增鍵值
 38                 keys[index + 1] = item;
 39                 keyNumber = keyNumber + 1;
 40             }
 41             else
 42             {
 43                 // 找到合適的子孩子索引
 44                 while (index >= 0 && keys[index].Key > item.Key) index--;
 45 
 46                 // 若是孩子節點已滿
 47                 if (children[index + 1].keyNumber == Consts.KeyMax)
 48                 {
 49                     // 分裂該孩子節點
 50                     SplitChild(index + 1, children[index + 1]);
 51 
 52                     // 分裂後中間節點上跳父節點
 53                     // 孩子節點已經分裂成2個節點,找到合適的一個
 54                     if (keys[index + 1].Key < item.Key) index++;
 55                 }
 56 
 57                 // 插入鍵值
 58                 children[index + 1].InsertNonFull(item);
 59             }
 60         }
 61 
 62         /// <summary>分裂節點</summary>
 63         /// <param name="childIndex">孩子節點索引</param>
 64         /// <param name="waitSplitNode">待分裂節點</param>
 65         public void SplitChild(int childIndex, BTreeNode waitSplitNode)
 66         {
 67             var newNode = new BTreeNode(tree, waitSplitNode.leaf);
 68             newNode.keyNumber = Consts.KeyMin;
 69 
 70             // 把待分裂的節點中的通常節點搬到新節點
 71             for (var j = 0; j < Consts.KeyMin; j++)
 72             {
 73                 newNode.keys[j] = waitSplitNode.keys[j + Consts.ChildMin];
 74 
 75                 // 清0
 76                 waitSplitNode.keys[j + Consts.ChildMin] = default(BlockItem);
 77             }
 78 
 79             // 若是待分裂節點不是也只節點
 80             if (waitSplitNode.leaf == false)
 81             {
 82                 for (var j = 0; j < Consts.ChildMin; j++)
 83                 {
 84                     // 把孩子節點也搬過去
 85                     newNode.children[j] = waitSplitNode.children[j + Consts.ChildMin];
 86 
 87                     // 清0
 88                     waitSplitNode.children[j + Consts.ChildMin] = null;
 89                 }
 90             }
 91 
 92             waitSplitNode.keyNumber = Consts.KeyMin;
 93 
 94             for (var j = keyNumber; j >= childIndex + 1; j--)
 95                 children[j + 1] = children[j];
 96 
 97             children[childIndex + 1] = newNode;
 98 
 99             for (var j = keyNumber - 1; j >= childIndex; j--)
100                 keys[j + 1] = keys[j];
101 
102             // 把中間鍵值上跳至父節點
103             keys[childIndex] = waitSplitNode.keys[Consts.KeyMin];
104 
105             // 清0
106             waitSplitNode.keys[Consts.KeyMin] = default(BlockItem);
107 
108             // 根節點鍵值數自加
109             keyNumber = keyNumber + 1;
110         }
111 
112         /// <summary>根據節點索引順序打印節點鍵值</summary>
113         public void PrintByIndex()
114         {
115             int index;
116             for (index = 0; index < keyNumber; index++)
117             {
118                 // 若是不是葉子節點, 先打印葉子子節點. 
119                 if (leaf == false) children[index].PrintByIndex();
120 
121                 Console.Write("{0} ", keys[index].Key);
122             }
123 
124             // 打印孩子節點
125             if (leaf == false) children[index].PrintByIndex();
126         }
127 
128         /// <summary>查找某鍵值是否已經存在樹中</summary>
129         /// <param name="item">鍵值</param>
130         /// <returns></returns>
131         public BTreeNode Find(BlockItem item)
132         {
133             findIndex = 0;
134             int index = 0;
135             while (index < keyNumber && item.Key > keys[index].Key) index++;
136 
137             // 遍歷所有都未找到,索引計數減1
138             if (index > 0 && index == keyNumber) index--;
139 
140             // 該key已經存在, 返回該索引位置節點
141             if (keys[index].Key == item.Key)
142             {
143                 findIndex = index;
144                 return this;
145             }
146 
147             // key 不存在,而且節點是葉子節點
148             if (leaf == true) return null;
149 
150             // 重建children[index]數據結構
151             var childBlockIndex = keys[index].ChildBlockIndex;
152             tree.LoadNodeByBlock(ref children[index], childBlockIndex);
153 
154             // 遞歸在孩子節點中查找
155             if (children[index] == null) return null;
156             return children[index].Find(item);
157         }
158     }
View Code

 

四、B樹模型也要修改一下 ,不解釋

  1     public class BTree
  2     {
  3         private FileStream rwFS;
  4 
  5         public BTreeNode Root;
  6 
  7         public BTree(string fullName)
  8         {
  9             rwFS = new FileStream(fullName, FileMode.OpenOrCreate, FileAccess.ReadWrite);
 10 
 11             // 建立10M的空間,用作索引存儲
 12             if (rwFS.Length == 0)
 13             {
 14                 rwFS.SetLength(Consts.IndexTotalSize);
 15             }
 16 
 17             // 從數據文件重建根節點,內存只保存根節點
 18             LoadNodeByBlock(ref Root, 0);
 19         }
 20 
 21         public void LoadNodeByBlock(ref BTreeNode node, int blockIndex)
 22         {
 23             var items = Helper.Read(rwFS,blockIndex);
 24             if (items.Count > 0)
 25             {
 26                 var isLeaf = items[0].ChildBlockIndex == Consts.NoChild;
 27 
 28                 node = new BTreeNode(this, isLeaf);
 29                 node.blockIndex = blockIndex;
 30                 node.keys = items.ToArray();
 31                 node.keyNumber = items.Count;
 32             }
 33         }
 34 
 35         /// <summary>根據節點索引順序打印節點鍵值</summary>
 36         public void PrintByIndex()
 37         {
 38             if (Root == null)
 39             {
 40                 Console.WriteLine("空樹");
 41                 return;
 42             }
 43 
 44             Root.PrintByIndex();
 45         }
 46 
 47         /// <summary>查找某鍵值是否已經存在樹中</summary>
 48         /// <param name="item">鍵值</param>
 49         /// <returns></returns>
 50         public BTreeNode Find(BlockItem item)
 51         {
 52             if (Root == null) return null;
 53 
 54             return Root.Find(item);
 55         }
 56         public BTreeNode Find(int key)
 57         {
 58             return Find(new BlockItem() { Key = key });
 59         }
 60 
 61         /// <summary>新增B樹節點鍵值</summary>
 62         /// <param name="item">鍵值</param>
 63         private void Insert(BlockItem item)
 64         {
 65             if (Root == null)
 66             {
 67                 Root = new BTreeNode(this, true);
 68                 Root.keys[0] = item;  
 69                 Root.keyNumber = 1;  
 70             }
 71             else
 72             {
 73                 if (Root.keyNumber == Consts.KeyMax)
 74                 {
 75                     var newNode = new BTreeNode(this, false);
 76 
 77                     newNode.children[0] = Root;
 78                     newNode.SplitChild(0, Root);
 79 
 80                     var index = 0;
 81                     if (newNode.keys[0].Key < item.Key) index++;
 82 
 83                     newNode.children[index].InsertNonFull(item);
 84                     Root = newNode;
 85                 }
 86                 else
 87                 {
 88                     Root.InsertNonFull(item);
 89                 }
 90             }
 91         }
 92 
 93         public void Insert(SDataTest data)
 94         {
 95             var item = new BlockItem()
 96             {
 97                 Key = data.Idx
 98             };
 99 
100             var node = Find(item);
101             if (node != null)
102             {
103                 Console.WriteLine("鍵值已經存在,info:{0}", item.Key);
104                 return;
105             }
106 
107             // 保存數據
108             item.DataIndex = Helper.InsertData(rwFS, data);
109 
110             // 保存索引
111             if (item.DataIndex >= 0)
112                 Insert(item);
113         }
114 
115         /// <summary>持久化整棵樹</summary>
116         public void SaveIndexAll()
117         {
118             SaveIndex(Root);
119         }
120 
121         /// <summary>持久化某節點如下的樹枝</summary>
122         /// <param name="node">某節點</param>
123         public void SaveIndex(BTreeNode node)
124         {
125             var bw = new BinaryWriter(rwFS);
126             var keyItem = default(BlockItem);
127 
128             // 第一層
129             var nodeL1 = node;
130             if (nodeL1 == null) return;
131 
132             for (var i = 0; i <= nodeL1.keyNumber; i++)
133             {
134                 keyItem = default(BlockItem);
135                 if (i < nodeL1.keyNumber) keyItem = nodeL1.keys[i];
136 
137                 SaveIndex(bw, 0, i, nodeL1.children[i], keyItem);
138 
139                 // 第二層
140                 var nodeL2 = nodeL1.children[i];
141                 if (nodeL2 == null) continue;
142 
143                 for (var j = 0; j <= nodeL2.keyNumber; j++)
144                 {
145                     keyItem = default(BlockItem);
146                     if (j < nodeL2.keyNumber) keyItem = nodeL2.keys[j];
147 
148                     SaveIndex(bw, nodeL2.blockIndex, j, nodeL2.children[j], keyItem);
149 
150                     // 第三層
151                     var nodeL3 = nodeL2.children[j];
152                     if (nodeL3 == null) continue;
153 
154                     for (var k = 0; k <= nodeL3.keyNumber; k++)
155                     {
156                         keyItem = default(BlockItem);
157                         if (k < nodeL3.keyNumber) keyItem = nodeL3.keys[k];
158 
159                         SaveIndex(bw, nodeL3.blockIndex, k, nodeL3.children[k], keyItem);
160 
161                         // 第四層
162                         var nodeL4 = nodeL3.children[k];
163                         if (nodeL4 == null) continue;
164 
165                         for (var l = 0; l <= nodeL4.keyNumber; l++)
166                         {
167                             keyItem = default(BlockItem);
168                             if (l < nodeL4.keyNumber) keyItem = nodeL4.keys[l];
169 
170                             SaveIndex(bw, nodeL4.blockIndex, l, nodeL4.children[l], keyItem);
171 
172                             // 第五層
173                             var nodeL5 = nodeL4.children[l];
174                             if (nodeL5 == null) continue;
175 
176                             for (var z = 0; z <= nodeL5.keyNumber; z++)
177                             {
178                                 keyItem = default(BlockItem);
179                                 if (z < nodeL5.keyNumber) keyItem = nodeL5.keys[z];
180 
181                                 SaveIndex(bw, nodeL5.blockIndex, z, nodeL5.children[z], keyItem);
182                             }
183                         }
184                     }
185                 }
186             }
187         }
188         private void SaveIndex(BinaryWriter bw, int blockIndex, int num, BTreeNode node, BlockItem item)
189         {
190             bw.Seek((blockIndex * Consts.BlockSize) + (num * Consts.IndexSize), SeekOrigin.Begin);
191             bw.Write(node == null ? Consts.NoChild : node.blockIndex);
192             bw.Write(item.Key);
193             bw.Write(item.DataIndex);
194             bw.Flush();
195         }
196 
197         public SDataTest LoadData(int dataIndex)
198         {
199             return Helper.Load(rwFS, dataIndex);
200         }
201     }
View Code

 

五、寫測試

 1 private static void InsertTest(ref BTree bTree)
 2 {
 3     // 新增測試數據
 4     for (int i = 1; i <= Consts.TotalKeyNumber; i++)
 5     {
 6         bTree.Insert(new SDataTest()
 7         {
 8             Idx = i,
 9             Age = i,
10             Sex = 1,
11             Name = Helper.Copy("Name(" + i.ToString() + ")", 20),
12             Valid = 1
13         });
14     }
15 
16     Console.WriteLine("測試數據添加完畢,共新增{0}條數據", Consts.TotalKeyNumber);
17 }

 

六、讀測試

 1 private static void FindTest(ref BTree bTree)
 2 {
 3     var count = 0;
 4 
 5     // 校驗數據查找
 6     for (int i = 1; i <= Consts.TotalKeyNumber; i++)
 7     {
 8         var node = bTree.Find(i);
 9         if (node == null)
10         {
11             //Console.WriteLine("未找到{0}", i);
12             continue;
13         }
14 
15         //Console.WriteLine("findIndex:{0},key:{1},dataIndex:{2}", node.findIndex, node.keys[node.findIndex].Key, node.keys[node.findIndex].DataIndex);
16 
17         count++;
18         if (count % 10000 == 0)
19         {
20             var data = bTree.LoadData(node.keys[node.findIndex].DataIndex);
21             var name = Encoding.Default.GetString(data.Name).TrimEnd('\0');
22             Console.WriteLine("Idx:{0},Age:{1},Sex:{2},Name:{3},Valid:{4}", data.Idx, data.Age, data.Sex, name, data.Valid);
23         }
24     }
25 
26     Console.WriteLine("有效數據個數:{0}", count);
27 }

 

七、最後測試一下

 

 八、測試查詢時間

 1 private static void CheckLoadTime(ref BTree bTree, int key)
 2 {
 3     var start = DateTime.Now;
 4     var node = bTree.Find(key);
 5     if (node == null) return;
 6 
 7     Console.WriteLine("查找{0},耗時:{1}", key.ToString(), (DateTime.Now - start).TotalMilliseconds.ToString());
 8 
 9     var data = bTree.LoadData(node.keys[node.findIndex].DataIndex);
10     var name = Encoding.Default.GetString(data.Name).TrimEnd('\0');
11     Console.WriteLine("Idx:{0},Age:{1},Sex:{2},Name:{3},Valid:{4}", data.Idx, data.Age, data.Sex, name, data.Valid);
12     Console.WriteLine();
13 }
1      CheckLoadTime(ref bTree, 1000);
2      CheckLoadTime(ref bTree, 10000);
3      CheckLoadTime(ref bTree, 50000);
4      CheckLoadTime(ref bTree, 100000);

 

 九、從新生成10000000條數據,測試查詢效率

1      CheckLoadTime(ref bTree, 100000);
2      CheckLoadTime(ref bTree, 1000000);
3      CheckLoadTime(ref bTree, 3000000);
4      CheckLoadTime(ref bTree, 5000000);
5      CheckLoadTime(ref bTree, 8000000);
6      CheckLoadTime(ref bTree, 10000000);

 

全是1毫秒內返回,數據檢索效率很是高,

 

學習歷程:

  實際上最初在學校潦草學了一遍【數據結構】以後,工做那麼多年都用不着這方面的知識點,早就忘得一乾二淨了。

從新引發我興趣的是2017年下半年,當時一個項目須要用到共享內存做爲快速讀寫數據的底層核心功能。在設計共享內存存儲關係時,

就遇到了索引的快速檢索要求,第一次是順序檢索,當數據量達到5萬以上時系統就崩了,檢索速度太慢;後來改成二分查找法,輕鬆達到20萬數據;

達到20萬後就差很少到了單機處理性能瓶頸了,由於CPU不夠用,除了檢索還須要作其餘的業務計算;

  那時候就一直在搜索快速查找的各類算法,什麼快速排序算法、堆排序算法、歸併排序、二分查找算法、DFS(深度優先搜索)、BFS(廣度優先搜索),

基本上都瞭解了一遍,可是看得頭疼,沒去實踐。最後看到樹結構,引發我很大興趣,就是園友nullzx的這篇:B+樹在磁盤存儲中的應用

這讓我瞭解到原來數據庫是這樣讀寫的,這頗有意思,得造個輪子本身試一次

  粗陋倉促寫成,恐怕有不少地方有漏洞,因此若是文中有錯誤的地方,歡迎留言討論,可是拒絕一波流的吐槽,我但是會刪低級評論的。

相關文章
相關標籤/搜索