問題:一個很大的數據文件,單節點可用內存有限,使用多節點實現文件數據排序。
思路:
1.一個主節點負責將文件分塊,如果分3塊,則開3個goroutine分別讀取文件的一部分,交給一個外部節點獨立排序;
2.每一個外部節點接收數據並排序,將排序結果傳回主節點;
3.主節點從每一個外部節點接收一個數據構建最小堆,最小堆的元素除記錄數據外還要記錄數據來自於哪一個節點,這樣將最小數據寫入文件寫緩衝區後,需要從相應的節點再拿一個數據過來重新構造最小堆,直至所有數據均寫入文件,排序完成。
4.考慮到數據可能存在相等的情況,構建的最小堆需要支持記錄數值,以及該數值的所有來源,這樣從最小堆取出一個數據,可以寫入幾個相等的數據到文件緩衝區,同時從每一個相等數據的來源分別讀一個數據重新構造最小堆:
// item is one heap entry: a value together with every input that supplied it.
type item struct {
	data   interface{} // the value being ordered
	source []int       // ids of the inputs that produced data, e.g. indexes into a []chan
}
圖解:
代碼結構:
GOPATH/pipeline/heapsort
--heap
----heap.go //最小堆數據結構
--mergesort
----mergesort.go //多路歸併排序
----source.go //數據源(隨機數據源,文件數據源,內存排序數據源,外部排序數據源)
--netnode
----server.go //外部排序節點
--main.go
代碼實現:
最小堆數據結構:
package heap import ( "fmt" ) // 最小堆數據結構 // data 存儲堆元素 // Cmp 元素比較函數 type Heap struct { data []item Cmp HeapCmpFunc // 比較函數,0相等,<0小於,>0大於 } type HeapCmpFunc func(interface{}, interface{}) int type item struct { data interface{} source []int // 標記data來源,例如[]chan的id } func NewHeap(cap int, cmp HeapCmpFunc) Heap { return Heap{ data: make([]item, 0, cap), Cmp: cmp, } } func (heap *Heap) Len() int { return len(heap.data) } func (heap *Heap) Print() { for i, v := range heap.data { fmt.Println(i, v.data) } } // 向堆中添加新元素 func (heap *Heap) Add(data interface{}, source int) bool { for _, v := range heap.data { if v.data == nil { break } if heap.Cmp(data, v.data) == 0 { // 堆中有相等數據,只添加數據來源 v.source = append(v.source, source) return true } } idx := heap.Len() if idx >= cap(heap.data) { heap.scale() } heap.data = append(heap.data, item{data, []int{source}}) heap.shiftUp(idx) return true } // 獲取堆頂元素值與來源標記 func (heap *Heap) Get() (data interface{}, source []int) { if heap.Len() < 1 { return nil, nil } data = heap.data[0].data source = heap.data[0].source heap.data = heap.data[1:] heap.heapify() return } func (heap *Heap) heapify() { firstParent := (heap.Len() - 1) / 2 for i := firstParent; i >= 0; i-- { heap.shiftDown(i) } } func (heap *Heap) shiftUp(idx int) { for idx > 0 { if heap.Cmp(heap.data[idx].data, heap.data[parent(idx)].data) < 0 { heap.swap(idx, parent(idx)) idx = parent(idx) } else { break } } } func (heap *Heap) shiftDown(idx int) { l, r := left(idx), right(idx) if r < heap.Len() && heap.Cmp(heap.data[idx].data, heap.data[r].data) > 0 { heap.swap(idx, r) } if l < heap.Len() && heap.Cmp(heap.data[idx].data, heap.data[l].data) > 0 { heap.swap(idx, l) } } func (heap *Heap) swap(i, j int) { heap.data[i], heap.data[j] = heap.data[j], heap.data[i] } func (heap *Heap) scale() { cap := len(heap.data) * 2 if cap == 0 { cap = 8 } data := make([]item, len(heap.data), cap) copy(data, heap.data) heap.data = data } func parent(idx int) int { return (idx - 1) / 2 } func left(idx 
int) int { return 2*idx + 1 } func right(idx int) int { return 2*idx + 2 }
多路歸併排序算法:
package mergesort // 基於最小堆的多路歸併排序 import ( "pipeline/heapsort/heap" "strings" ) func cmpInt(a, b interface{}) int { return a.(int) - b.(int) } func cmpStr(a, b interface{}) int { return strings.Compare(a.(string), b.(string)) } func MergeSortInt(out chan int, ins ...chan int) { MergeSort(cmpInt, out, ins...) } // 多路歸併排序 func MergeSort(cmp heap.HeapCmpFunc, out chan int, ins ...chan int) { hp := heap.NewHeap(len(ins), cmp) // 構造堆數據 for idx, in := range ins { v, ok := <-in if ok { hp.Add(v, idx) } } for hp.Len() > 0 { // 從堆中讀取最小值 min, sources := hp.Get() if min != nil { // 填充堆數據 for _, idx := range sources { out <- min.(int) v, ok := <-ins[idx] if ok { hp.Add(v, idx) } } } } close(out) }
數據源:
package mergesort import ( "bufio" "encoding/binary" "io" "log" "math/rand" "net" "os" "sort" ) // 生成指定數目個隨機數據寫入out通道 func RandomSource(n int) chan int { out := make(chan int) go func() { for n > 0 { out <- rand.Int() n-- } close(out) }() return out } // 讀取指定文件指定偏移的一塊數據放入out通道 func ReaderSource(fileName string, offset int64, chunkSize int64) (chan int, error) { f, err := os.Open(fileName) if err != nil { return nil, err } if offset > 0 { _, err = f.Seek(offset, 0) if err != nil { return nil, err } } out := make(chan int) go func() { defer f.Close() var num int64 var count int64 r := bufio.NewReader(f) for { if chunkSize != -1 && count >= chunkSize { break } err = binary.Read(r, binary.LittleEndian, &num) if err != nil { if err != io.EOF { log.Println(err) } break } out <- int(num) count += int64(binary.Size(num)) } close(out) }() return out, nil } // 內存排序 // 接收in通道的數據放入slice中,將排序的數據寫入out通道 func InMemSort(in chan int) chan int { out := make(chan int) go func() { var data []int for v := range in { data = append(data, v) } sort.Ints(data) for _, v := range data { out <- v } close(out) }() return out } // 網絡排序 // 將in通道中的數據傳輸到addr指定的網絡節點,由該節點進行排序而後接收排序的數據傳入out通道 func NetworkSort(in chan int, addr string) (out chan int, err error) { con, err := net.Dial("tcp", addr) if err != nil { return nil, err } out = make(chan int) go func() { defer con.Close() bufw := bufio.NewWriter(con) // 將待排序數字發送給遠端服務 for v := range in { err = binary.Write(bufw, binary.LittleEndian, int64(v)) if err != nil && err != io.EOF { log.Println(err) break } } bufw.Flush() // 關閉鏈接的寫半邊 tcpCon := con.(*net.TCPConn) err = tcpCon.CloseWrite() if err != nil { log.Println(err) return } // 接收排序後的數據 bufr := bufio.NewReader(con) for { var num int64 err = binary.Read(bufr, binary.LittleEndian, &num) if err != nil { if err != io.EOF { log.Println(err) } break } out <- int(num) } close(out) }() return out, nil }
外部排序節點:
package main

import (
	"bufio"
	"encoding/binary"
	"flag"
	"fmt"
	"io"
	"log"
	"net"
	"sort"
	"time"
)

var port = flag.Int("p", 8888, "port to listen on this server,default 8888.")

// main listens on the configured TCP port and sorts the data of every
// incoming connection in its own goroutine.
func main() {
	flag.Parse()

	listener, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
	if err != nil {
		log.Fatal(err)
	}
	for {
		con, err := listener.Accept()
		if err != nil {
			log.Println(err)
			continue
		}
		log.Println(con.RemoteAddr())
		go netSort(con)
	}
}

// netSort serves one client: it reads little-endian int64 values until the
// client closes its write side, sorts them, and writes them back in the same
// encoding.
//
// If reading fails part-way, nothing is sent back; returning a sorted subset
// would be indistinguishable from a complete result on the client side.
func netSort(con net.Conn) {
	start := time.Now()
	defer con.Close()

	in, err := readInts(con)
	if err != nil {
		log.Println(err)
		return
	}
	log.Println("read done.", time.Since(start))

	sort.Ints(in)
	log.Println("sort done.", time.Since(start))

	if err := writeInts(con, in); err != nil {
		log.Println(err)
		return
	}
	// Half-close so the client sees end of data before the deferred Close.
	if tcpCon, ok := con.(*net.TCPConn); ok {
		if err := tcpCon.CloseWrite(); err != nil {
			log.Println(err)
			return
		}
	}
	log.Println("send done.", time.Since(start))
}

// readInts decodes little-endian int64 values from r until io.EOF.
func readInts(r io.Reader) ([]int, error) {
	var (
		nums []int
		num  int64
	)
	br := bufio.NewReader(r)
	for {
		err := binary.Read(br, binary.LittleEndian, &num)
		if err == io.EOF {
			return nums, nil
		}
		if err != nil {
			return nil, err
		}
		nums = append(nums, int(num))
	}
}

// writeInts encodes nums to w as little-endian int64 values and flushes them.
func writeInts(w io.Writer, nums []int) error {
	bw := bufio.NewWriter(w)
	for _, v := range nums {
		if err := binary.Write(bw, binary.LittleEndian, int64(v)); err != nil {
			return err
		}
	}
	return bw.Flush()
}
主節點:
package main import ( "bufio" "encoding/binary" "fmt" "math/rand" "os" "pipeline/heapsort/mergesort" ) func main() { //createTestFile("small.in", 10000) //createTestFile("large.in", 1000000) distributeFile("small.in", 3) } // 文件數據源外部排序測試 func distributeFile(filename string, chunkNum int) { f, err := os.Open(filename) if err != nil { fmt.Println(err) return } defer f.Close() finfo, err := f.Stat() filesize := finfo.Size() ins := make([]chan int, chunkNum) var offset int64 chunkSize := int64(filesize/int64(binary.Size(offset))/int64(chunkNum)) * int64(binary.Size(offset)) for i, _ := range ins { if i == chunkNum-1 { chunkSize = -1 } fin, err := mergesort.ReaderSource(filename, offset, chunkSize) offset += chunkSize if err != nil { fmt.Println(err) return } in, err := mergesort.NetworkSort(fin, fmt.Sprintf(":808%d", i)) if err != nil { fmt.Println(err) return } ins[i] = in } out := make(chan int) go mergesort.MergeSortInt(out, ins...) if testResult(out) { fmt.Println("PASS") } else { fmt.Println("FAIL") } } // 隨機數據源外部排序測試 func distributeRandom() { inCount := 3 numCount := 100 ins := make([]chan int, inCount) for i, _ := range ins { in, err := mergesort.NetworkSort(mergesort.RandomSource(numCount), fmt.Sprintf(":808%d", i)) if err != nil { fmt.Println(err) return } ins[i] = in } out := make(chan int) go mergesort.MergeSortInt(out, ins...) if testResult(out) { fmt.Println("PASS") } else { fmt.Println("FAIL") } } // 隨機數據源內存排序測試 func small() { inCount := 4 numCount := 100 ins := make([]chan int, inCount) for i, _ := range ins { ins[i] = mergesort.InMemSort(mergesort.RandomSource(numCount)) } out := make(chan int) go mergesort.MergeSortInt(out, ins...) 
if testResult(out) { fmt.Println("PASS") } else { fmt.Println("FAIL") } } //新建測試數據源 func createTestFile(filename string, count int) { f, err := os.Create(filename) if err != nil { fmt.Println(err) return } defer f.Close() bufw := bufio.NewWriter(f) defer bufw.Flush() for i := 0; i < count; i++ { num := rand.Int() err = binary.Write(bufw, binary.LittleEndian, int64(num)) if err != nil { fmt.Println(err) return } } } //檢查排序結果 func testResult(out chan int) bool { var pre int var start bool var printCOunt int for v := range out { if printCOunt < 100 { fmt.Println(v) printCOunt++ } if !start { pre = v start = true continue } if v < pre { for _ = range out { } return false } } return true }
執行效果:
1.準備測試數據文件:main.createTestFile("small.in",10000)
2.開啓三個外部排序節點:
netnode -p 8080
netnode -p 8081
netnode -p 8082
3.運行主節點heapsort
外部排序節點:
問題:
設計堆數據結構時考慮到支持不同數據類型,並可以通過指定比較函數實現最大最小堆的轉換,但是沒有很好地實現方便的數據類型切換:interface{}可以接受所有類型,但chan interface{}卻不能接受chan int或chan string。