Go語言練習:基於最小堆的外部排序

問題:一個很大的數據文件,單節點可用內存有限,需要使用多個節點實現文件數據排序。

思路:

1.一個主節點負責將文件分塊,如果分3塊,則開3個goroutine分別讀取文件的一部分,每塊交給一個外部節點獨立排序;

2.每個外部節點接收數據並排序,將排序結果傳回主節點;

3.主節點從每個外部節點接收一個數據構建最小堆,最小堆的元素除記錄數據外還要記錄數據來自於哪個節點。這樣將最小數據寫入文件寫緩衝區後,需要從相應的節點再拿一個數據過來重新構造最小堆,直至所有數據均寫入文件,排序完成。

4.考慮到數據可能存在相等的情況,構建的最小堆需要支持記錄數值以及該數值的所有來源。這樣從最小堆取出一個數據時,可以一次寫入幾個相等的數據到文件緩衝區,同時從每個相等數據的來源分別讀一個數據重新構造最小堆:

// item pairs a heap value with the ids of every source it came from, so
// equal values read from several inputs can share one heap entry.
type item struct {
    data   interface{}
    source []int // ids of the sources that produced data, e.g. indices into a []chan
}

圖解:

代碼結構:

GOPATH/pipeline/heapsort
--heap
----heap.go //最小堆數據結構
--mergesort
----mergesort.go //多路歸併排序
----source.go      //數據源(隨機數據源,文件數據源,內存排序數據源,外部排序數據源)
--netnode
----server.go      //外部排序節點
--main.go

代碼實現:

最小堆數據結構:

package heap

import (
    "fmt"
)

// Heap is a binary min-heap whose ordering is decided by Cmp. Besides the
// value, every entry records the ids of the sources that produced it: values
// that compare equal share a single entry, so one Get returns every origin of
// the current minimum at once.
type Heap struct {
    data []item      // heap-ordered entries; data[0] is the minimum
    Cmp  HeapCmpFunc // comparison function: 0 equal, <0 less, >0 greater
}

// HeapCmpFunc compares two heap values. It returns 0 if they are equal, a
// negative number if the first is smaller and a positive number if it is
// larger. Inverting the sign turns the min-heap into a max-heap.
type HeapCmpFunc func(interface{}, interface{}) int

// item is one heap entry: a value plus the ids of every source (e.g. the
// index of an input channel in a []chan) that produced an equal value.
type item struct {
    data   interface{}
    source []int
}

// NewHeap returns an empty heap ordered by cmp, with room for capacity
// entries before it needs to grow. A negative capacity is treated as zero.
func NewHeap(capacity int, cmp HeapCmpFunc) Heap {
    if capacity < 0 {
        capacity = 0
    }
    return Heap{
        data: make([]item, 0, capacity),
        Cmp:  cmp,
    }
}

// Len reports the number of entries (distinct values) in the heap.
func (h *Heap) Len() int {
    return len(h.data)
}

// Print writes the position and value of every entry to standard output in
// storage order; it is meant for debugging.
func (h *Heap) Print() {
    for i, v := range h.data {
        fmt.Println(i, v.data)
    }
}

// Add inserts data with source as its origin. If a value equal to data (per
// Cmp) is already stored, source is appended to that entry's sources instead
// of creating a new entry. It always returns true.
//
// The duplicate check scans every entry, so Add is O(n) in the number of
// entries; that is acceptable for the small fan-in of a k-way merge.
func (h *Heap) Add(data interface{}, source int) bool {
    for i := range h.data {
        if h.data[i].data == nil {
            continue
        }
        if h.Cmp(data, h.data[i].data) == 0 {
            // Index into the slice: appending to a range-loop copy would
            // silently drop the new source.
            h.data[i].source = append(h.data[i].source, source)
            return true
        }
    }
    h.data = append(h.data, item{data: data, source: []int{source}})
    h.shiftUp(len(h.data) - 1)
    return true
}

// Get removes the minimum entry and returns its value together with every
// source recorded for it, or (nil, nil) if the heap is empty. It runs in
// O(log n).
func (h *Heap) Get() (data interface{}, source []int) {
    if len(h.data) == 0 {
        return nil, nil
    }
    top := h.data[0]
    last := len(h.data) - 1
    // Move the last entry to the root and sift it down rather than
    // rebuilding the whole heap.
    h.data[0] = h.data[last]
    h.data[last] = item{} // drop references held by the vacated slot
    h.data = h.data[:last]
    if last > 0 {
        h.shiftDown(0)
    }
    return top.data, top.source
}

// shiftUp moves the entry at idx towards the root until its parent is not
// larger than it.
func (h *Heap) shiftUp(idx int) {
    for idx > 0 {
        p := parent(idx)
        if h.Cmp(h.data[idx].data, h.data[p].data) >= 0 {
            return
        }
        h.swap(idx, p)
        idx = p
    }
}

// shiftDown moves the entry at idx towards the leaves until neither child is
// smaller than it, restoring the heap property for the subtree rooted at idx.
func (h *Heap) shiftDown(idx int) {
    n := len(h.data)
    for {
        smallest := idx
        if l := left(idx); l < n && h.Cmp(h.data[l].data, h.data[smallest].data) < 0 {
            smallest = l
        }
        if r := right(idx); r < n && h.Cmp(h.data[r].data, h.data[smallest].data) < 0 {
            smallest = r
        }
        if smallest == idx {
            return
        }
        h.swap(idx, smallest)
        idx = smallest
    }
}

// swap exchanges the entries at positions i and j.
func (h *Heap) swap(i, j int) {
    h.data[i], h.data[j] = h.data[j], h.data[i]
}

// parent returns the index of the parent of node idx (idx > 0).
func parent(idx int) int {
    return (idx - 1) / 2
}

// left returns the index of the left child of node idx.
func left(idx int) int {
    return 2*idx + 1
}

// right returns the index of the right child of node idx.
func right(idx int) int {
    return 2*idx + 2
}
heap.go

多路歸併排序算法:

package mergesort

// 基於最小堆的多路歸併排序

import (
    "pipeline/heapsort/heap"
    "strings"
)

// cmpInt compares two int values held in interfaces and returns -1, 0 or 1.
// It compares explicitly instead of returning a-b, which overflows (and flips
// sign) when the operands are large and of opposite sign.
func cmpInt(a, b interface{}) int {
    x, y := a.(int), b.(int)
    switch {
    case x < y:
        return -1
    case x > y:
        return 1
    default:
        return 0
    }
}

// cmpStr compares two strings held in interfaces lexicographically,
// returning -1, 0 or 1 as strings.Compare does.
func cmpStr(a, b interface{}) int {
    lhs := a.(string)
    rhs := b.(string)
    return strings.Compare(lhs, rhs)
}

func MergeSortInt(out chan int, ins ...chan int) {
    MergeSort(cmpInt, out, ins...)
}

// MergeSort performs a k-way merge of the sorted input channels ins into out,
// ordered by cmp. A min-heap holds the current head of every input; each heap
// entry remembers which input(s) its value came from, so after emitting the
// minimum (once per source) the merge refills from exactly those inputs.
// out is closed after every input has been exhausted.
func MergeSort(cmp heap.HeapCmpFunc, out chan int, ins ...chan int) {
    hp := heap.NewHeap(len(ins), cmp)

    // pull receives the next value from input idx and pushes it into the
    // heap; a closed input contributes nothing.
    pull := func(idx int) {
        if v, ok := <-ins[idx]; ok {
            hp.Add(v, idx)
        }
    }

    // Seed the heap with the first value of every input.
    for idx := range ins {
        pull(idx)
    }

    for hp.Len() > 0 {
        smallest, sources := hp.Get()
        if smallest == nil {
            continue
        }
        // Emit one copy per source that produced this value, refilling
        // from each source right after its copy is sent.
        for _, idx := range sources {
            out <- smallest.(int)
            pull(idx)
        }
    }
    close(out)
}
mergesort.go

數據源:

package mergesort

import (
    "bufio"
    "encoding/binary"
    "io"
    "log"
    "math/rand"
    "net"
    "os"
    "sort"
)

// RandomSource returns a channel that yields n pseudo-random non-negative
// ints from math/rand and is then closed. A non-positive n yields nothing.
func RandomSource(n int) chan int {
    ch := make(chan int)
    go func(remaining int) {
        defer close(ch)
        for ; remaining > 0; remaining-- {
            ch <- rand.Int()
        }
    }(n)
    return ch
}

// ReaderSource opens fileName, seeks to offset and streams the little-endian
// int64 values found there into the returned channel.
//
// chunkSize limits how many bytes are consumed; any negative value (callers
// use -1) means "read until EOF". Reading stops once at least chunkSize bytes
// have been consumed, so a chunkSize that is not a multiple of 8 effectively
// rounds up to a whole value. The channel is closed when the chunk is done,
// EOF is reached, or a read error occurs (non-EOF errors are logged); the file
// is closed at the same time. An error is returned if the file cannot be
// opened or the seek fails.
func ReaderSource(fileName string, offset int64, chunkSize int64) (chan int, error) {
    f, err := os.Open(fileName)
    if err != nil {
        return nil, err
    }

    if offset > 0 {
        if _, err := f.Seek(offset, io.SeekStart); err != nil {
            f.Close() // don't leak the descriptor on a failed seek
            return nil, err
        }
    }

    out := make(chan int)
    go func() {
        defer f.Close()
        defer close(out)

        r := bufio.NewReader(f)
        var num int64
        var count int64
        for chunkSize < 0 || count < chunkSize {
            // Goroutine-local error: the enclosing err has already been
            // returned to the caller.
            if err := binary.Read(r, binary.LittleEndian, &num); err != nil {
                if err != io.EOF {
                    log.Println(err)
                }
                return
            }
            out <- int(num)
            count += int64(binary.Size(num))
        }
    }()
    return out, nil
}

// InMemSort collects every value from in until it is closed, sorts them in
// ascending order in memory, and streams the result into the returned
// channel, which is closed afterwards.
func InMemSort(in chan int) chan int {
    sorted := make(chan int)
    go func() {
        defer close(sorted)
        var buf []int
        for {
            v, ok := <-in
            if !ok {
                break
            }
            buf = append(buf, v)
        }
        sort.Ints(buf)
        for i := range buf {
            sorted <- buf[i]
        }
    }()
    return sorted
}

// NetworkSort sends every value read from in to the sorting node at addr over
// TCP, half-closes the connection to mark the end of the input, and streams
// the node's reply (the sorted values) into the returned channel. Values
// travel as little-endian int64.
//
// err is non-nil only if the connection cannot be established. Later failures
// are logged and the returned channel is still closed, so a consumer ranging
// over it never blocks forever. If sending fails, in is drained to completion
// so its producer is not left blocked on a send.
func NetworkSort(in chan int, addr string) (out chan int, err error) {
    con, err := net.Dial("tcp", addr)
    if err != nil {
        return nil, err
    }
    out = make(chan int)
    go func() {
        defer con.Close()
        defer close(out) // on every path, so the consumer always sees the end

        // Errors are kept goroutine-local: assigning the named result err
        // here would race with the return statement below.
        bufw := bufio.NewWriter(con)
        var werr error
        for v := range in {
            if werr != nil {
                continue // keep draining in after a failure
            }
            werr = binary.Write(bufw, binary.LittleEndian, int64(v))
        }
        if werr == nil {
            werr = bufw.Flush()
        }
        if werr != nil {
            log.Println(werr)
            return
        }

        // Half-close the write side so the node reads EOF and starts
        // sorting, while the read side stays open for the reply.
        cw, ok := con.(interface{ CloseWrite() error })
        if !ok {
            log.Println("NetworkSort: connection does not support CloseWrite")
            return
        }
        if cerr := cw.CloseWrite(); cerr != nil {
            log.Println(cerr)
            return
        }

        // Receive the sorted values until the node closes its side.
        bufr := bufio.NewReader(con)
        for {
            var num int64
            if rerr := binary.Read(bufr, binary.LittleEndian, &num); rerr != nil {
                if rerr != io.EOF {
                    log.Println(rerr)
                }
                return
            }
            out <- int(num)
        }
    }()
    return out, nil
}
source.go

外部排序節點:

package main

import (
    "bufio"
    "encoding/binary"
    "flag"
    "fmt"
    "io"
    "log"
    "net"
    "sort"
    "time"
)

// port is the TCP port this sorting node listens on (flag -p).
var port = flag.Int("p", 8888, "port to listen on this server,default 8888.")

// main runs an external sorting node. It listens on the configured port and
// serves every accepted connection with netSort in its own goroutine; accept
// errors are logged and the node keeps listening.
func main() {
    flag.Parse()
    addr := fmt.Sprintf(":%d", *port)
    listener, err := net.Listen("tcp", addr)
    if err != nil {
        log.Fatal(err)
    }
    for {
        con, acceptErr := listener.Accept()
        if acceptErr == nil {
            log.Println(con.RemoteAddr())
            go netSort(con)
            continue
        }
        log.Println(acceptErr)
    }
}

// netSort serves one sorting request on con. It reads little-endian int64
// values until the client half-closes its side, sorts them in memory, writes
// them back in the same encoding, and half-closes its own write side (when the
// connection supports it) so the client sees EOF. The connection is closed on
// return, and progress with elapsed time is logged.
//
// If the request cannot be read completely (any error other than a clean EOF
// between values, e.g. a truncated trailing value), nothing is sent back. A
// partial reply would look, to the client, exactly like a complete one.
func netSort(con net.Conn) {
    start := time.Now()
    defer con.Close()

    // Read the client's values.
    bufr := bufio.NewReader(con)
    var in []int
    for {
        var num int64
        if err := binary.Read(bufr, binary.LittleEndian, &num); err != nil {
            if err != io.EOF {
                log.Println(err)
                return
            }
            break
        }
        in = append(in, int(num))
    }
    log.Println("read done.", time.Since(start))

    sort.Ints(in)
    log.Println("sort done.", time.Since(start))

    // Write the sorted values back.
    bufw := bufio.NewWriter(con)
    for _, v := range in {
        if err := binary.Write(bufw, binary.LittleEndian, int64(v)); err != nil {
            log.Println(err)
            return
        }
    }
    if err := bufw.Flush(); err != nil {
        log.Println(err)
        return
    }
    // Signal end-of-reply; the deferred Close also does this if half-close is
    // unavailable.
    if cw, ok := con.(interface{ CloseWrite() error }); ok {
        if err := cw.CloseWrite(); err != nil {
            log.Println(err)
        }
    }
    log.Println("send done.", time.Since(start))
}
server.go

主節點:

package main

import (
    "bufio"
    "encoding/binary"
    "fmt"
    "math/rand"
    "os"
    "pipeline/heapsort/mergesort"
)

// main runs the file-backed distributed sort test on small.in split into three
// chunks. Uncomment a createTestFile call to (re)generate the input first.
func main() {
    const (
        inputFile = "small.in"
        chunks    = 3
    )
    //createTestFile("small.in", 10000)
    //createTestFile("large.in", 1000000)
    distributeFile(inputFile, chunks)
}

// distributeFile is the file-backed external-sort test. filename must hold
// little-endian int64 values. It is split into chunkNum byte ranges aligned to
// whole values, and chunk i is streamed to the sorting node on local port
// 8080+i. The sorted replies are k-way merged, the merged stream is checked
// for order, and PASS or FAIL is printed. The last chunk reads through EOF so
// the remainder of the integer division is not lost. Errors are printed and
// end the test.
func distributeFile(filename string, chunkNum int) {
    if chunkNum < 1 {
        // Guards the division below.
        fmt.Println("distributeFile: chunkNum must be at least 1, got", chunkNum)
        return
    }
    finfo, err := os.Stat(filename)
    if err != nil {
        fmt.Println(err)
        return
    }

    valueSize := int64(binary.Size(int64(0))) // bytes per encoded value
    // Round the per-chunk size down to whole values so none straddles two chunks.
    chunkSize := finfo.Size() / valueSize / int64(chunkNum) * valueSize

    ins := make([]chan int, chunkNum)
    var offset int64
    for i := range ins {
        size := chunkSize
        if i == chunkNum-1 {
            size = -1 // last chunk: read through EOF
        }
        fin, err := mergesort.ReaderSource(filename, offset, size)
        if err != nil {
            fmt.Println(err)
            return
        }
        offset += chunkSize

        // Node i listens on port 8080+i; this also works for 10 or more chunks.
        in, err := mergesort.NetworkSort(fin, fmt.Sprintf(":%d", 8080+i))
        if err != nil {
            fmt.Println(err)
            return
        }
        ins[i] = in
    }

    out := make(chan int)
    go mergesort.MergeSortInt(out, ins...)

    if testResult(out) {
        fmt.Println("PASS")
    } else {
        fmt.Println("FAIL")
    }
}

// distributeRandom is the random-data external-sort test. Sorting nodes on
// ports 8080..8082 each sort 100 random numbers, the replies are k-way merged,
// and the merged stream is checked for order (PASS/FAIL is printed).
func distributeRandom() {
    const (
        inCount  = 3
        numCount = 100
    )
    ins := make([]chan int, 0, inCount)
    for i := 0; i < inCount; i++ {
        addr := fmt.Sprintf(":808%d", i)
        in, err := mergesort.NetworkSort(mergesort.RandomSource(numCount), addr)
        if err != nil {
            fmt.Println(err)
            return
        }
        ins = append(ins, in)
    }
    out := make(chan int)
    go mergesort.MergeSortInt(out, ins...)

    result := "FAIL"
    if testResult(out) {
        result = "PASS"
    }
    fmt.Println(result)
}

// small is the in-memory test: four random sources of 100 numbers are each
// sorted in memory, k-way merged, and the merged stream is checked for order
// (PASS/FAIL is printed).
func small() {
    const inCount, numCount = 4, 100
    ins := make([]chan int, inCount)
    for i := range ins {
        src := mergesort.RandomSource(numCount)
        ins[i] = mergesort.InMemSort(src)
    }
    out := make(chan int)
    go mergesort.MergeSortInt(out, ins...)

    verdict := "FAIL"
    if testResult(out) {
        verdict = "PASS"
    }
    fmt.Println(verdict)
}

// createTestFile creates (or truncates) filename and writes count
// pseudo-random non-negative ints to it as little-endian int64 values, 8 bytes
// each. Errors are printed and stop the function. This includes errors from
// the final flush and close, which is where buffered write failures surface.
func createTestFile(filename string, count int) {
    f, err := os.Create(filename)
    if err != nil {
        fmt.Println(err)
        return
    }
    // Safety net for the early returns; the success path closes explicitly
    // below so that its error can be reported (the second Close is a no-op
    // error that is deliberately ignored).
    defer f.Close()

    bufw := bufio.NewWriter(f)
    for i := 0; i < count; i++ {
        if err := binary.Write(bufw, binary.LittleEndian, int64(rand.Int())); err != nil {
            fmt.Println(err)
            return
        }
    }
    if err := bufw.Flush(); err != nil {
        fmt.Println(err)
        return
    }
    if err := f.Close(); err != nil {
        fmt.Println(err)
    }
}

// testResult consumes out, printing the first 100 values received, and
// reports whether the stream was non-decreasing. On the first out-of-order
// value it drains the rest of out silently, so producer goroutines are never
// left blocked on a send, and then returns false.
func testResult(out chan int) bool {
    const maxPrint = 100
    var prev int
    started := false
    printed := 0
    for v := range out {
        if printed < maxPrint {
            fmt.Println(v)
            printed++
        }
        if started && v < prev {
            for range out {
            }
            return false
        }
        // Compare each value with its predecessor, not just the first one.
        prev, started = v, true
    }
    return true
}
main.go

執行效果:

1.準備測試數據文件:在 main 中調用 createTestFile("small.in", 10000)

2.開啓三個外部排序節點:

netnode -p 8080

netnode -p 8081

netnode -p 8082

3.運行主節點heapsort

外部排序節點:

問題:

設計堆數據結構時考慮到支持不同數據類型,並可以通過指定比較函數實現最大堆與最小堆的轉換,但是沒有很好地實現方便的數據類型切換:interface{}可以接受所有類型,但chan interface{}卻不能接受chan int或chan string。這是因為Go的通道類型要求元素類型完全一致,chan int無法隱式轉換為chan interface{}。

相關文章
相關標籤/搜索