並行處理管道,感覺Go語言魅力

Go語言三大特點

  • 面向接口編程
  • 函數式編程
  • 併發編程

Go 語言併發編程

  • 採用CSP模型(Communication Sequential Process)
  • 不須要鎖,不須要callback
  • 併發編程 VS 並行計算

外部排序

經常使用的排序算法插入排序、選擇排序和快速排序等都是將數據取到內存中進行排序。當數據量很是大時,大到內存沒法一次將全部數據讀到內存中,這時就須要使用外部排序。html

基本思想:git

  • 將數據分爲左右兩半,分別歸併排序,再把兩個有序數據繼續歸併github

  • 如何歸併golang

    • [1,3,6,7],[1,2,3,5] => 1
    • [3,6,7],[1,2,3,5] => 1
    • [3,6,7],[2,3,5] => 2
    • [3,6,7],[3,5] => 3
    • ...

guibing

Go利用Pipeline外部排序

步驟一:算法

  • 首先切好片的數據經過goroutine讀入到某個節點,經過go chan傳輸。
  • 再啓動一個goroutine,把go chan裏的數據讀入內存中,而後使用快速排序對內存中的數據進行快速排序,最後將排序好的數據再次經過goroutine傳輸給一個go chan。
  • 最後將go chan管道中排好序的數據輸出到文件,或者打印到控制檯。

步驟二:編程

  • 上述步驟一完成了一個節點的外部排序,接着講全部的切片按照上述步驟一均可以完成內部排序並存入文件。
  • 而後經過MergeN將多個節點分別進行兩兩二路歸併,最後將全部節點排好序並經過go chan管道返回出去並存於文件。
// 將文件讀取的數據輸送到一個節點
// 該節點經過goroutine將數據輸送到chan
func ReaderSource(a ...int) <-chan int {
	out := make(chan int)

	go func() {
		for _, v := range a {
			out <- v
		}
		// 必定要close,close後,外面會用if或range取判斷取失敗
		// 數據量大的話,不關閉會很佔內存
		close(out)
	}()
	
	return out
}
複製代碼
// 將上面ReaderSource返回的chan傳進來讀入內存
// 使用內部排序對讀入內存的數據排序
// 而後經過goroutine輸出到chan返回出去
// 參數in 只進不出,返回參數只出不進
func InMemSort(in <-chan int) <-chan int {
	out := make(chan int, 1024)

	go func() {
		// Read into memory
		a := []int{}
		for v := range in {
			a = append(a, v)
		}

		// Sort
		sort.Ints(a)

		// Output
		for _, v := range a {
			out <- v
		}

		// close
		close(out)
	}()

	return out
}
複製代碼
// 將排好序的內存數據打印輸出,或者存文件
func main() {
	p := InMemSort(ArraySource(3, 2, 6, 7, 4))
	for v := range p {
		fmt.Println(v)
	}
}
複製代碼
// 將排好序的多個節點經過2路歸併排序
func MergeN(inputs ... <-chan int) <-chan int {
	if len(inputs) == 1 {
		return inputs[0]
	}

	m := len(inputs) / 2

	// merge inputs[0...m) and inputs [m...end)
	return Merge(MergeN(inputs[:m]...), MergeN(inputs[m:]...))
}
複製代碼
// 將排好序的2個節點歸併歸併
func Merge(in1, in2 <-chan int) <-chan int {
	out := make(chan int, 1024)

	go func() {
		v1, ok1 := <-in1 // 沒有元素ok1返回false
		v2, ok2 := <-in2
		for ok1 || ok2 {
			// v2沒有元素就出v1; v1,v2都有數據,且v1 <= v2也出v1
			if !ok2 || (ok1 && v1 <= v2) {
				out <- v1
				v1, ok1 = <-in1
			} else {
				out <- v2
				v2, ok2 = <-in2
			}
		}

		// 關閉
		close(out)
		fmt.Println("Merge done: ", time.Now().Sub(startTime))
	}()

	return out
}
複製代碼

性能分析

  • ArraySource節點,支持分塊讀取
  • mergeN,搭建歸併節點組,或者使用多路歸併排序
  • pipeline的搭建及運行 - CPU及線程數量的觀測

連接

相關文章
相關標籤/搜索